From fde64ccd0e8a4179181dff3bb966d2280a6271d7 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Mon, 24 Jul 2023 16:49:18 -0400 Subject: [PATCH 1/2] engineccl: add randomized error injector test for encryptedFS The encryptedFS can return an error after doing part of the work, as modifying the encryption metadata and the underlying FS is not atomic. This makes some operations (rename, link, remove) non-idempotent, which is harmless for the CockroachDB use cases (since they don't retry on the same files). The test works around these by retrying in a way that makes them idempotent. Additionally, the test catches panics caused by FS errors, in order to test a node that crashes because of a panic caused by a transient error, and is subsequently restarted. Epic: none Informs: #96670 Release note: None --- pkg/ccl/storageccl/engineccl/BUILD.bazel | 3 + .../storageccl/engineccl/encrypted_fs_test.go | 484 ++++++++++++++++++ .../engineccl/pebble_key_manager_test.go | 4 +- pkg/storage/pebble.go | 4 +- 4 files changed, 492 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/storageccl/engineccl/BUILD.bazel b/pkg/ccl/storageccl/engineccl/BUILD.bazel index c3c7c8085516..5b52b592df52 100644 --- a/pkg/ccl/storageccl/engineccl/BUILD.bazel +++ b/pkg/ccl/storageccl/engineccl/BUILD.bazel @@ -61,9 +61,12 @@ go_test( "//pkg/util/randutil", "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_pebble//vfs/atomicfs", + "@com_github_cockroachdb_pebble//vfs/errorfs", "@com_github_gogo_protobuf//proto", "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go index 1d3abff8238a..303b5799a66e 100644 --- a/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go +++ b/pkg/ccl/storageccl/engineccl/encrypted_fs_test.go @@ -11,8 +11,10 @@ package engineccl import ( "bytes" "context" + "flag" "fmt" "io" + "math/rand" "os" "strconv" "strings" @@ -29,8 +31,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/vfs" + "github.com/cockroachdb/pebble/vfs/errorfs" "github.com/stretchr/testify/require" ) @@ -436,3 +442,481 @@ func TestCanRegistryElide(t *testing.T) { entry.EncryptionSettings = b require.False(t, canRegistryElide(entry)) } + +// errorInjector injects errors into metadata writes involving the +// encryptedFS, i.e., writes by the file registry and key manager. Data files +// in the test below have names prefixed by TEST and are spared this error +// injection, since the goal of the test is to discover bugs in the metadata +// processing. +type errorInjector struct { + prob float64 + rand *rand.Rand + + // The test is single threaded, and there is no async IO in the lower + // layers, so this field does not need synchronization. + startInjecting bool +} + +func (i *errorInjector) MaybeError(op errorfs.Op, path string) error { + if i.startInjecting && op.OpKind() == errorfs.OpKindWrite && + !strings.HasPrefix(path, "TEST") && i.rand.Float64() < i.prob { + return errors.WithStack(errorfs.ErrInjected) + } + return nil +} + +func (i *errorInjector) startErrors() { + i.startInjecting = true +} + +// testFS is the interface implemented by a plain FS and encrypted FS being +// tested. +type testFS interface { + fs() vfs.FS + // restart is used to "reset" to the synced state. It is used for 2 reasons: + // - explicit testing of node restarts. + // - encryptedFS has unrecoverable errors that cause panics. We do want to + // test situations where the node is restarted after the panic. + restart() error + // syncDir is a convenience function to sync the dir being used by the test. + syncDir(t *testing.T) +} + +// plainTestFS is not encrypted and does not require syncs for state to be +// preserved. It defined the expected output of the randomized test. +type plainTestFS struct { + vfs vfs.FS +} + +func (ptfs *plainTestFS) fs() vfs.FS { + return ptfs.vfs +} + +func (ptfs *plainTestFS) restart() error { return nil } + +func (ptfs *plainTestFS) syncDir(t *testing.T) {} + +// encryptedTestFS is the encrypted FS being tested. +type encryptedTestFS struct { + // The base strict FS which is wrapped for error injection and encryption. + mem *vfs.MemFS + encOptionsBytes []byte + errorProb float64 + errorRand *rand.Rand + + encEnv *storage.EncryptionEnv +} + +func (etfs *encryptedTestFS) fs() vfs.FS { + return etfs.encEnv.FS +} + +func (etfs *encryptedTestFS) syncDir(t *testing.T) { + dir, err := etfs.mem.OpenDir("/") + // These operations are on the base FS, so there should be no errors. + require.NoError(t, err) + require.NoError(t, dir.Sync()) + require.NoError(t, dir.Close()) +} + +func (etfs *encryptedTestFS) restart() error { + if etfs.encEnv != nil { + etfs.encEnv.Closer.Close() + etfs.encEnv = nil + } + etfs.mem.ResetToSyncedState() + ei := &errorInjector{prob: etfs.errorProb, rand: etfs.errorRand} + fsMeta := errorfs.Wrap(etfs.mem, ei) + // TODO(sumeer): Do deterministic rollover of file registry after small + // number of operations. + fileRegistry := &storage.PebbleFileRegistry{ + FS: fsMeta, DBDir: "", ReadOnly: false, NumOldRegistryFiles: 2} + if err := fileRegistry.Load(context.Background()); err != nil { + return err + } + encEnv, err := newEncryptedEnv( + fsMeta, fileRegistry, "", false, etfs.encOptionsBytes) + if err != nil { + return err + } + etfs.encEnv = encEnv + // Error injection starts after initialization, to simplify the test (the + // caller does not need to handle errors and call restart again). + ei.startErrors() + return nil +} + +func makeEncryptedTestFS(t *testing.T, errorProb float64, errorRand *rand.Rand) *encryptedTestFS { + mem := vfs.NewStrictMem() + keyFile128 := "111111111111111111111111111111111234567890123456" + writeToFile(t, mem, "16.key", []byte(keyFile128)) + dir, err := mem.OpenDir("/") + require.NoError(t, err) + require.NoError(t, dir.Sync()) + require.NoError(t, dir.Close()) + + var encOptions baseccl.EncryptionOptions + encOptions.KeySource = baseccl.EncryptionKeySource_KeyFiles + encOptions.KeyFiles = &baseccl.EncryptionKeyFiles{ + CurrentKey: "16.key", + OldKey: "plain", + } + // Effectively infinite period. + // + // TODO(sumeer): Do deterministic data key rotation. Inject kmTimeNow and + // operations that advance time. + encOptions.DataKeyRotationPeriod = 100000 + encOptionsBytes, err := protoutil.Marshal(&encOptions) + require.NoError(t, err) + etfs := &encryptedTestFS{ + mem: mem, + encOptionsBytes: encOptionsBytes, + errorProb: errorProb, + errorRand: errorRand, + encEnv: nil, + } + require.NoError(t, etfs.restart()) + return etfs +} + +// fsTest is used by the various operations. +type fsTest struct { + t *testing.T + fs testFS + + output strings.Builder +} + +func (t *fsTest) outputOkOrError(err error) { + if err != nil { + fmt.Fprintf(&t.output, " %s\n", err.Error()) + } else { + fmt.Fprintf(&t.output, " ok\n") + } +} + +// The operations in the test. +type fsTestOp interface { + run(t *fsTest) +} + +// createOp creates a file with name and the given value. +type createOp struct { + name string + value []byte +} + +func (op *createOp) run(t *fsTest) { + var err error + buf := make([]byte, len(op.value)) + // Create is idempotent, so we simply retry on injected errors. + withRetry(t, func() error { + var f vfs.File + f, err := t.fs.fs().Create(op.name) + if err != nil { + return err + } + // copy the value since Write can modify the value. + copy(buf, op.value) + _, err = f.Write(buf) + if err != nil { + f.Close() + return err + } + if err = f.Sync(); err != nil { + f.Close() + return err + } + return f.Close() + }) + t.fs.syncDir(t.t) + fmt.Fprintf(&t.output, "createOp(%s, %s):", op.name, string(op.value)) + t.outputOkOrError(err) +} + +// removeOp removed the file with name. +type removeOp struct { + name string +} + +func (op *removeOp) run(t *fsTest) { + var err error + observedError := false + // Remove can fail if the file got removed from the underlying FS and we did + // not remove from the registry. It can also fail even when it was removed + // from the registry, if the registry rollover fails. So we swallow + // ErrNotExist, which will happen if the file has been removed from the + // underlying FS. And additionally call restart to get the registry into a + // clean state (it elides files that are not found in the underlying FS when + // loading the registry) -- this is just defensive code since we want to + // avoid relying too much on the internal implementation details of + // encryptedFS. + withRetry(t, func() error { + err = t.fs.fs().Remove(op.name) + if err != nil { + observedError = true + if oserror.IsNotExist(err) { + // Removal is considered done. + err = nil + } + } + return err + }) + t.fs.syncDir(t.t) + fmt.Fprintf(&t.output, "removeOp(%s):", op.name) + t.outputOkOrError(err) + if observedError { + require.NoError(t.t, t.fs.restart()) + } +} + +// linkOp exercises vfs.FS.Link. +type linkOp struct { + fromName string + toName string +} + +func (op *linkOp) run(t *fsTest) { + var err error + // Link is not idempotent, in that the underlying FS linking happens first, + // and if the subsequent changes to the registry fail, the underlying FS + // will not permit linking again (since from file already exists). But we do + // want to test failure handling of encryptedFS, so we massage this to make + // it idempotent. + _, err = t.fs.fs().Stat(op.toName) + // Do something only if the to file does not exist. + if oserror.IsNotExist(err) { + withRetry(t, func() error { + err = t.fs.fs().Link(op.fromName, op.toName) + if oserror.IsExist(err) { + // Partially done and failed, so remove the to file and try again. + err = errorfs.ErrInjected + _ = t.fs.fs().Remove(op.toName) + } + return err + }) + } + t.fs.syncDir(t.t) + fmt.Fprintf(&t.output, "linkOp(%s, %s):", op.fromName, op.toName) + t.outputOkOrError(err) +} + +// renameOp exercises vfs.FS.Rename +type renameOp struct { + fromName string + toName string +} + +func (op *renameOp) run(t *fsTest) { + var err error + observedError := false + // Only do the rename if from file exists: encryptedFS will delete the + // toName entry from the registry if the fromName file is not in the + // registry. And the underlying FS.Rename will return an error since the + // from file does not exist, so the semantics of encrypted FS and plain FS + // are not compatible for this case. + _, err = t.fs.fs().Stat(op.fromName) + if err == nil { + withRetry(t, func() error { + err = t.fs.fs().Rename(op.fromName, op.toName) + if err != nil { + observedError = true + _, err2 := t.fs.fs().Stat(op.fromName) + if oserror.IsNotExist(err2) { + // Swallow the error since rename must have happened. + err = nil + } + } + return err + }) + } + t.fs.syncDir(t.t) + fmt.Fprintf(&t.output, "renameOp(%s, %s):", op.fromName, op.toName) + t.outputOkOrError(err) + if observedError { + require.NoError(t.t, t.fs.restart()) + } +} + +type restartOp struct { +} + +func (op *restartOp) run(t *fsTest) { + err := t.fs.restart() + fmt.Fprintf(&t.output, "restartOp:") + t.outputOkOrError(err) +} + +type readOp struct { + name string +} + +func (op *readOp) run(t *fsTest) { + var err error + var value []byte + withRetry(t, func() error { + var f vfs.File + f, err = t.fs.fs().Open(op.name) + if err != nil { + return err + } + defer f.Close() + value, err = io.ReadAll(f) + return err + }) + fmt.Fprintf(&t.output, "readOp(%s):", op.name) + if err != nil { + fmt.Fprintf(&t.output, " %s\n", err.Error()) + } else { + fmt.Fprintf(&t.output, " %s\n", string(value)) + } +} + +func fillRand(rng *rand.Rand, buf []byte) { + const letters = "abcdefghijklmnopqrstuvwxyz" + const lettersLen = uint64(len(letters)) + const lettersCharsPerRand = 12 // floor(log(math.MaxUint64)/log(lettersLen)) + + var r uint64 + var q int + for i := 0; i < len(buf); i++ { + if q == 0 { + r = rng.Uint64() + q = lettersCharsPerRand + } + buf[i] = letters[r%lettersLen] + r = r / lettersLen + q-- + } +} + +func withRetry(t *fsTest, fn func() error) { + retryFunc := func() (err error) { + defer func() { + // Recover from a panic by restarting and retrying. + if r := recover(); r != nil { + err = errorfs.ErrInjected + require.NoError(t.t, t.fs.restart()) + } + }() + err = fn() + return + } + i := 0 + for { + err := retryFunc() + if !errors.Is(err, errorfs.ErrInjected) { + break + } + i++ + } +} + +func makeOps(rng *rand.Rand, numOps int) []fsTestOp { + var filenames []string + newFilename := func() string { + var fileNameBytes [8]byte + fillRand(rng, fileNameBytes[:]) + name := "TEST" + string(fileNameBytes[:]) + filenames = append(filenames, name) + return name + } + pickFileName := func(newProb float64) string { + if len(filenames) == 0 || rng.Float64() < newProb { + return newFilename() + } + return filenames[rng.Intn(len(filenames))] + } + ops := make([]fsTestOp, 0, numOps) + for len(ops) < numOps { + v := rng.Float64() + if v < 0.5 { + // Read + if len(filenames) == 0 { + continue + } + name := filenames[rng.Intn(len(filenames))] + ops = append(ops, &readOp{name: name}) + } else if v < 0.75 { + // Create + name := newFilename() + var buf [10]byte + fillRand(rng, buf[:]) + ops = append(ops, &createOp{ + name: name, + value: buf[:], + }) + } else if v < 0.80 { + // Link + fromName := pickFileName(0.01) + var toName string + for { + toName = pickFileName(0.9) + if fromName != toName { + break + } + } + ops = append(ops, &linkOp{ + fromName: fromName, + toName: toName, + }) + } else if v < 0.85 { + // Rename + fromName := pickFileName(0.01) + var toName string + for { + toName = pickFileName(0.9) + if fromName != toName { + break + } + } + ops = append(ops, &renameOp{ + fromName: fromName, + toName: toName, + }) + } else if v < 0.9 { + // Remove + name := pickFileName(0.9) + ops = append(ops, &removeOp{name: name}) + } else { + // Restart + ops = append(ops, &restartOp{}) + } + } + return ops +} + +var seed = flag.Uint64("seed", 0, "a pseudorandom number generator seed") + +func TestEncryptedFSRandomizedWithErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + + if *seed == 0 { + *seed = uint64(timeutil.Now().UnixNano()) + } + t.Logf("seed %d", *seed) + rng := rand.New(rand.NewSource(int64(*seed))) + + errorProb := 0.0 + if rng.Float64() < 0.9 { + errorProb = rng.Float64() / 8 + } + ptfs := &plainTestFS{vfs: vfs.NewMem()} + etfs := makeEncryptedTestFS(t, errorProb, rng) + pTest := fsTest{t: t, fs: ptfs} + eTest := fsTest{t: t, fs: etfs} + ops := makeOps(rng, 500) + for i := range ops { + fmt.Fprintf(&pTest.output, "%d: ", i) + ops[i].run(&pTest) + fmt.Fprintf(&eTest.output, "%d: ", i) + ops[i].run(&eTest) + } + expectedStr := pTest.output.String() + actualStr := eTest.output.String() + if true || expectedStr != actualStr { + t.Logf("---- expected\n%s\n", expectedStr) + t.Logf("---- actual\n%s\n", actualStr) + } + require.Equal(t, pTest.output.String(), eTest.output.String()) +} diff --git a/pkg/ccl/storageccl/engineccl/pebble_key_manager_test.go b/pkg/ccl/storageccl/engineccl/pebble_key_manager_test.go index cbf6247e9d55..fe38ca7c092e 100644 --- a/pkg/ccl/storageccl/engineccl/pebble_key_manager_test.go +++ b/pkg/ccl/storageccl/engineccl/pebble_key_manager_test.go @@ -38,8 +38,8 @@ func writeToFile(t *testing.T, fs vfs.FS, filename string, b []byte) { breader := bytes.NewReader(b) _, err = io.Copy(f, breader) require.NoError(t, err) - err = f.Close() - require.NoError(t, err) + require.NoError(t, f.Sync()) + require.NoError(t, f.Close()) } const ( diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index ae3a8ad449ef..8eb213fcc707 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -845,7 +845,9 @@ var WorkloadCollectorEnabled = envutil.EnvOrDefaultBool("COCKROACH_STORAGE_WORKL // and writing data. This should be initialized by calling engineccl.Init() before calling // NewPebble(). The optionBytes is a binary serialized baseccl.EncryptionOptions, so that non-CCL // code does not depend on CCL code. -var NewEncryptedEnvFunc func(fs vfs.FS, fr *PebbleFileRegistry, dbDir string, readOnly bool, optionBytes []byte) (*EncryptionEnv, error) +var NewEncryptedEnvFunc func( + fs vfs.FS, fr *PebbleFileRegistry, dbDir string, readOnly bool, optionBytes []byte, +) (*EncryptionEnv, error) // SetCompactionConcurrency will return the previous compaction concurrency. func (p *Pebble) SetCompactionConcurrency(n uint64) uint64 { From 9061bd6671c26c90714552e4c552c57714ab61e1 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 1 Aug 2023 00:22:55 -0400 Subject: [PATCH 2/2] roachtest: ignore some ORM tests Release note: None --- pkg/cmd/roachtest/tests/hibernate_blocklist.go | 2 ++ pkg/cmd/roachtest/tests/npgsql_blocklist.go | 1 + pkg/cmd/roachtest/tests/pgjdbc_blocklist.go | 1 + 3 files changed, 4 insertions(+) diff --git a/pkg/cmd/roachtest/tests/hibernate_blocklist.go b/pkg/cmd/roachtest/tests/hibernate_blocklist.go index 187ff7aef16f..8b6cfda2d9c5 100644 --- a/pkg/cmd/roachtest/tests/hibernate_blocklist.go +++ b/pkg/cmd/roachtest/tests/hibernate_blocklist.go @@ -37,9 +37,11 @@ var hibernateBlockList = blocklist{ var hibernateSpatialIgnoreList = blocklist{ "org.hibernate.serialization.SessionFactorySerializationTest.testUnNamedSessionFactorySerialization": "flaky", "org.hibernate.serialization.SessionFactorySerializationTest.testNamedSessionFactorySerialization": "flaky", + "org.hibernate.test.batch.BatchTest.testBatchInsertUpdate": "flaky", } var hibernateIgnoreList = blocklist{ "org.hibernate.serialization.SessionFactorySerializationTest.testUnNamedSessionFactorySerialization": "flaky", "org.hibernate.serialization.SessionFactorySerializationTest.testNamedSessionFactorySerialization": "flaky", + "org.hibernate.test.batch.BatchTest.testBatchInsertUpdate": "flaky", } diff --git a/pkg/cmd/roachtest/tests/npgsql_blocklist.go b/pkg/cmd/roachtest/tests/npgsql_blocklist.go index 9576440c88fe..3f0fa6b36569 100644 --- a/pkg/cmd/roachtest/tests/npgsql_blocklist.go +++ b/pkg/cmd/roachtest/tests/npgsql_blocklist.go @@ -786,6 +786,7 @@ var npgsqlIgnoreList = blocklist{ `Npgsql.Tests.CopyTests(NonMultiplexing).Wrong_format_binary_import`: "flaky", `Npgsql.Tests.CopyTests(NonMultiplexing).Wrong_format_raw_binary_copy`: "flaky", `Npgsql.Tests.NotificationTests.WaitAsync_with_timeout`: "flaky", + `Npgsql.Tests.ReaderTests(Multiplexing,Default).Cleans_up_ok_with_dispose_calls(NotPrepared)`: "flaky", `Npgsql.Tests.TransactionTests(Multiplexing).Failed_transaction_on_close_with_custom_timeout`: "flaky", `Npgsql.Tests.TransactionTests(NonMultiplexing).CommitAsync(Prepared)`: "flaky", } diff --git a/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go b/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go index a74851a9bacc..24b06f02e563 100644 --- a/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go +++ b/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go @@ -923,6 +923,7 @@ var pgjdbcIgnoreList = blocklist{ "org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testReWriteDisabledForPlainBatch[2: autoCommit=NO, binary=REGULAR]": "54477", "org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testReWriteDisabledForPlainBatch[3: autoCommit=NO, binary=FORCE]": "54477", "org.postgresql.test.jdbc2.CursorFetchTest.testBasicFetch[binary = FORCE]": "flaky", + "org.postgresql.test.jdbc2.CursorFetchTest.testBasicFetch[binary = REGULAR]": "flaky", "org.postgresql.test.jdbc2.DatabaseEncodingTest.testBadUTF8Decode": "54477", "org.postgresql.test.jdbc2.DatabaseEncodingTest.testTruncatedUTF8Decode": "54477", "org.postgresql.test.jdbc2.DatabaseEncodingTest.testUTF8Decode": "54477",