diff --git a/DEPS.bzl b/DEPS.bzl index 651df861b1d4..9312a31d3643 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1555,10 +1555,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "af71f92b799b6618a951843c7c5589cef2e095549a9f6af03153b569219219c8", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20230301151825-0cd1ceea645a", + sha256 = "8ea9b6931898da952aae5e5c84f551c6e6d2fadd72fead740d2794672a345e04", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20230306180248-e9a8c4ad65c5", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230301151825-0cd1ceea645a.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230306180248-e9a8c4ad65c5.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index f2c761586689..84ff558eb5ca 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -202,7 +202,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/google-api-go-client/com_github_cockroachdb_google_api_go_client-v0.80.1-0.20221117193156-6a9f7150cb93.zip": "b3378c579f4f4340403038305907d672c86f615f8233118a8873ebe4229c4f39", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230301151825-0cd1ceea645a.zip": "af71f92b799b6618a951843c7c5589cef2e095549a9f6af03153b569219219c8", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20230306180248-e9a8c4ad65c5.zip": "8ea9b6931898da952aae5e5c84f551c6e6d2fadd72fead740d2794672a345e04", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.3.zip": "7778b1e4485e4f17f35e5e592d87eb99c29e173ac9507801d000ad76dd0c261e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/sentry-go/com_github_cockroachdb_sentry_go-v0.6.1-cockroachdb.2.zip": "fbb2207d02aecfdd411b1357efe1192dbb827959e36b7cab7491731ac55935c9", diff --git a/go.mod b/go.mod index 6f2ce797b53a..a9b6ce3c34f3 100644 --- a/go.mod +++ b/go.mod @@ -114,7 +114,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20230301151825-0cd1ceea645a + github.com/cockroachdb/pebble v0.0.0-20230306180248-e9a8c4ad65c5 github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index 8fd36c62565b..254885043644 100644 --- a/go.sum +++ b/go.sum @@ -491,8 +491,8 @@ github.com/cockroachdb/gostdlib v1.19.0/go.mod h1:+dqqpARXbE/gRDEhCak6dm0l14AaTy github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v0.0.0-20230301151825-0cd1ceea645a h1:RUozu4Qb0PCexALTQJ7zr5MbSPEBOcoszQHlOqMOwtU= -github.com/cockroachdb/pebble v0.0.0-20230301151825-0cd1ceea645a/go.mod h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50= +github.com/cockroachdb/pebble v0.0.0-20230306180248-e9a8c4ad65c5 h1:rGj0g8lJ4RKw2sPqWQCcDbivT4XxXzTsEL96nSOtiBg= +github.com/cockroachdb/pebble v0.0.0-20230306180248-e9a8c4ad65c5/go.mod h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd h1:KFOt5I9nEKZgCnOSmy8r4Oykh8BYQO8bFOTgHDS8YZA= diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index 1083092b23fe..abc680b66b52 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -9,6 +9,7 @@ package backupccl import ( + "bytes" "context" "fmt" math "math" @@ -190,8 +191,8 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { writeSST := func(t *testing.T, offsets []int) string { path := strconv.FormatInt(timeutil.Now().UnixNano(), 10) - sstFile := &storage.MemFile{} - sst := storage.MakeBackupSSTWriter(ctx, cs, sstFile) + var sstFile bytes.Buffer + sst := storage.MakeBackupSSTWriter(ctx, cs, &sstFile) defer sst.Close() ts := hlc.NewClockForTesting(nil).Now() value := roachpb.MakeValueFromString("bar") @@ -206,7 +207,7 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { if err := sst.Finish(); err != nil { t.Fatalf("%+v", err) } - if err := os.WriteFile(filepath.Join(dir, "foo", path), sstFile.Data(), 0644); err != nil { + if err := os.WriteFile(filepath.Join(dir, "foo", path), sstFile.Bytes(), 0644); err != nil { t.Fatalf("%+v", err) } return path diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index c75c55ea74a9..3765fb1b784e 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -90,7 +90,7 @@ type rangeKeyBatcher struct { // adding MVCCRangeKeyValue rangeKeySSTWriterMaker func() *storage.SSTWriter // In-memory SST file for flushing MVCC range keys - rangeKeySSTFile *storage.MemFile + rangeKeySSTFile *storage.MemObject // curRangeKVBatch is the current batch of range KVs which will // be ingested through 'flush' later. curRangeKVBatch mvccRangeKeyValues @@ -113,7 +113,7 @@ func newRangeKeyBatcher( db: db, minTimestamp: hlc.MaxTimestamp, batchSummary: kvpb.BulkOpSummary{}, - rangeKeySSTFile: &storage.MemFile{}, + rangeKeySSTFile: &storage.MemObject{}, onFlush: onFlush, } batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter { diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 5dd6afe6490f..edba7c056bb6 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -168,7 +168,7 @@ type SSTBatcher struct { // The rest of the fields are per-batch and are reset via Reset() before each // batch is started. sstWriter storage.SSTWriter - sstFile *storage.MemFile + sstFile *storage.MemObject batchStartKey []byte batchEndKey []byte batchEndValue []byte @@ -365,7 +365,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value // Reset clears all state in the batcher and prepares it for reuse. func (b *SSTBatcher) Reset(ctx context.Context) error { b.sstWriter.Close() - b.sstFile = &storage.MemFile{} + b.sstFile = &storage.MemObject{} // Create sstables intended for ingestion using the newest format that all // nodes can support. MakeIngestionSSTWriter will handle cluster version // gating using b.settings. @@ -912,7 +912,7 @@ func createSplitSSTable( iter storage.SimpleMVCCIterator, settings *cluster.Settings, ) (*sstSpan, *sstSpan, error) { - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} w := storage.MakeIngestionSSTWriter(ctx, settings, sstFile) defer w.Close() @@ -937,7 +937,7 @@ func createSplitSSTable( } left = &sstSpan{start: first, end: last.Next(), sstBytes: sstFile.Data()} - *sstFile = storage.MemFile{} + *sstFile = storage.MemObject{} w = storage.MakeIngestionSSTWriter(ctx, settings, sstFile) split = true first = nil diff --git a/pkg/kv/kvnemesis/applier_test.go b/pkg/kv/kvnemesis/applier_test.go index 876efad78d5d..225c4247e823 100644 --- a/pkg/kv/kvnemesis/applier_test.go +++ b/pkg/kv/kvnemesis/applier_test.go @@ -53,7 +53,7 @@ func TestApplier(t *testing.T) { sstValueHeader.KVNemesisSeq.Set(1) sstSpan := roachpb.Span{Key: roachpb.Key(k1), EndKey: roachpb.Key(k4)} sstTS := hlc.Timestamp{WallTime: 1} - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} { st := cluster.MakeTestingClusterSettings() storage.ValueBlocksEnabled.Override(ctx, &st.SV, true) diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index e8a126aeb1d0..810b3b873f1b 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -573,7 +573,7 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { sstTombstone := storage.MVCCValue{MVCCValueHeader: sstValueHeader} // Write key/value pairs to the SST. - f := &storage.MemFile{} + f := &storage.MemObject{} st := cluster.MakeTestingClusterSettings() w := storage.MakeIngestionSSTWriter(ctx, st, f) defer w.Close() diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 8ef3ee4c37e0..6d435ca5d130 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -51,7 +51,7 @@ func TestOperationsFormat(t *testing.T) { sstValueHeader.KVNemesisSeq.Set(1) sstSpan := roachpb.Span{Key: roachpb.Key(k1), EndKey: roachpb.Key(k4)} sstTS := hlc.Timestamp{WallTime: 1} - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} { st := cluster.MakeTestingClusterSettings() storage.ValueBlocksEnabled.Override(ctx, &st.SV, false) diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go index a5d3707d90cb..953bd7f56071 100644 --- a/pkg/kv/kvnemesis/validator_test.go +++ b/pkg/kv/kvnemesis/validator_test.go @@ -197,7 +197,7 @@ func TestValidate(t *testing.T) { } makeAddSSTable := func(seq kvnemesisutil.Seq, kvs []sstKV) Operation { - f := &storage.MemFile{} + f := &storage.MemObject{} st := cluster.MakeTestingClusterSettings() storage.ValueBlocksEnabled.Override(ctx, &st.SV, true) w := storage.MakeIngestionSSTWriter(ctx, st, f) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 6b9edfeaab37..347a74e8db32 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -208,6 +208,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//objstorage", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", "@com_github_google_btree//:btree", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index c3c41cf63799..d7219fb34467 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -11,6 +11,7 @@ package batcheval_test import ( + "bytes" "context" "os" "regexp" @@ -1421,14 +1422,14 @@ func runTestDBAddSSTable( value := roachpb.MakeValueFromString("1") value.InitChecksum([]byte("foo")) - sstFile := &storage.MemFile{} - w := storage.MakeBackupSSTWriter(ctx, cs, sstFile) + var sstFile bytes.Buffer + w := storage.MakeBackupSSTWriter(ctx, cs, &sstFile) defer w.Close() require.NoError(t, w.Put(key, value.RawBytes)) require.NoError(t, w.Finish()) _, _, err := db.AddSSTable( - ctx, "b", "c", sstFile.Data(), allowConflicts, allowShadowing, allowShadowingBelow, nilStats, ingestAsSST, noTS) + ctx, "b", "c", sstFile.Bytes(), allowConflicts, allowShadowing, allowShadowingBelow, nilStats, ingestAsSST, noTS) require.Error(t, err) require.Contains(t, err.Error(), "invalid checksum") } diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 4b0fd5620647..ef9bcce22193 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -11,6 +11,7 @@ package batcheval import ( + "bytes" "context" "fmt" "time" @@ -172,7 +173,7 @@ func evalExport( var curSizeOfExportedSSTs int64 for start := args.Key; start != nil; { - destFile := &storage.MemFile{} + var destFile bytes.Buffer opts := storage.MVCCExportOptions{ StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS}, EndKey: args.EndKey, @@ -198,7 +199,7 @@ func evalExport( } var hasRangeKeys bool summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx, - cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile) + cArgs.EvalCtx.ClusterSettings(), reader, opts, &destFile) if err != nil { return result.Result{}, maybeAnnotateExceedMaxSizeError(err) } @@ -208,16 +209,16 @@ func evalExport( // part of the ExportResponse. This frees up the memory used by the empty // SST file. if !hasRangeKeys { - destFile = &storage.MemFile{} + destFile = bytes.Buffer{} } } else { summary, resumeInfo, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, - opts, destFile) + opts, &destFile) if err != nil { return result.Result{}, maybeAnnotateExceedMaxSizeError(err) } } - data := destFile.Data() + data := destFile.Bytes() // NB: This should only happen in two cases: // diff --git a/pkg/kv/kvserver/batcheval/cmd_export_test.go b/pkg/kv/kvserver/batcheval/cmd_export_test.go index 36b6eecb07f6..c4440fa57408 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_export_test.go @@ -442,7 +442,7 @@ func exportUsingGoIterator( startKey, endKey roachpb.Key, reader storage.Reader, ) ([]byte, error) { - memFile := &storage.MemFile{} + memFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter( ctx, cluster.MakeTestingClusterSettings(), memFile, ) @@ -593,7 +593,7 @@ func assertEqualKVs( var sst []byte maxSize := uint64(0) prevStart := start - sstFile := &storage.MemFile{} + var sstFile bytes.Buffer summary, resumeInfo, err := storage.MVCCExportToSST(ctx, st, e, storage.MVCCExportOptions{ StartKey: start, EndKey: endKey, @@ -603,10 +603,10 @@ func assertEqualKVs( TargetSize: targetSize, MaxSize: maxSize, StopMidKey: bool(stopMidKey), - }, sstFile) + }, &sstFile) require.NoError(t, err) start = resumeInfo.ResumeKey - sst = sstFile.Data() + sst = sstFile.Bytes() loaded := loadSST(t, sst, startKey, endKey) // Ensure that the pagination worked properly. if start.Key != nil { @@ -652,7 +652,7 @@ func assertEqualKVs( TargetSize: targetSize, MaxSize: maxSize, StopMidKey: false, - }, &storage.MemFile{}) + }, &bytes.Buffer{}) require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)", dataSizeWhenExceeded, maxSize), err) } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 639430c19209..a6eacfd3a38d 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3782,13 +3782,13 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { type sstFileWriter struct { span roachpb.Span - file *storage.MemFile + file *storage.MemObject writer storage.SSTWriter } keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc) sstFileWriters := map[string]sstFileWriter{} for _, span := range keySpans { - file := &storage.MemFile{} + file := &storage.MemObject{} writer := storage.MakeIngestionSSTWriter(ctx, st, file) if err := writer.ClearRawRange(span.Key, span.EndKey, true, true); err != nil { return err @@ -3850,7 +3850,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // replicas (while absorbing their user keys into the LHS). for _, k := range []roachpb.Key{keyB, keyC} { rangeID := rangeIds[string(k)] - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() { @@ -3892,7 +3892,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { } // Construct an SST for the user key range of the subsumed replicas. - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() desc := roachpb.RangeDescriptor{ diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index fa69d60451c7..3c4866205a85 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -534,7 +534,7 @@ func TestReplicaCircuitBreaker_ExemptRequests(t *testing.T) { exemptRequests := []func() kvpb.Request{ func() kvpb.Request { return &kvpb.ExportRequest{} }, func() kvpb.Request { - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter(context.Background(), cluster.MakeTestingClusterSettings(), sstFile) defer sst.Close() require.NoError(t, sst.LogData([]byte("hello"))) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 316f4c61cd83..9ffa87ce18b1 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -392,7 +392,7 @@ func (r *Replica) LoadBasedSplitter() *split.Decider { func MakeSSTable( ctx context.Context, key, value string, ts hlc.Timestamp, ) ([]byte, storage.MVCCKeyValue) { - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile) defer sst.Close() diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 2519fa21f254..9b4bd8de1923 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -497,7 +497,7 @@ func (r *Replica) applySnapshot( } if nonempty { // TODO(itsbilal): Write to SST directly in unreplicatedSST rather than - // buffering in a MemFile first. + // buffering in a MemObject first. if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil { return err } @@ -715,8 +715,8 @@ func writeUnreplicatedSST( meta raftpb.SnapshotMetadata, hs raftpb.HardState, sl *logstore.StateLoader, -) (_ *storage.MemFile, nonempty bool, _ error) { - unreplicatedSSTFile := &storage.MemFile{} +) (_ *storage.MemObject, nonempty bool, _ error) { + unreplicatedSSTFile := &storage.MemObject{} unreplicatedSST := storage.MakeIngestionSSTWriter( ctx, st, unreplicatedSSTFile, ) @@ -789,7 +789,7 @@ func clearSubsumedReplicaDiskData( totalKeySpans := append([]roachpb.Span(nil), keySpans...) for _, subDesc := range subsumedDescs { // We have to create an SST for the subsumed replica's range-id local keys. - subsumedReplSSTFile := &storage.MemFile{} + subsumedReplSSTFile := &storage.MemObject{} subsumedReplSST := storage.MakeIngestionSSTWriter( ctx, st, subsumedReplSSTFile, ) @@ -811,7 +811,7 @@ func clearSubsumedReplicaDiskData( } if subsumedReplSST.DataSize > 0 { // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than - // buffering in a MemFile first. + // buffering in a MemObject first. if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil { return err } @@ -845,7 +845,7 @@ func clearSubsumedReplicaDiskData( // subsume both r1 and r2 in S1. for i := range keySpans { if totalKeySpans[i].EndKey.Compare(keySpans[i].EndKey) > 0 { - subsumedReplSSTFile := &storage.MemFile{} + subsumedReplSSTFile := &storage.MemObject{} subsumedReplSST := storage.MakeIngestionSSTWriter( ctx, st, subsumedReplSSTFile, ) @@ -866,7 +866,7 @@ func clearSubsumedReplicaDiskData( } if subsumedReplSST.DataSize > 0 { // TODO(itsbilal): Write to SST directly in subsumedReplSST rather than - // buffering in a MemFile first. + // buffering in a MemObject first. if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil { return err } diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index d9160a8df3b9..5d7665023720 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -347,7 +347,7 @@ func TestReplicaRangefeed(t *testing.T) { expVal6q.InitChecksum(roachpb.Key("q")) st := cluster.MakeTestingClusterSettings() - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sstWriter := storage.MakeIngestionSSTWriter(ctx, st, sstFile) defer sstWriter.Close() require.NoError(t, sstWriter.PutMVCC( @@ -376,7 +376,7 @@ func TestReplicaRangefeed(t *testing.T) { expVal7q.SetInt(7) expVal7q.InitChecksum(roachpb.Key("q")) - sstFile = &storage.MemFile{} + sstFile = &storage.MemObject{} sstWriter = storage.MakeIngestionSSTWriter(ctx, st, sstFile) defer sstWriter.Close() require.NoError(t, sstWriter.PutMVCC( diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go index 38ce6fdb9af3..b53a0cbf6eb3 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/objstorage" "golang.org/x/time/rate" ) @@ -154,18 +155,11 @@ func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) e if err != nil { return err } - defer func() { - // Closing an SSTSnapshotStorageFile multiple times is idempotent. Nothing - // actionable if closing fails. - _ = f.Close() - }() - if _, err := f.Write(data); err != nil { + if err := f.Write(data); err != nil { + f.Abort() return err } - if err := f.Sync(); err != nil { - return err - } - return f.Close() + return f.Finish() } // SSTs returns the names of the files created. @@ -194,6 +188,8 @@ type SSTSnapshotStorageFile struct { bytesPerSync int64 } +var _ objstorage.Writable = (*SSTSnapshotStorageFile)(nil) + func (f *SSTSnapshotStorageFile) ensureFile() error { if f.created { if f.file == nil { @@ -222,41 +218,45 @@ func (f *SSTSnapshotStorageFile) ensureFile() error { return nil } -// Write writes contents to the file while respecting the limiter passed into -// SSTSnapshotStorageScratch. Writing empty contents is okay and is treated as -// a noop. The file must have not been closed. -func (f *SSTSnapshotStorageFile) Write(contents []byte) (int, error) { +// Write is part of objstorage.Writable; it writes contents to the file while +// respecting the limiter passed into SSTSnapshotStorageScratch. Writing empty +// contents is okay and is treated as a noop. +// Cannot be called after Finish or Abort. +func (f *SSTSnapshotStorageFile) Write(contents []byte) error { if len(contents) == 0 { - return 0, nil + return nil } if err := f.ensureFile(); err != nil { - return 0, err + return err } if err := kvserverbase.LimitBulkIOWrite(f.ctx, f.scratch.storage.limiter, len(contents)); err != nil { - return 0, err + return err } - return f.file.Write(contents) + // Write always returns an error if it can't write all the contents. + _, err := f.file.Write(contents) + return err } -// Close closes the file. Calling this function multiple times is idempotent. -// The file must have been written to before being closed. -func (f *SSTSnapshotStorageFile) Close() error { +// Finish is part of the objstorage.Writable interface. +func (f *SSTSnapshotStorageFile) Finish() error { // We throw an error for empty files because it would be an error to ingest // an empty SST so catch this error earlier. if !f.created { return errors.New("file is empty") } - if f.file == nil { - return nil - } - if err := f.file.Close(); err != nil { - return err - } + errSync := f.file.Sync() + errClose := f.file.Close() f.file = nil - return nil + if errSync != nil { + return errSync + } + return errClose } -// Sync syncs the file to disk. Implements writeCloseSyncer in engine. -func (f *SSTSnapshotStorageFile) Sync() error { - return f.file.Sync() +// Abort is part of the objstorage.Writable interface. +func (f *SSTSnapshotStorageFile) Abort() { + if f.file != nil { + _ = f.file.Close() + f.file = nil + } } diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index 1f84cbf929a3..caa8174cad7b 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -57,9 +57,6 @@ func TestSSTSnapshotStorage(t *testing.T) { f, err := scratch.NewFile(ctx, 0) require.NoError(t, err) - defer func() { - require.NoError(t, f.Close()) - }() // Check that even though the files aren't created, they are still recorded in SSTs(). require.Equal(t, len(scratch.SSTs()), 1) @@ -72,8 +69,7 @@ func TestSSTSnapshotStorage(t *testing.T) { } } - _, err = f.Write([]byte("foo")) - require.NoError(t, err) + require.NoError(t, f.Write([]byte("foo"))) // After writing to files, check that they have been flushed to disk. for _, fileName := range scratch.SSTs() { @@ -86,19 +82,16 @@ func TestSSTSnapshotStorage(t *testing.T) { } // Check that closing is idempotent. - require.NoError(t, f.Close()) - require.NoError(t, f.Close()) + require.NoError(t, f.Finish()) // Check that writing to a closed file is an error. - _, err = f.Write([]byte("foo")) + err = f.Write([]byte("foo")) require.EqualError(t, err, "file has already been closed") // Check that closing an empty file is an error. f, err = scratch.NewFile(ctx, 0) require.NoError(t, err) - require.EqualError(t, f.Close(), "file is empty") - _, err = f.Write([]byte("foo")) - require.NoError(t, err) + require.EqualError(t, f.Finish(), "file is empty") // Check that Close removes the snapshot directory as well as the range // directory. @@ -146,9 +139,6 @@ func TestSSTSnapshotStorageConcurrentRange(t *testing.T) { f, err := scratch.NewFile(ctx, 0) require.NoError(t, err) - defer func() { - require.NoError(t, f.Close()) - }() // Check that even though the files aren't created, they are still recorded in SSTs(). require.Equal(t, len(scratch.SSTs()), 1) @@ -161,8 +151,7 @@ func TestSSTSnapshotStorageConcurrentRange(t *testing.T) { } } - _, err = f.Write([]byte("foo")) - require.NoError(t, err) + require.NoError(t, f.Write([]byte("foo"))) // After writing to files, check that they have been flushed to disk. for _, fileName := range scratch.SSTs() { @@ -175,19 +164,16 @@ func TestSSTSnapshotStorageConcurrentRange(t *testing.T) { } // Check that closing is idempotent. - require.NoError(t, f.Close()) - require.NoError(t, f.Close()) + require.NoError(t, f.Finish()) // Check that writing to a closed file is an error. - _, err = f.Write([]byte("foo")) + err = f.Write([]byte("foo")) require.EqualError(t, err, "file has already been closed") // Check that closing an empty file is an error. f, err = scratch.NewFile(ctx, 0) require.NoError(t, err) - require.EqualError(t, f.Close(), "file is empty") - _, err = f.Write([]byte("foo")) - require.NoError(t, err) + require.EqualError(t, f.Finish(), "file is empty") // Check that Close removes the snapshot directory. require.NoError(t, scratch.Close()) @@ -253,16 +239,15 @@ func TestSSTSnapshotStorageContextCancellation(t *testing.T) { f, err := scratch.NewFile(ctx, 0) require.NoError(t, err) defer func() { - require.NoError(t, f.Close()) + require.NoError(t, f.Finish()) }() // Before context cancellation. - _, err = f.Write([]byte("foo")) - require.NoError(t, err) + require.NoError(t, f.Write([]byte("foo"))) // After context cancellation. cancel() - _, err = f.Write([]byte("bar")) + err = f.Write([]byte("bar")) require.ErrorIs(t, err, context.Canceled) } @@ -310,7 +295,7 @@ func TestMultiSSTWriterInitSST(t *testing.T) { var expectedSSTs [][]byte for _, s := range keySpans { func() { - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile) defer sst.Close() err := sst.ClearRawRange(s.Key, s.EndKey, true, true) diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index cf6fbb180e00..311d53922e59 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -87,6 +87,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//bloom", + "@com_github_cockroachdb_pebble//objstorage", "@com_github_cockroachdb_pebble//objstorage/shared", "@com_github_cockroachdb_pebble//rangekey", "@com_github_cockroachdb_pebble//record", @@ -183,6 +184,7 @@ go_test( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//objstorage", "@com_github_cockroachdb_pebble//sstable", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 6180a5f417da..5d7a5b8ff6ea 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" "github.com/stretchr/testify/require" ) @@ -1721,7 +1722,7 @@ func runCheckSSTConflicts( // between the engine keys without colliding. ctx := context.Background() st := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} + sstFile := &MemObject{} sstWriter := MakeIngestionSSTWriter(ctx, st, sstFile) var sstStart, sstEnd MVCCKey lastKeyNum := -1 @@ -1762,7 +1763,7 @@ func runSSTIterator(b *testing.B, numKeys int, verify bool) { ctx := context.Background() st := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} + sstFile := &MemObject{} sstWriter := MakeIngestionSSTWriter(ctx, st, sstFile) for i := 0; i < numKeys; i++ { @@ -1926,13 +1927,13 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) { return cmp < 0 }) sstFileName := fmt.Sprintf("tmp-ingest-%d", i) - sstFile, err := eng.Create(sstFileName) + sstFile, err := eng.fs.Create(sstFileName) require.NoError(b, err) // No improvement with v3 since the multiple versions are in different // files. format := sstable.TableFormatPebblev2 opts := DefaultPebbleOptions().MakeWriterOptions(0, format) - writer := sstable.NewWriter(sstFile, opts) + writer := sstable.NewWriter(objstorage.NewFileWritable(sstFile), opts) for _, kv := range kvPairs { require.NoError(b, writer.Add( pebble.InternalKey{UserKey: kv.key, Trailer: uint64(kv.kind)}, kv.value)) diff --git a/pkg/storage/fingerprint_writer.go b/pkg/storage/fingerprint_writer.go index 9f4357a4c513..dc79afa324d1 100644 --- a/pkg/storage/fingerprint_writer.go +++ b/pkg/storage/fingerprint_writer.go @@ -11,6 +11,7 @@ package storage import ( + "bytes" "context" "hash" "hash/fnv" @@ -234,8 +235,8 @@ func FingerprintRangekeys( } defer iter.Close() - destFile := &MemFile{} - fw := makeFingerprintWriter(ctx, fnv.New64(), cs, destFile, opts) + var destFile bytes.Buffer + fw := makeFingerprintWriter(ctx, fnv.New64(), cs, &destFile, opts) defer fw.Close() fingerprintRangeKey := func(stack MVCCRangeKeyStack) (uint64, error) { defer fw.hasher.Reset() @@ -281,7 +282,7 @@ func FingerprintRangekeys( fw.xorAgg.add(rangekeyFingerprint) } - if len(destFile.Data()) != 0 { + if destFile.Len() != 0 { return 0, errors.AssertionFailedf("unexpected data found in destFile") } diff --git a/pkg/storage/metamorphic/BUILD.bazel b/pkg/storage/metamorphic/BUILD.bazel index fc28c84a8b1f..9d73c133c962 100644 --- a/pkg/storage/metamorphic/BUILD.bazel +++ b/pkg/storage/metamorphic/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//bloom", + "@com_github_cockroachdb_pebble//objstorage", "@com_github_cockroachdb_pebble//vfs", ], ) diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 75a5b26db568..915670343e86 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/objstorage" ) // opReference represents one operation; an opGenerator reference as well as @@ -769,7 +770,7 @@ func (i ingestOp) run(ctx context.Context) string { return fmt.Sprintf("error = %s", err.Error()) } - sstWriter := storage.MakeIngestionSSTWriter(ctx, i.m.st, f) + sstWriter := storage.MakeIngestionSSTWriter(ctx, i.m.st, objstorage.NewFileWritable(f)) for _, key := range i.keys { _ = sstWriter.Put(key, []byte("ingested")) } diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 82301fc56307..c3f950daf2b8 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -11,6 +11,7 @@ package storage_test import ( + "bytes" "context" "fmt" "math" @@ -1383,7 +1384,7 @@ func cmdExport(e *evalCtx) error { r := e.newReader() defer r.Close() - sstFile := &storage.MemFile{} + var sstFile bytes.Buffer var summary kvpb.BulkOpSummary var resumeInfo storage.ExportRequestResumeInfo @@ -1392,17 +1393,17 @@ func cmdExport(e *evalCtx) error { var err error if shouldFingerprint { summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(e.ctx, e.st, r, - opts, sstFile) + opts, &sstFile) if err != nil { return err } if !hasRangeKeys { - sstFile = &storage.MemFile{} + sstFile.Reset() } e.results.buf.Printf("export: %s", &summary) e.results.buf.Print(" fingerprint=true") } else { - summary, resumeInfo, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile) + summary, resumeInfo, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, &sstFile) if err != nil { return err } @@ -2088,7 +2089,7 @@ type evalCtx struct { locks map[string]*roachpb.Transaction ms *enginepb.MVCCStats sstWriter *storage.SSTWriter - sstFile *storage.MemFile + sstFile *storage.MemObject ssts [][]byte } @@ -2341,7 +2342,7 @@ func (e *evalCtx) newTxn( func (e *evalCtx) sst() *storage.SSTWriter { if e.sstWriter == nil { - e.sstFile = &storage.MemFile{} + e.sstFile = &storage.MemObject{} w := storage.MakeIngestionSSTWriter(e.ctx, e.st, e.sstFile) e.sstWriter = &w } diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index c4a9b7ef2522..5c6d7e535973 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -186,7 +186,6 @@ func assertExportedErrs( expectedIntents []roachpb.Intent, ) { const big = 1 << 30 - sstFile := &MemFile{} st := cluster.MakeTestingClusterSettings() _, _, err := MVCCExportToSST(context.Background(), st, e, MVCCExportOptions{ StartKey: MVCCKey{Key: startKey}, @@ -198,7 +197,7 @@ func assertExportedErrs( MaxSize: big, MaxIntents: uint64(MaxIntentsPerWriteIntentError.Default()), StopMidKey: false, - }, sstFile) + }, &bytes.Buffer{}) require.Error(t, err) if intentErr := (*kvpb.WriteIntentError)(nil); errors.As(err, &intentErr) { @@ -224,7 +223,7 @@ func assertExportedKVs( expected []MVCCKeyValue, ) { const big = 1 << 30 - sstFile := &MemFile{} + var sstFile bytes.Buffer st := cluster.MakeTestingClusterSettings() _, _, err := MVCCExportToSST(context.Background(), st, e, MVCCExportOptions{ StartKey: MVCCKey{Key: startKey}, @@ -235,9 +234,9 @@ func assertExportedKVs( TargetSize: big, MaxSize: big, StopMidKey: false, - }, sstFile) + }, &sstFile) require.NoError(t, err) - data := sstFile.Data() + data := sstFile.Bytes() if data == nil { require.Nil(t, expected) return @@ -1347,7 +1346,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { defer db2.Close() ingest := func(it EngineIterator, valid bool, err error, count int) { - memFile := &MemFile{} + memFile := &MemObject{} sst := MakeIngestionSSTWriter(ctx, db2.settings, memFile) defer sst.Close() diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 2b9eac261c9d..e74a9afbafc3 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -6158,12 +6158,12 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) { dataIndex := 0 startKey := initialOpts.StartKey for len(startKey.Key) > 0 { - sstFile := &MemFile{} + var sstFile bytes.Buffer opts := initialOpts opts.StartKey = startKey - _, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, sstFile) + _, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile) require.NoError(t, err) - chunk := sstToKeys(t, sstFile.Data()) + chunk := sstToKeys(t, sstFile.Bytes()) require.LessOrEqual(t, len(chunk), len(expectedData)-dataIndex, "remaining test data") for _, key := range chunk { require.True(t, key.Equal(expectedData[dataIndex]), "returned key is not equal") @@ -6272,7 +6272,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) { } require.NoError(t, engine.Flush(), "Flush engine data") - sstFile := &MemFile{} + var sstFile bytes.Buffer opts := MVCCExportOptions{ StartKey: MVCCKey{Key: testKey(minKey), Timestamp: minTimestamp}, EndKey: testKey(maxKey), @@ -6297,19 +6297,19 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) { // With StopMidKey=false, we expect 6 // revisions or 0 revisions. - _, _, err := MVCCExportToSST(ctx, st, engine, opts, sstFile) + _, _, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile) require.NoError(t, err) - chunk := sstToKeys(t, sstFile.Data()) + chunk := sstToKeys(t, sstFile.Bytes()) require.Equal(t, 6, len(chunk)) // With StopMidKey=true, we can stop in the // middle of iteration. callsBeforeFailure = 2 - sstFile = &MemFile{} + sstFile.Reset() opts.StopMidKey = true - _, _, err = MVCCExportToSST(ctx, st, engine, opts, sstFile) + _, _, err = MVCCExportToSST(ctx, st, engine, opts, &sstFile) require.NoError(t, err) - chunk = sstToKeys(t, sstFile.Data()) + chunk = sstToKeys(t, sstFile.Bytes()) // We expect 3 here rather than 2 because the // first iteration never calls the handler. require.Equal(t, 3, len(chunk)) @@ -6339,16 +6339,16 @@ type dataLimits struct { func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey { st := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} + var sstFile bytes.Buffer _, _, err := MVCCExportToSST(context.Background(), st, engine, MVCCExportOptions{ StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp}, EndKey: testKey(limits.maxKey), StartTS: limits.minTimestamp, EndTS: limits.maxTimestamp, ExportAllRevisions: !limits.latest, - }, sstFile) + }, &sstFile) require.NoError(t, err, "Failed to export expected data") - return sstToKeys(t, sstFile.Data()) + return sstToKeys(t, sstFile.Bytes()) } func sstToKeys(t *testing.T, data []byte) []MVCCKey { @@ -6404,7 +6404,6 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { require.NoError(t, fillInData(ctx, engine, data)) - destination := &MemFile{} _, _, err := MVCCExportToSST(ctx, st, engine, MVCCExportOptions{ StartKey: MVCCKey{Key: key(10)}, EndKey: key(20000), @@ -6415,7 +6414,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { MaxSize: 0, MaxIntents: uint64(MaxIntentsPerWriteIntentError.Default()), StopMidKey: false, - }, destination) + }, &bytes.Buffer{}) if len(expectedIntentIndices) == 0 { require.NoError(t, err) } else { @@ -6493,7 +6492,6 @@ func TestMVCCExportToSSTSplitMidKey(t *testing.T) { maxSize = keyValueSize * 2 } for !resumeKey.Equal(MVCCKey{}) { - dest := &MemFile{} _, resumeInfo, err := MVCCExportToSST( ctx, st, engine, MVCCExportOptions{ StartKey: resumeKey, @@ -6504,7 +6502,7 @@ func TestMVCCExportToSSTSplitMidKey(t *testing.T) { TargetSize: 1, MaxSize: maxSize, StopMidKey: test.stopMidKey, - }, dest) + }, &bytes.Buffer{}) require.NoError(t, err) resumeKey = resumeInfo.ResumeKey if !resumeKey.Timestamp.IsEmpty() { @@ -6540,7 +6538,7 @@ func TestMVCCExportToSSTSErrorsOnLargeKV(t *testing.T) { TargetSize: 1, MaxSize: 1, StopMidKey: true, - }, &MemFile{}) + }, &bytes.Buffer{}) require.Equal(t, int64(0), summary.DataSize) expectedErr := &ExceedMaxSizeError{} require.ErrorAs(t, err, &expectedErr) @@ -6560,15 +6558,15 @@ func TestMVCCExportFingerprint(t *testing.T) { st := cluster.MakeTestingClusterSettings() fingerprint := func(opts MVCCExportOptions, engine Engine) (uint64, []byte, kvpb.BulkOpSummary, MVCCKey) { - dest := &MemFile{} + var dest bytes.Buffer var err error res, resumeInfo, fingerprint, hasRangeKeys, err := MVCCExportFingerprint( - ctx, st, engine, opts, dest) + ctx, st, engine, opts, &dest) require.NoError(t, err) if !hasRangeKeys { - dest = &MemFile{} + dest.Reset() } - return fingerprint, dest.Data(), res, resumeInfo.ResumeKey + return fingerprint, dest.Bytes(), res, resumeInfo.ResumeKey } // verifyFingerprintAgainstOracle uses the `fingerprintOracle` to compute a @@ -6814,10 +6812,10 @@ func (f *fingerprintOracle) getFingerprintAndRangeKeys( ) (uint64, []MVCCRangeKeyStack) { t.Helper() - dest := &MemFile{} - _, _, err := MVCCExportToSST(ctx, f.st, f.engine, *f.opts, dest) + var dest bytes.Buffer + _, _, err := MVCCExportToSST(ctx, f.st, f.engine, *f.opts, &dest) require.NoError(t, err) - return f.fingerprintPointKeys(t, dest.Data()), getRangeKeys(t, dest.Data()) + return f.fingerprintPointKeys(t, dest.Bytes()), getRangeKeys(t, dest.Bytes()) } func (f *fingerprintOracle) fingerprintPointKeys(t *testing.T, dataSST []byte) uint64 { diff --git a/pkg/storage/pebble_iterator_test.go b/pkg/storage/pebble_iterator_test.go index d1c9eb3b1c37..efaa2b3aac0e 100644 --- a/pkg/storage/pebble_iterator_test.go +++ b/pkg/storage/pebble_iterator_test.go @@ -11,6 +11,7 @@ package storage import ( + "bytes" "context" "io/fs" "math/rand" @@ -98,8 +99,8 @@ func TestPebbleIterator_ExternalCorruption(t *testing.T) { st := cluster.MakeTestingClusterSettingsWithVersions(version, version, true) ctx := context.Background() rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) - f := &MemFile{} - w := MakeBackupSSTWriter(ctx, st, f) + var f bytes.Buffer + w := MakeBackupSSTWriter(ctx, st, &f) // Create an example sstable. var rawValue [64]byte diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index a009de597ccd..9b7e0df9d1a2 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -1168,7 +1168,7 @@ func UpdateSSTTimestamps( return nil, enginepb.MVCCStats{}, errors.Errorf("to timestamp not given") } - sstOut := &MemFile{} + sstOut := &MemObject{} sstOut.Buffer.Grow(len(sst)) var statsDelta enginepb.MVCCStats diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index 541afbbcc3fd..ce4b0846cd9b 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -11,6 +11,7 @@ package storage import ( + "bytes" "context" "encoding/binary" "fmt" @@ -50,8 +51,8 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { // Create SST with keys equal to intents at txn2TS. cs := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} - sstWriter := MakeBackupSSTWriter(context.Background(), cs, sstFile) + var sstFile bytes.Buffer + sstWriter := MakeBackupSSTWriter(context.Background(), cs, &sstFile) defer sstWriter.Close() for _, k := range intents { key := MVCCKey{Key: roachpb.Key(k), Timestamp: txn2TS} @@ -129,7 +130,7 @@ func runUpdateSSTTimestamps(ctx context.Context, b *testing.B, numKeys int, conc r := rand.New(rand.NewSource(7)) st := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} + sstFile := &MemObject{} writer := MakeIngestionSSTWriter(ctx, st, sstFile) defer writer.Close() diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index f9bbdb80143f..ffb7c38ea0ea 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/sstable" ) @@ -36,26 +37,28 @@ type SSTWriter struct { var _ Writer = &SSTWriter{} var _ ExportWriter = &SSTWriter{} -// writeCloseSyncer interface copied from pebble.sstable. -type writeCloseSyncer interface { - io.WriteCloser - Sync() error -} - -// noopSyncCloser is used to wrap io.Writers for sstable.Writer so that callers -// can decide when to close/sync. -type noopSyncCloser struct { +// noopFinishAbort is used to wrap io.Writers for sstable.Writer. +type noopFinishAbort struct { io.Writer } -func (noopSyncCloser) Sync() error { - return nil +var _ objstorage.Writable = (*noopFinishAbort)(nil) + +// Write is part of the objstorage.Writable interface. +func (n *noopFinishAbort) Write(p []byte) error { + // An io.Writer always returns an error if it can't write the entire slice. + _, err := n.Writer.Write(p) + return err } -func (noopSyncCloser) Close() error { +// Finish is part of the objstorage.Writable interface. +func (*noopFinishAbort) Finish() error { return nil } +// Abort is part of the objstorage.Writable interface. +func (*noopFinishAbort) Abort() {} + // MakeIngestionWriterOptions returns writer options suitable for writing SSTs // that will subsequently be ingested (e.g. with AddSSTable). func MakeIngestionWriterOptions(ctx context.Context, cs *cluster.Settings) sstable.WriterOptions { @@ -94,7 +97,7 @@ func MakeBackupSSTWriter(ctx context.Context, cs *cluster.Settings, f io.Writer) opts.BlockSize = 128 << 10 opts.MergerName = "nullptr" return SSTWriter{ - fw: sstable.NewWriter(noopSyncCloser{f}, opts), + fw: sstable.NewWriter(&noopFinishAbort{f}, opts), supportsRangeKeys: opts.TableFormat >= sstable.TableFormatPebblev2, } } @@ -103,11 +106,11 @@ func MakeBackupSSTWriter(ctx context.Context, cs *cluster.Settings, f io.Writer) // These SSTs have bloom filters enabled (as set in DefaultPebbleOptions) and // format set to RocksDBv2. func MakeIngestionSSTWriter( - ctx context.Context, cs *cluster.Settings, f writeCloseSyncer, + ctx context.Context, cs *cluster.Settings, w objstorage.Writable, ) SSTWriter { opts := MakeIngestionWriterOptions(ctx, cs) return SSTWriter{ - fw: sstable.NewWriter(f, opts), + fw: sstable.NewWriter(w, opts), supportsRangeKeys: opts.TableFormat >= sstable.TableFormatPebblev2, } } @@ -422,33 +425,34 @@ func (fw *SSTWriter) BufferedSize() int { return 0 } -// MemFile is a file-like struct that buffers all data written to it in memory. -// Implements the writeCloseSyncer interface and is intended for use with -// SSTWriter. -type MemFile struct { +// MemObject is an in-memory implementation of objstorage.Writable, intended +// use with SSTWriter. +type MemObject struct { bytes.Buffer } -// Close implements the writeCloseSyncer interface. -func (*MemFile) Close() error { - return nil +var _ objstorage.Writable = (*MemObject)(nil) + +// Write is part of the objstorage.Writable interface. +func (f *MemObject) Write(p []byte) error { + _, err := f.Buffer.Write(p) + return err } -// Flush implements the same interface as the standard library's *bufio.Writer's -// Flush method. The Pebble sstable Writer tests whether files implement a Flush -// method. If not, it wraps the file with a bufio.Writer to buffer writes to the -// underlying file. This buffering is not necessary for an in-memory file. We -// signal this by implementing Flush as a noop. -func (*MemFile) Flush() error { +// Finish is part of the objstorage.Writable interface. +func (*MemObject) Finish() error { return nil } -// Sync implements the writeCloseSyncer interface. -func (*MemFile) Sync() error { +// Abort is part of the objstorage.Writable interface. +func (*MemObject) Abort() {} + +// Close implements the writeCloseSyncer interface. +func (*MemObject) Close() error { return nil } -// Data returns the in-memory buffer behind this MemFile. -func (f *MemFile) Data() []byte { +// Data returns the in-memory buffer behind this MemObject. +func (f *MemObject) Data() []byte { return f.Bytes() } diff --git a/pkg/storage/sst_writer_test.go b/pkg/storage/sst_writer_test.go index b46b3641c898..0dd72bcf672e 100644 --- a/pkg/storage/sst_writer_test.go +++ b/pkg/storage/sst_writer_test.go @@ -56,12 +56,12 @@ func makeIntTableKVs(numKeys, valueSize, maxRevisions int) []MVCCKeyValue { func makePebbleSST(t testing.TB, kvs []MVCCKeyValue, ingestion bool) []byte { ctx := context.Background() st := cluster.MakeTestingClusterSettings() - f := &MemFile{} + f := &MemObject{} var w SSTWriter if ingestion { w = MakeIngestionSSTWriter(ctx, st, f) } else { - w = MakeBackupSSTWriter(ctx, st, f) + w = MakeBackupSSTWriter(ctx, st, &f.Buffer) } defer w.Close() @@ -120,7 +120,7 @@ func TestSSTWriterRangeKeys(t *testing.T) { ctx := context.Background() st := cluster.MakeTestingClusterSettings() - sstFile := &MemFile{} + sstFile := &MemObject{} sst := MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() diff --git a/pkg/testutils/storageutils/sst.go b/pkg/testutils/storageutils/sst.go index 306bc965f6a2..22454203d89d 100644 --- a/pkg/testutils/storageutils/sst.go +++ b/pkg/testutils/storageutils/sst.go @@ -31,7 +31,7 @@ func MakeSST( ) ([]byte, roachpb.Key, roachpb.Key) { t.Helper() - sstFile := &storage.MemFile{} + sstFile := &storage.MemObject{} writer := storage.MakeIngestionSSTWriter(context.Background(), st, sstFile) defer writer.Close()