diff --git a/CHANGELOG.md b/CHANGELOG.md index 153ac93..5e661c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ The format is based on [Keep a Changelog], and this project adheres to ## Unreleased +### Added + +- Added `journal.RunBenchmarks()` and `kv.RunBenchmarks()` to run generic + benchmarks for a journal or key-value store implementation. + ### Changed - **[BC]** The PostgreSQL drivers `pgjournal` and `pgkv` now assign each journal diff --git a/driver/aws/dynamojournal/store_test.go b/driver/aws/dynamojournal/store_test.go index 0a9fc8b..2572248 100644 --- a/driver/aws/dynamojournal/store_test.go +++ b/driver/aws/dynamojournal/store_test.go @@ -5,36 +5,57 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" . "github.com/dogmatiq/persistencekit/driver/aws/dynamojournal" "github.com/dogmatiq/persistencekit/driver/aws/internal/dynamox" "github.com/dogmatiq/persistencekit/journal" ) func TestStore(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + client, table := setup(t) + journal.RunTests( + t, + func(t *testing.T) journal.Store { + return &Store{ + Client: client, + Table: table, + } + }, + ) +} + +func BenchmarkStore(b *testing.B) { + client, table := setup(b) + journal.RunBenchmarks( + b, + func(b *testing.B) journal.Store { + return &Store{ + Client: client, + Table: table, + } + }, + ) +} +func setup(t testing.TB) (*dynamodb.Client, string) { client := dynamox.NewTestClient(t) table := "journal" + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := CreateTable(ctx, client, table); err != nil { t.Fatal(err) } t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := dynamox.DeleteTableIfNotExists(ctx, client, table); err != nil { t.Fatal(err) } - - cancel() }) - journal.RunTests( - t, - func(t *testing.T) journal.Store { - return &Store{ - Client: client, - Table: table, - } - }, - ) + return client, table } diff --git a/driver/memory/memoryjournal/store_test.go b/driver/memory/memoryjournal/store_test.go index 9cd5b02..863ac93 100644 --- a/driver/memory/memoryjournal/store_test.go +++ b/driver/memory/memoryjournal/store_test.go @@ -15,3 +15,12 @@ func TestStore(t *testing.T) { }, ) } + +func BenchmarkStore(b *testing.B) { + journal.RunBenchmarks( + b, + func(b *testing.B) journal.Store { + return &Store{} + }, + ) +} diff --git a/driver/sql/postgres/pgjournal/store_test.go b/driver/sql/postgres/pgjournal/store_test.go index 3941423..af3a293 100644 --- a/driver/sql/postgres/pgjournal/store_test.go +++ b/driver/sql/postgres/pgjournal/store_test.go @@ -2,7 +2,9 @@ package pgjournal_test import ( "context" + "database/sql" "testing" + "time" . "github.com/dogmatiq/persistencekit/driver/sql/postgres/pgjournal" "github.com/dogmatiq/persistencekit/journal" @@ -10,7 +12,33 @@ import ( ) func TestStore(t *testing.T) { - ctx := context.Background() + db := setup(t) + journal.RunTests( + t, + func(t *testing.T) journal.Store { + return &Store{ + DB: db, + } + }, + ) +} + +func BenchmarkStore(b *testing.B) { + db := setup(b) + journal.RunBenchmarks( + b, + func(b *testing.B) journal.Store { + return &Store{ + DB: db, + } + }, + ) +} + +func setup(t testing.TB) *sql.DB { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + database, err := sqltest.NewDatabase(ctx, sqltest.PGXDriver, sqltest.PostgreSQL) if err != nil { t.Fatalf("cannot create test database: %s", err) @@ -31,12 +59,5 @@ func TestStore(t *testing.T) { } }) - journal.RunTests( - t, - func(t *testing.T) journal.Store { - return &Store{ - DB: db, - } - }, - ) + return db } diff --git a/go.mod b/go.mod index ba679a8..1fa1cc8 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgtype v1.14.0 // indirect - github.com/jackc/pgx/v4 v4.18.2 // indirect + github.com/jackc/pgx/v4 v4.18.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/lib/pq v1.10.2 // indirect github.com/mattn/go-sqlite3 v1.14.17 // indirect diff --git a/go.sum b/go.sum index c89effa..2bfa1e3 100644 --- a/go.sum +++ b/go.sum @@ -131,8 +131,8 @@ github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6 github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= github.com/jackc/pgx/v4 v4.10.0/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= -github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= diff --git a/internal/benchmark/doc.go b/internal/benchmark/doc.go new file mode 100644 index 0000000..c66ff0e --- /dev/null +++ b/internal/benchmark/doc.go @@ -0,0 +1,2 @@ +// Package benchmark provides utilities for running benchmarks. +package benchmark diff --git a/internal/benchmark/run.go b/internal/benchmark/run.go new file mode 100644 index 0000000..63bf28c --- /dev/null +++ b/internal/benchmark/run.go @@ -0,0 +1,72 @@ +package benchmark + +import ( + "context" + "testing" + "time" +) + +// Run benchmarks fn. +func Run( + b *testing.B, + setup func(context.Context) error, + before func(context.Context) error, + fn func(context.Context) error, + after func(context.Context) error, +) { + b.StopTimer() + checkIterationThreshold(b) + + if setup != nil { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := setup(ctx) + if err != nil { + b.Fatal(err) + } + } + + for i := 0; i < b.N; i++ { + if before != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := before(ctx) + cancel() + if err != nil { + b.Fatal(err) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + b.StartTimer() + err := fn(ctx) + b.StopTimer() + + cancel() + + if err != nil { + b.Fatal(err) + } + + if after != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err := after(ctx) + cancel() + if err != nil { + b.Fatal(err) + } + } + } +} + +// checkIterationThreshold skips the benchmark if the number of iterations is +// too high. This usually occurs when the benchmarking framework is unable to +// measure the duration of each iteration, typically because the benchmarked +// code is "too fast". +func checkIterationThreshold(b *testing.B) { + const threshold = 1_000_000 + if b.N >= threshold { + b.Skipf("benchmark skipped, too many iterations (%d); benchmarked code is likely too fast to measure meaningfully", b.N) + } +} diff --git a/journal/benchmark.go b/journal/benchmark.go new file mode 100644 index 0000000..d0335ca --- /dev/null +++ b/journal/benchmark.go @@ -0,0 +1,319 @@ +package journal + +import ( + "context" + "fmt" + "math/rand" + "testing" + + "github.com/dogmatiq/persistencekit/internal/benchmark" +) + +// RunBenchmarks runs benchmarks against a [Store] implementation. +func RunBenchmarks( + b *testing.B, + newStore func(b *testing.B) Store, +) { + b.Run("Store", func(b *testing.B) { + b.Run("Open", func(b *testing.B) { + b.Run("existing journal", func(b *testing.B) { + var name string + + benchmarkStore( + b, + newStore, + // SETUP + func(ctx context.Context, store Store) error { + name = uniqueName() + + // pre-create the journal + ks, err := store.Open(ctx, name) + if err != nil { + return err + } + return ks.Close() + }, + // BEFORE EACH + nil, + // BENCHMARKED CODE + func(ctx context.Context, store Store) (Journal, error) { + return store.Open(ctx, name) + }, + // AFTER EACH + func(j Journal) error { + return j.Close() + }, + ) + }) + + b.Run("new journal", func(b *testing.B) { + var name string + + benchmarkStore( + b, + newStore, + // SETUP + nil, + // BEFORE EACH + func(context.Context, Store) error { + name = uniqueName() + return nil + }, + // BENCHMARKED CODE + func(ctx context.Context, store Store) (Journal, error) { + return store.Open(ctx, name) + }, + // AFTER EACH + func(j Journal) error { + return j.Close() + }, + ) + }) + }) + }) + + b.Run("Journal", func(b *testing.B) { + b.Run("Get", func(b *testing.B) { + b.Run("non-existent record", func(b *testing.B) { + var pos Position + + benchmarkJournal( + b, + newStore, + // SETUP + nil, + // BEFORE EACH + func(context.Context, Journal) error { + pos = Position( + rand.Int63n( + int64(MaxPosition), + ), + ) + return nil + }, + // BENCHMARKED CODE + func(ctx context.Context, j Journal) error { + _, err := j.Get(ctx, pos) + if err == ErrNotFound { + return nil + } + return err + }, + // AFTER EACH + nil, + ) + }) + + b.Run("existing record", func(b *testing.B) { + var pos Position + + benchmarkJournal( + b, + newStore, + // SETUP + func(ctx context.Context, _ Store, j Journal) error { + for pos := Position(0); pos < 10000; pos++ { + rec := []byte(fmt.Sprintf("", pos)) + if err := j.Append(ctx, pos, rec); err != nil { + return err + } + } + return nil + }, + // BEFORE EACH + func(ctx context.Context, j Journal) error { + pos = Position(rand.Uint64() % 10000) + return nil + }, + // BENCHMARKED CODE + func(ctx context.Context, j Journal) error { + _, err := j.Get(ctx, pos) + if err == ErrNotFound { + return nil + } + return err + }, + // AFTER EACH + nil, + ) + }) + }) + + b.Run("Append", func(b *testing.B) { + var pos Position + + benchmarkJournal( + b, + newStore, + // SETUP + nil, + // BEFORE EACH + nil, + // BENCHMARKED CODE + func(ctx context.Context, j Journal) error { + return j.Append(ctx, pos, []byte("")) + }, + // AFTER EACH + func() error { + pos++ + return nil + }, + ) + }) + + b.Run("Range (3k records)", func(b *testing.B) { + benchmarkJournal( + b, + newStore, + // SETUP + func(ctx context.Context, _ Store, j Journal) error { + rec := []byte("") + for pos := Position(0); pos < 3000; pos++ { + if err := j.Append(ctx, pos, rec); err != nil { + return err + } + } + return nil + }, + // BEFORE EACH + nil, + // BENCHMARKED CODE + func(ctx context.Context, j Journal) error { + return j.Range( + ctx, + 0, + func(context.Context, Position, []byte) (bool, error) { + return true, nil + }, + ) + }, + // AFTER EACH + nil, + ) + }) + + b.Run("Truncate (1 record)", func(b *testing.B) { + var pos Position + + benchmarkJournal( + b, + newStore, + // SETUP + func(ctx context.Context, store Store, j Journal) error { + rec := []byte("") + for pos := 0; pos < b.N; pos++ { + if err := j.Append(ctx, Position(pos), rec); err != nil { + return err + } + } + return nil + }, + // BEFORE EACH + nil, + // BENCHMARKED CODE + func(ctx context.Context, j Journal) error { + pos++ + return j.Truncate(ctx, pos) + }, + // AFTER EACH + nil, + ) + }) + }) +} + +func benchmarkStore[T any]( + b *testing.B, + newStore func(b *testing.B) Store, + setup func(context.Context, Store) error, + before func(context.Context, Store) error, + fn func(context.Context, Store) (T, error), + after func(T) error, +) { + var ( + store Store + result T + ) + + benchmark.Run( + b, + func(ctx context.Context) error { + store = newStore(b) + + if setup != nil { + return setup(ctx, store) + } + + return nil + }, + func(ctx context.Context) error { + if before != nil { + return before(ctx, store) + } + return nil + }, + func(ctx context.Context) error { + var err error + result, err = fn(ctx, store) + return err + }, + func(ctx context.Context) error { + if after != nil { + return after(result) + } + return nil + }, + ) +} + +func benchmarkJournal( + b *testing.B, + newStore func(b *testing.B) Store, + setup func(context.Context, Store, Journal) error, + before func(context.Context, Journal) error, + fn func(context.Context, Journal) error, + after func() error, +) { + var ( + store Store + journ Journal + ) + + benchmark.Run( + b, + func(ctx context.Context) error { + store = newStore(b) + + var err error + journ, err = store.Open(ctx, uniqueName()) + if err != nil { + return err + } + + b.Cleanup(func() { + journ.Close() + }) + + if setup != nil { + return setup(ctx, store, journ) + } + + return nil + }, + func(ctx context.Context) error { + if before != nil { + return before(ctx, journ) + } + return nil + }, + func(ctx context.Context) error { + return fn(ctx, journ) + }, + func(ctx context.Context) error { + if after != nil { + return after() + } + return nil + }, + ) +} diff --git a/journal/journal.go b/journal/journal.go index b420b9f..c547f00 100644 --- a/journal/journal.go +++ b/journal/journal.go @@ -3,12 +3,19 @@ package journal import ( "context" "errors" + "math" ) // Position is the index of a record within a [Journal]. The first record is always // at position 0. type Position uint64 +// MaxPosition the largest value that can be represented by a [Position]. +// +// Even though [Position] is defined as a 64-bit unsigned integer, some +// implementations use signed 64-bit integers internally. +const MaxPosition Position = math.MaxInt64 + // A RangeFunc is a function used to range over the records in a [Journal]. // // If err is non-nil, ranging stops and err is propagated up the stack. diff --git a/journal/test.go b/journal/test.go index 9fe9b86..6574d08 100644 --- a/journal/test.go +++ b/journal/test.go @@ -32,7 +32,7 @@ func RunTests( deps := &dependencies{ Store: newStore(t), - JournalName: fmt.Sprintf("", journalCounter.Add(1)), + JournalName: uniqueName(), } j, err := deps.Store.Open(ctx, deps.JournalName) @@ -493,7 +493,11 @@ func RunTests( }) } -var journalCounter atomic.Uint64 +var nameCounter atomic.Uint64 + +func uniqueName() string { + return fmt.Sprintf("", nameCounter.Add(1)) +} // appendRecords appends records to j. func appendRecords( diff --git a/kv/benchmark.go b/kv/benchmark.go index ea440ef..e225856 100644 --- a/kv/benchmark.go +++ b/kv/benchmark.go @@ -6,10 +6,9 @@ import ( "fmt" "io" "testing" - "time" -) -const iterationThreshold = 1_000_000 + "github.com/dogmatiq/persistencekit/internal/benchmark" +) // RunBenchmarks runs benchmarks against a [Store] implementation. func RunBenchmarks( @@ -26,7 +25,7 @@ func RunBenchmarks( newStore, // SETUP func(ctx context.Context, store Store) error { - name = uniqueKeyspaceName() + name = uniqueName() // pre-create the keyspace ks, err := store.Open(ctx, name) @@ -58,7 +57,7 @@ func RunBenchmarks( nil, // BEFORE EACH func(context.Context, Store) error { - name = uniqueKeyspaceName() + name = uniqueName() return nil }, // BENCHMARKED CODE @@ -112,7 +111,7 @@ func RunBenchmarks( if _, err := io.ReadFull(rand.Reader, key[:]); err != nil { return err } - return ks.Set(ctx, key[:], []byte("value")) + return ks.Set(ctx, key[:], []byte("")) }, // BENCHMARKED CODE func(ctx context.Context, ks Keyspace) error { @@ -162,7 +161,7 @@ func RunBenchmarks( if _, err := io.ReadFull(rand.Reader, key[:]); err != nil { return err } - return ks.Set(ctx, key[:], []byte("value")) + return ks.Set(ctx, key[:], []byte("")) }, // BENCHMARKED CODE func(ctx context.Context, ks Keyspace) error { @@ -191,7 +190,7 @@ func RunBenchmarks( }, // BENCHMARKED CODE func(ctx context.Context, ks Keyspace) error { - return ks.Set(ctx, key[:], []byte("value")) + return ks.Set(ctx, key[:], []byte("")) }, // AFTER EACH nil, @@ -211,11 +210,11 @@ func RunBenchmarks( if _, err := io.ReadFull(rand.Reader, key[:]); err != nil { return err } - return ks.Set(ctx, key[:], []byte("value-1")) + return ks.Set(ctx, key[:], []byte("")) }, // BENCHMARKED CODE func(ctx context.Context, ks Keyspace) error { - return ks.Set(ctx, key[:], []byte("value-2")) + return ks.Set(ctx, key[:], []byte("")) }, // AFTER EACH nil, @@ -235,7 +234,7 @@ func RunBenchmarks( if _, err := io.ReadFull(rand.Reader, key[:]); err != nil { return err } - return ks.Set(ctx, key[:], []byte("value")) + return ks.Set(ctx, key[:], []byte("")) }, // BENCHMARKED CODE func(ctx context.Context, ks Keyspace) error { @@ -247,16 +246,15 @@ func RunBenchmarks( }) }) - b.Run("Range (10k pairs)", func(b *testing.B) { + b.Run("Range (3k pairs)", func(b *testing.B) { benchmarkKeyspace( b, newStore, // SETUP func(ctx context.Context, _ Store, ks Keyspace) error { - for i := 0; i < 10000; i++ { - k := []byte(fmt.Sprintf("key-%d", i)) - v := []byte(fmt.Sprintf("value-%d", i)) - + for i := 0; i < 3000; i++ { + k := []byte(fmt.Sprintf("", i)) + v := []byte("") if err := ks.Set(ctx, k, v); err != nil { return err } @@ -269,7 +267,7 @@ func RunBenchmarks( func(ctx context.Context, ks Keyspace) error { return ks.Range( ctx, - func(ctx context.Context, k, v []byte) (bool, error) { + func(context.Context, []byte, []byte) (bool, error) { return true, nil }, ) @@ -285,114 +283,94 @@ func benchmarkStore[T any]( b *testing.B, newStore func(b *testing.B) Store, setup func(context.Context, Store) error, - beforeEach func(context.Context, Store) error, - run func(context.Context, Store) (T, error), - afterEach func(T) error, + before func(context.Context, Store) error, + fn func(context.Context, Store) (T, error), + after func(T) error, ) { - b.StopTimer() - - if b.N >= iterationThreshold { - b.Skipf("too many iterations (%d); benchmarked code is likely too fast to measure meaningfully", b.N) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - store := newStore(b) - - if setup != nil { - err := setup(ctx, store) - if err != nil { - b.Fatal(err) - } - } - - for i := 0; i < b.N; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - - if beforeEach != nil { - if err := beforeEach(ctx, store); err != nil { - cancel() - b.Fatal(err) + var ( + store Store + result T + ) + + benchmark.Run( + b, + func(ctx context.Context) error { + store = newStore(b) + + if setup != nil { + return setup(ctx, store) } - } - b.StartTimer() - result, err := run(ctx, store) - b.StopTimer() - - cancel() - - if err != nil { - b.Fatal(err) - } - - if afterEach != nil { - if err := afterEach(result); err != nil { - b.Fatal(err) + return nil + }, + func(ctx context.Context) error { + if before != nil { + return before(ctx, store) + } + return nil + }, + func(ctx context.Context) error { + var err error + result, err = fn(ctx, store) + return err + }, + func(ctx context.Context) error { + if after != nil { + return after(result) } - } - } + return nil + }, + ) } func benchmarkKeyspace( b *testing.B, newStore func(b *testing.B) Store, setup func(context.Context, Store, Keyspace) error, - beforeEach func(context.Context, Keyspace) error, - run func(context.Context, Keyspace) error, - afterEach func() error, + before func(context.Context, Keyspace) error, + fn func(context.Context, Keyspace) error, + after func() error, ) { - b.StopTimer() - - if b.N >= iterationThreshold { - b.Skipf("too many iterations (%d); benchmarked code is likely too fast to measure meaningfully", b.N) - } - - store := newStore(b) - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - ks, err := store.Open(ctx, uniqueKeyspaceName()) - if err != nil { - b.Fatal(err) - } - - if setup != nil { - err := setup(ctx, store, ks) - if err != nil { - b.Fatal(err) - } - } - - for i := 0; i < b.N; i++ { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if err != nil { - cancel() - } - - if beforeEach != nil { - if err := beforeEach(ctx, ks); err != nil { - cancel() - b.Fatal(err) + var ( + store Store + keyspace Keyspace + ) + + benchmark.Run( + b, + func(ctx context.Context) error { + store = newStore(b) + + var err error + keyspace, err = store.Open(ctx, uniqueName()) + if err != nil { + return err } - } - b.StartTimer() - err := run(ctx, ks) - b.StopTimer() - - cancel() + b.Cleanup(func() { + keyspace.Close() + }) - if err != nil { - b.Fatal(err) - } + if setup != nil { + return setup(ctx, store, keyspace) + } - if afterEach != nil { - if err := afterEach(); err != nil { - b.Fatal(err) + return nil + }, + func(ctx context.Context) error { + if before != nil { + return before(ctx, keyspace) + } + return nil + }, + func(ctx context.Context) error { + return fn(ctx, keyspace) + }, + func(ctx context.Context) error { + if after != nil { + return after() } - } - } + return nil + }, + ) } diff --git a/kv/test.go b/kv/test.go index 964c285..79d7f09 100644 --- a/kv/test.go +++ b/kv/test.go @@ -533,10 +533,10 @@ func RunTests( }) } -var keyspaceID atomic.Uint64 +var nameCounter atomic.Uint64 -func uniqueKeyspaceName() string { - return fmt.Sprintf("", keyspaceID.Add(1)) +func uniqueName() string { + return fmt.Sprintf("", nameCounter.Add(1)) } func setup( @@ -548,7 +548,7 @@ func setup( store := newStore(t) - ks, err := store.Open(ctx, uniqueKeyspaceName()) + ks, err := store.Open(ctx, uniqueName()) if err != nil { t.Fatal(err) }