diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index ef32d85a0689..2f644fb057d0 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -587,6 +587,7 @@ STORAGErpc.method.initput.recvNumber of InitPut requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.method.isspanempty.recvNumber of IsSpanEmpty requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.method.leaseinfo.recvNumber of LeaseInfo requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +STORAGErpc.method.linkexternalsstable.recvNumber of LinkExternalSSTable requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.method.merge.recvNumber of Merge requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.method.migrate.recvNumber of Migrate requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE STORAGErpc.method.probe.recvNumber of Probe requests processedRPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE @@ -921,6 +922,7 @@ APPLICATIONdistsender.rpc.initput.sentNumber of InitPut requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.isspanempty.sentNumber of IsSpanEmpty requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.leaseinfo.sentNumber of LeaseInfo requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +APPLICATIONdistsender.rpc.linkexternalsstable.sentNumber of LinkExternalSSTable requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.merge.sentNumber of Merge requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.migrate.sentNumber of Migrate requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.probe.sentNumber of Probe requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE diff --git a/pkg/ccl/backupccl/restore_online.go b/pkg/ccl/backupccl/restore_online.go index 4ce4dd352fd3..5c664192ec57 100644 --- a/pkg/ccl/backupccl/restore_online.go +++ b/pkg/ccl/backupccl/restore_online.go @@ -292,13 +292,6 @@ func sendRemoteAddSSTable( return err } - loc := kvpb.AddSSTableRequest_RemoteFile{ - Locator: file.Dir.URI, - Path: file.Path, - ApproximatePhysicalSize: fileSize, - BackingFileSize: file.BackingFileSize, - SyntheticPrefix: syntheticPrefix, - } // TODO(dt): see if KV has any better ideas for making these up. fileStats := &enginepb.MVCCStats{ ContainsEstimates: 1, @@ -314,9 +307,18 @@ func sendRemoteAddSSTable( batchTimestamp = execCtx.ExecCfg().DB.Clock().Now() } - _, _, err = execCtx.ExecCfg().DB.AddRemoteSSTable( - ctx, file.BackupFileEntrySpan, loc, fileStats, batchTimestamp) - return err + loc := kvpb.LinkExternalSSTableRequest_ExternalFile{ + Locator: file.Dir.URI, + Path: file.Path, + ApproximatePhysicalSize: fileSize, + BackingFileSize: file.BackingFileSize, + SyntheticPrefix: syntheticPrefix, + UseSyntheticSuffix: batchTimestamp.IsSet(), + MVCCStats: fileStats, + } + + return execCtx.ExecCfg().DB.LinkExternalSSTable( + ctx, file.BackupFileEntrySpan, loc, batchTimestamp) } // checkManifestsForOnlineCompat returns an error if the set of diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 48d891f54f0a..044a56d4e597 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -305,6 +305,7 @@ func (b *Batch) fillResults(ctx context.Context) { case *kvpb.MigrateRequest: case *kvpb.QueryResolvedTimestampRequest: case *kvpb.BarrierRequest: + case *kvpb.LinkExternalSSTableRequest: default: if result.Err == nil { result.Err = errors.Errorf("unsupported reply: %T for %T", @@ -1035,7 +1036,6 @@ func (b *Batch) adminRelocateRange( func (b *Batch) addSSTable( s, e interface{}, data []byte, - remoteFile kvpb.AddSSTableRequest_RemoteFile, disallowConflicts bool, disallowShadowing bool, disallowShadowingBelow hlc.Timestamp, @@ -1059,7 +1059,6 @@ func (b *Batch) addSSTable( EndKey: end, }, Data: data, - RemoteFile: remoteFile, DisallowConflicts: disallowConflicts, DisallowShadowing: disallowShadowing, DisallowShadowingBelow: disallowShadowingBelow, @@ -1071,6 +1070,20 @@ func (b *Batch) addSSTable( b.initResult(1, 0, notRaw, nil) } +func (b *Batch) linkExternalSSTable( + span roachpb.Span, externalFile kvpb.LinkExternalSSTableRequest_ExternalFile, +) { + req := &kvpb.LinkExternalSSTableRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: span.Key, + EndKey: span.EndKey, + }, + ExternalFile: externalFile, + } + b.appendReqs(req) + b.initResult(1, 0, notRaw, nil) +} + // migrate is only exported on DB. func (b *Batch) migrate(s, e interface{}, version roachpb.Version) { begin, err := marshalKey(s) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 987a6f431696..9b1623db7332 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -783,8 +783,6 @@ func (db *DB) AdminRelocateRange( return getOneErr(db.Run(ctx, b), b) } -var noRemoteFile kvpb.AddSSTableRequest_RemoteFile - // AddSSTable links a file into the Pebble log-structured merge-tree. // // The disallowConflicts, disallowShadowingBelow parameters @@ -801,7 +799,7 @@ func (db *DB) AddSSTable( batchTs hlc.Timestamp, ) (roachpb.Span, int64, error) { b := &Batch{Header: kvpb.Header{Timestamp: batchTs}} - b.addSSTable(begin, end, data, noRemoteFile, disallowConflicts, disallowShadowing, disallowShadowingBelow, + b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */) err := getOneErr(db.Run(ctx, b), b) if err != nil { @@ -814,24 +812,23 @@ func (db *DB) AddSSTable( return resp.RangeSpan, resp.AvailableBytes, nil } -func (db *DB) AddRemoteSSTable( +// LinkExternalSSTable links an external sst into the Pebble log-structured merge-tree. +func (db *DB) LinkExternalSSTable( ctx context.Context, span roachpb.Span, - file kvpb.AddSSTableRequest_RemoteFile, - stats *enginepb.MVCCStats, + file kvpb.LinkExternalSSTableRequest_ExternalFile, batchTimestamp hlc.Timestamp, -) (roachpb.Span, int64, error) { +) error { b := &Batch{Header: kvpb.Header{Timestamp: batchTimestamp}} - b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, batchTimestamp) + b.linkExternalSSTable(span, file) err := getOneErr(db.Run(ctx, b), b) if err != nil { - return roachpb.Span{}, 0, err + return err } if l := len(b.response.Responses); l != 1 { - return roachpb.Span{}, 0, errors.AssertionFailedf("expected single response, got %d", l) + return errors.AssertionFailedf("expected single response, got %d", l) } - resp := b.response.Responses[0].GetAddSstable() - return resp.RangeSpan, resp.AvailableBytes, nil + return nil } // AddSSTableAtBatchTimestamp links a file into the Pebble log-structured @@ -852,7 +849,7 @@ func (db *DB) AddSSTableAtBatchTimestamp( batchTs hlc.Timestamp, ) (hlc.Timestamp, roachpb.Span, int64, error) { b := &Batch{Header: kvpb.Header{Timestamp: batchTs}} - b.addSSTable(begin, end, data, noRemoteFile, + b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow, stats, ingestAsWrites, batchTs) err := getOneErr(db.Run(ctx, b), b) diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 972923a59eca..35c8e2d81b38 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -955,6 +955,9 @@ func (*AdminScatterRequest) Method() Method { return AdminScatter } // Method implements the Request interface. func (*AddSSTableRequest) Method() Method { return AddSSTable } +// Method implements the Request interface. +func (*LinkExternalSSTableRequest) Method() Method { return LinkExternalSSTable } + // Method implements the Request interface. func (*MigrateRequest) Method() Method { return Migrate } @@ -1215,6 +1218,12 @@ func (r *AddSSTableRequest) ShallowCopy() Request { return &shallowCopy } +// ShallowCopy implements the Request interface. +func (r *LinkExternalSSTableRequest) ShallowCopy() Request { + shallowCopy := *r + return &shallowCopy +} + // ShallowCopy implements the Request interface. func (r *MigrateRequest) ShallowCopy() Request { shallowCopy := *r @@ -1757,6 +1766,13 @@ func (r *AddSSTableRequest) flags() flag { } return flags } +func (r *LinkExternalSSTableRequest) flags() flag { + flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure | bypassesReplicaCircuitBreaker + if r.ExternalFile.UseSyntheticSuffix { + flags |= appliesTSCache + } + return flags +} func (*MigrateRequest) flags() flag { return isWrite | isRange | isAlone } // RefreshRequest and RefreshRangeRequest both determine which timestamp cache diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 3570cfde9b8b..e38ee96b1998 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2127,43 +2127,6 @@ message AddSSTableRequest { // is likely non-empty. See AddSSTableResponse.FollowingLikelyNonEmptySpanStart. bool return_following_likely_non_empty_span_start = 9; - // RemoteFile indicates that the span indicated by the request start and end - // key of the file specified should be added. Data cannot be set if RemoteFile - // is non-empty, and many other request parameters such as collision/shadow - // checking, write-at-request-timestamp, or ingest-as-writes are unsupported. - // - // TODO(dt, msbutler, bilal): This is unsupported. - // TODO(msbutler): rename to ExternalFile. - message RemoteFile { - string locator = 1; - string path = 2; - uint64 backing_file_size = 3; - uint64 approximate_physical_size = 4; - bytes synthetic_prefix = 5; - - } - RemoteFile remote_file = 10 [(gogoproto.nullable) = false]; - - // AddSStableRequest_PrefixReplacement is used to represent a prefix to be - // replaced and the prefix with which to replace it. - // - // TODO(msbutler): delete prefix replacement plumbing. - message PrefixReplacement { - bytes from = 1; - bytes to = 2; - } - - // PrefixReplacement is used to that keys in the sst with the prefix specified - // in "from" should instead appear to have that prefix replaced by "to", when - // during iteration, seeking, or other reads. For example, an SST containing - // keys a1, a2, and a3 when added with PrefixReplacement={From: a, To:x} would - // produce the same results for subsequent operations as adding an SST with - // the keys x1, x2, x3 instead. The implementaiton may however elect to defer - // these replacements until the file is read. - // - // TODO(dt,msbutler,bilal): This is unsupported. - PrefixReplacement prefix_replacement = 11 [(gogoproto.nullable) = false]; - // IgnoreKeysAboveTimestamp is used when ingesting an SSTable that contains // keys, including mvcc revisions of keys, captured over a time range, to // indicate that readers of that sstable should read it "as of" a the fixed @@ -2171,6 +2134,8 @@ message AddSSTableRequest { // // TODO(dt,msbutler,bilal): This is unsupported. util.hlc.Timestamp ignore_keys_above_timestamp = 12 [(gogoproto.nullable) = false]; + + reserved 10, 11; } // AddSSTableResponse is the response to a AddSSTable() operation. @@ -2200,6 +2165,36 @@ message AddSSTableResponse { bytes following_likely_non_empty_span_start = 4 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"]; } +// LinkExternalSSTableRequest contains arguments to the LinkExternalSSTable +// method, which links an External SST file into the Pebble log-structured +// merge-tree. The External SST must point to a backup file which contains +// versioned values with non-zero MVCC timestamps (no intents or inline values). +// It cannot be used in a transaction, cannot be split across ranges, and must +// be alone in a batch. +message LinkExternalSSTableRequest { + RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + message ExternalFile { + string locator = 1; + string path = 2; + uint64 backing_file_size = 3; + uint64 approximate_physical_size = 4; + bytes synthetic_prefix = 5; + bool use_synthetic_suffix = 6; + + // MVCCStats are estimated MVCCStats for the contents of this Extrenal SSTable + // and is used as-is during evaluation of the command to update the range + // MVCCStats, instead of computing the stats for the SSTable by iterating it. + storage.enginepb.MVCCStats mvcc_stats = 7 [(gogoproto.customname) = "MVCCStats"]; + } + ExternalFile external_file = 4 [(gogoproto.nullable) = false]; +} + +// LinkExternalSSTableResponse is the response to a LinkExternalSSTable() operation. +message LinkExternalSSTableResponse { + ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; +} + // RefreshRequest is arguments to the Refresh() method, which verifies that no // write has occurred since the refresh_from timestamp to the specified key. // The timestamp cache is updated. A transaction must be supplied with this @@ -2544,6 +2539,7 @@ message RequestUnion { BarrierRequest barrier = 53; ProbeRequest probe = 54; IsSpanEmptyRequest is_span_empty = 56; + LinkExternalSSTableRequest link_external_sstable = 57; } reserved 8, 15, 23, 25, 27, 31, 34, 52; } @@ -2599,6 +2595,7 @@ message ResponseUnion { BarrierResponse barrier = 53; ProbeResponse probe = 54; IsSpanEmptyResponse is_span_empty = 56; + LinkExternalSSTableResponse link_external_sstable = 57; } reserved 8, 15, 23, 25, 27, 28, 31, 34, 52; } diff --git a/pkg/kv/kvpb/batch_generated.go b/pkg/kv/kvpb/batch_generated.go index c1eea2a10408..b721790a728f 100644 --- a/pkg/kv/kvpb/batch_generated.go +++ b/pkg/kv/kvpb/batch_generated.go @@ -108,6 +108,8 @@ func (ru RequestUnion) GetInner() Request { return t.Probe case *RequestUnion_IsSpanEmpty: return t.IsSpanEmpty + case *RequestUnion_LinkExternalSstable: + return t.LinkExternalSstable default: return nil } @@ -210,6 +212,8 @@ func (ru ResponseUnion) GetInner() Response { return t.Probe case *ResponseUnion_IsSpanEmpty: return t.IsSpanEmpty + case *ResponseUnion_LinkExternalSstable: + return t.LinkExternalSstable default: return nil } @@ -316,6 +320,8 @@ func (ru *RequestUnion) MustSetInner(r Request) { union = &RequestUnion_Probe{t} case *IsSpanEmptyRequest: union = &RequestUnion_IsSpanEmpty{t} + case *LinkExternalSSTableRequest: + union = &RequestUnion_LinkExternalSstable{t} default: panic(fmt.Sprintf("unsupported type %T for %T", r, ru)) } @@ -421,13 +427,15 @@ func (ru *ResponseUnion) MustSetInner(r Response) { union = &ResponseUnion_Probe{t} case *IsSpanEmptyResponse: union = &ResponseUnion_IsSpanEmpty{t} + case *LinkExternalSSTableResponse: + union = &ResponseUnion_LinkExternalSstable{t} default: panic(fmt.Sprintf("unsupported type %T for %T", r, ru)) } ru.Value = union } -type reqCounts [48]int32 +type reqCounts [49]int32 // getReqCounts returns the number of times each // request type appears in the batch. @@ -531,6 +539,8 @@ func (ba *BatchRequest) getReqCounts() reqCounts { counts[46]++ case *RequestUnion_IsSpanEmpty: counts[47]++ + case *RequestUnion_LinkExternalSstable: + counts[48]++ default: panic(fmt.Sprintf("unsupported request: %+v", ru)) } @@ -587,6 +597,7 @@ var requestNames = []string{ "Barrier", "Probe", "IsSpanEmpty", + "LinkExternalSstable", } // Summary prints a short summary of the requests in a batch. @@ -814,6 +825,10 @@ type isSpanEmptyResponseAlloc struct { union ResponseUnion_IsSpanEmpty resp IsSpanEmptyResponse } +type linkExternalSSTableResponseAlloc struct { + union ResponseUnion_LinkExternalSstable + resp LinkExternalSSTableResponse +} // CreateReply creates replies for each of the contained requests, wrapped in a // BatchResponse. The response objects are batch allocated to minimize @@ -872,6 +887,7 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { var buf45 []barrierResponseAlloc var buf46 []probeResponseAlloc var buf47 []isSpanEmptyResponseAlloc + var buf48 []linkExternalSSTableResponseAlloc for i, r := range ba.Requests { switch r.GetValue().(type) { @@ -1211,6 +1227,13 @@ func (ba *BatchRequest) CreateReply() *BatchResponse { buf47[0].union.IsSpanEmpty = &buf47[0].resp br.Responses[i].Value = &buf47[0].union buf47 = buf47[1:] + case *RequestUnion_LinkExternalSstable: + if buf48 == nil { + buf48 = make([]linkExternalSSTableResponseAlloc, counts[48]) + } + buf48[0].union.LinkExternalSstable = &buf48[0].resp + br.Responses[i].Value = &buf48[0].union + buf48 = buf48[1:] default: panic(fmt.Sprintf("unsupported request: %+v", r)) } @@ -1317,6 +1340,8 @@ func CreateRequest(method Method) Request { return &ProbeRequest{} case IsSpanEmpty: return &IsSpanEmptyRequest{} + case LinkExternalSSTable: + return &LinkExternalSSTableRequest{} default: panic(fmt.Sprintf("unsupported method: %+v", method)) } diff --git a/pkg/kv/kvpb/method.go b/pkg/kv/kvpb/method.go index 85fc55fcbd77..c9fadb705874 100644 --- a/pkg/kv/kvpb/method.go +++ b/pkg/kv/kvpb/method.go @@ -139,8 +139,10 @@ const ( // AdminScatter moves replicas and leaseholders for a selection of ranges. // Best-effort. AdminScatter - // AddSSTable links a file into the RocksDB log-structured merge-tree. + // AddSSTable links a file into pebble. AddSSTable + // LinkExternallSSTable links an external sst into pebble. + LinkExternalSSTable // Migrate updates the range state to conform to a specified cluster // version. It is our main mechanism for phasing out legacy code below Raft. Migrate diff --git a/pkg/kv/kvpb/method_string.go b/pkg/kv/kvpb/method_string.go index 30089fdc75e3..e9c1567101a4 100644 --- a/pkg/kv/kvpb/method_string.go +++ b/pkg/kv/kvpb/method_string.go @@ -46,19 +46,20 @@ func _() { _ = x[Export-35] _ = x[AdminScatter-36] _ = x[AddSSTable-37] - _ = x[Migrate-38] - _ = x[RecomputeStats-39] - _ = x[Refresh-40] - _ = x[RefreshRange-41] - _ = x[Subsume-42] - _ = x[RangeStats-43] - _ = x[AdminVerifyProtectedTimestamp-44] - _ = x[QueryResolvedTimestamp-45] - _ = x[Barrier-46] - _ = x[Probe-47] - _ = x[IsSpanEmpty-48] - _ = x[MaxMethod-48] - _ = x[NumMethods-49] + _ = x[LinkExternalSSTable-38] + _ = x[Migrate-39] + _ = x[RecomputeStats-40] + _ = x[Refresh-41] + _ = x[RefreshRange-42] + _ = x[Subsume-43] + _ = x[RangeStats-44] + _ = x[AdminVerifyProtectedTimestamp-45] + _ = x[QueryResolvedTimestamp-46] + _ = x[Barrier-47] + _ = x[Probe-48] + _ = x[IsSpanEmpty-49] + _ = x[MaxMethod-49] + _ = x[NumMethods-50] } func (i Method) String() string { @@ -139,6 +140,8 @@ func (i Method) String() string { return "AdminScatter" case AddSSTable: return "AddSSTable" + case LinkExternalSSTable: + return "LinkExternalSSTable" case Migrate: return "Migrate" case RecomputeStats: @@ -207,17 +210,18 @@ var StringToMethodMap = map[string]Method{ "Export": 35, "AdminScatter": 36, "AddSSTable": 37, - "Migrate": 38, - "RecomputeStats": 39, - "Refresh": 40, - "RefreshRange": 41, - "Subsume": 42, - "RangeStats": 43, - "AdminVerifyProtectedTimestamp": 44, - "QueryResolvedTimestamp": 45, - "Barrier": 46, - "Probe": 47, - "IsSpanEmpty": 48, - "MaxMethod": 48, - "NumMethods": 49, + "LinkExternalSSTable": 38, + "Migrate": 39, + "RecomputeStats": 40, + "Refresh": 41, + "RefreshRange": 42, + "Subsume": 43, + "RangeStats": 44, + "AdminVerifyProtectedTimestamp": 45, + "QueryResolvedTimestamp": 46, + "Barrier": 47, + "Probe": 48, + "IsSpanEmpty": 49, + "MaxMethod": 49, + "NumMethods": 50, } diff --git a/pkg/kv/kvserver/app_batch.go b/pkg/kv/kvserver/app_batch.go index 55e4b7865260..9abf82854d97 100644 --- a/pkg/kv/kvserver/app_batch.go +++ b/pkg/kv/kvserver/app_batch.go @@ -195,6 +195,13 @@ func (b *appBatch) runPostAddTriggers( b.numMutations += int(added) } } - + if res.LinkExternalSSTable != nil { + linkExternalSStablePreApply( + ctx, + env, + kvpb.RaftTerm(cmd.Term), + cmd.Index(), + *res.LinkExternalSSTable) + } return nil } diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index e5de16c632c8..3b9fef24d4d9 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "cmd_lease_info.go", "cmd_lease_request.go", "cmd_lease_transfer.go", + "cmd_link_external_sstable.go", "cmd_merge.go", "cmd_migrate.go", "cmd_probe.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index a4af120d933d..58562fe9c808 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -50,22 +50,30 @@ func declareKeysAddSSTable( lockSpans *lockspanset.LockSpanSet, maxOffset time.Duration, ) error { - args := req.(*kvpb.AddSSTableRequest) + reqHeader := req.Header() if err := DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset); err != nil { return err } // We look up the range descriptor key to return its span. latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) - + var mvccStats *enginepb.MVCCStats + switch v := req.(type) { + case *kvpb.AddSSTableRequest: + mvccStats = v.MVCCStats + case *kvpb.LinkExternalSSTableRequest: + mvccStats = v.ExternalFile.MVCCStats + default: + return errors.AssertionFailedf("unexpected rpc request called on declareKeysAddSStable") + } // TODO(bilal): Audit all AddSSTable callers to ensure they send MVCCStats. - if args.MVCCStats == nil || args.MVCCStats.RangeKeyCount > 0 { + if mvccStats == nil || mvccStats.RangeKeyCount > 0 { // NB: The range end key is not available, so this will pessimistically // latch up to args.EndKey.Next(). If EndKey falls on the range end key, the // span will be tightened during evaluation. // Even if we obtain latches beyond the end range here, it won't cause // contention with the subsequent range because latches are enforced per // range. - l, r := rangeTombstonePeekBounds(args.Key, args.EndKey, rs.GetStartKey().AsRawKey(), nil) + l, r := rangeTombstonePeekBounds(reqHeader.Key, reqHeader.EndKey, rs.GetStartKey().AsRawKey(), nil) latchSpans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: l, EndKey: r}, header.Timestamp) // Obtain a read only lock on range key GC key to serialize with @@ -144,41 +152,6 @@ func EvalAddSSTable( defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key) - // If this is a remote sst, just link it in and skip the rest of eval, since - // we do not do anything that touches the data inside an sst for remote ssts - // at the point of ingesting them. - if path := args.RemoteFile.Path; path != "" { - if len(args.Data) > 0 { - return result.Result{}, errors.AssertionFailedf("remote sst cannot include content") - } - log.VEventf(ctx, 1, "AddSSTable remote file %s in %s", path, args.RemoteFile.Locator) - - // We have no idea if the SST being ingested contains keys that will shadow - // existing keys or not, so we need to force its mvcc stats to be estimates. - s := *args.MVCCStats - s.ContainsEstimates++ - ms.Add(s) - - return result.Result{ - Replicated: kvserverpb.ReplicatedEvalResult{ - AddSSTable: &kvserverpb.ReplicatedEvalResult_AddSSTable{ - RemoteFileLoc: args.RemoteFile.Locator, - RemoteFilePath: path, - ApproximatePhysicalSize: args.RemoteFile.ApproximatePhysicalSize, - BackingFileSize: args.RemoteFile.BackingFileSize, - Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, - RemoteRewriteTimestamp: sstToReqTS, - RemoteSyntheticPrefix: args.RemoteFile.SyntheticPrefix, - }, - // Since the remote SST could contain keys at any timestamp, consider it - // a history mutation. - MVCCHistoryMutation: &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{ - Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}, - }, - }, - }, nil - } - if min := addSSTableCapacityRemainingLimit.Get(&cArgs.EvalCtx.ClusterSettings().SV); min > 0 { cap, err := cArgs.EvalCtx.GetEngineCapacity() if err != nil { diff --git a/pkg/kv/kvserver/batcheval/cmd_link_external_sstable.go b/pkg/kv/kvserver/batcheval/cmd_link_external_sstable.go new file mode 100644 index 000000000000..62d380ee5f07 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_link_external_sstable.go @@ -0,0 +1,90 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func init() { + // Taking out latches/locks across the entire SST span is very coarse, and we + // could instead iterate over the SST and take out point latches/locks, but + // the cost is likely not worth it since LinkExternalSSTable is often used with + // unpopulated spans. + RegisterReadWriteCommand(kvpb.LinkExternalSSTable, declareKeysAddSSTable, EvalLinkExternalSSTable) +} + +// EvalLinkExternalSSTable evaluates a LinkExternalSSTable command. For details, see doc comment +// on LinkExternalSSTableRequest. +func EvalLinkExternalSSTable( + ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp kvpb.Response, +) (result.Result, error) { + args := cArgs.Args.(*kvpb.LinkExternalSSTableRequest) + ms := cArgs.Stats + start, end := storage.MVCCKey{Key: args.Key}, storage.MVCCKey{Key: args.EndKey} + + // If requested and necessary, rewrite the SST's MVCC timestamps to the + // request timestamp. This ensures the writes comply with the timestamp cache + // and closed timestamp, i.e. by not writing to timestamps that have already + // been observed or closed. + // + // NB: during AddSStable eval we also update MVCCStats. Currently, we do not + // set time based MVCCStats in a linked external sst. + var rewriteTimestamp hlc.Timestamp + if args.ExternalFile.UseSyntheticSuffix { + // LinkExternalSSTable doesn't care about the original SST timestamp, so just + // always use the request write timestamp. + rewriteTimestamp = cArgs.Header.Timestamp + } + + log.Eventf(ctx, "evaluating External SSTable [%s,%s)", start.Key, end.Key) + + path := args.ExternalFile.Path + log.VEventf(ctx, 1, "link External SSTable file %s in %s", path, args.ExternalFile.Locator) + + // MVCCStats in the linked sst are always estimates, as we currently compute + // them with back of the envelope calculations using backup file data. + s := *args.ExternalFile.MVCCStats + s.ContainsEstimates++ + ms.Add(s) + + // Eval does not check for conflicts in the existing key space, as it assumes + // the client has sent the request at a timestamp above any requests in the + // key space. + var mvccHistoryMutation *kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation + if rewriteTimestamp.IsEmpty() { + mvccHistoryMutation = + &kvserverpb.ReplicatedEvalResult_MVCCHistoryMutation{Spans: []roachpb.Span{{Key: start.Key, EndKey: end.Key}}} + } + + return result.Result{ + Replicated: kvserverpb.ReplicatedEvalResult{ + LinkExternalSSTable: &kvserverpb.ReplicatedEvalResult_LinkExternalSSTable{ + RemoteFileLoc: args.ExternalFile.Locator, + RemoteFilePath: path, + ApproximatePhysicalSize: args.ExternalFile.ApproximatePhysicalSize, + BackingFileSize: args.ExternalFile.BackingFileSize, + Span: roachpb.Span{Key: start.Key, EndKey: end.Key}, + RemoteRewriteTimestamp: rewriteTimestamp, + RemoteSyntheticPrefix: args.ExternalFile.SyntheticPrefix, + }, + MVCCHistoryMutation: mvccHistoryMutation, + }, + }, nil +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 25fb2966d6f6..fbbffe7da896 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -321,6 +321,13 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Replicated.AddSSTable = nil + if p.Replicated.LinkExternalSSTable == nil { + p.Replicated.LinkExternalSSTable = q.Replicated.LinkExternalSSTable + } else if q.Replicated.LinkExternalSSTable != nil { + return errors.AssertionFailedf("conflicting LinkExternalSSTable") + } + q.Replicated.LinkExternalSSTable = nil + if p.Replicated.MVCCHistoryMutation == nil { p.Replicated.MVCCHistoryMutation = q.Replicated.MVCCHistoryMutation } else if q.Replicated.MVCCHistoryMutation != nil { diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ada36125a395..51f0763ea0a4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3141,20 +3141,21 @@ func TestMergeQueueWithExternalFiles(t *testing.T) { size, err := extStore.Size(ctx, fileName) require.NoError(t, err) - _, _, err = s.DB().AddRemoteSSTable(ctx, roachpb.Span{ + err = s.DB().LinkExternalSSTable(ctx, roachpb.Span{ Key: writeKey, EndKey: writeKey.Next(), - }, kvpb.AddSSTableRequest_RemoteFile{ + }, kvpb.LinkExternalSSTableRequest_ExternalFile{ Locator: externURI, Path: fileName, ApproximatePhysicalSize: uint64(size), BackingFileSize: uint64(size), - }, &enginepb.MVCCStats{ - ContainsEstimates: 1, - KeyBytes: 2, - ValBytes: 10, - KeyCount: 2, - LiveCount: 2, + MVCCStats: &enginepb.MVCCStats{ + ContainsEstimates: 1, + KeyBytes: 2, + ValBytes: 10, + KeyCount: 2, + LiveCount: 2, + }, }, s.DB().Clock().Now()) require.NoError(t, err) diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 3bf6d7e37a31..5a46d4e647c8 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -216,13 +216,7 @@ message ReplicatedEvalResult { // older have been applied on all replicas. bool at_write_timestamp = 4; - // TODO(msbutler,dt): bikeshed the name of these fields. - string remote_file_loc = 5; - string remote_file_path = 6; - uint64 backing_file_size = 7; - uint64 approximate_physical_size = 8; - util.hlc.Timestamp remote_rewrite_timestamp = 9 [(gogoproto.nullable) = false]; - bytes remote_synthetic_prefix = 10; + reserved 5, 6, 7, 8, 9, 10; } AddSSTable add_sstable = 17 [(gogoproto.customname) = "AddSSTable"]; @@ -258,6 +252,21 @@ message ReplicatedEvalResult { // application while ensuring correctness in the case where the lease transfer // is applied on the new leaseholder through a Raft snapshot. kv.kvserver.readsummary.ReadSummary prior_read_summary = 22; + + // LinkExternalSSTable adds an ExternalSST to the raft log. Unlike AddSStable, + // the ExternalSSTable is not sideloaded into the storage engine because it + // has empty payload. + message LinkExternalSSTable { + roachpb.Span span = 1 [(gogoproto.nullable) = false]; + string remote_file_loc = 2; + string remote_file_path = 3; + uint64 backing_file_size = 4; + uint64 approximate_physical_size = 5; + util.hlc.Timestamp remote_rewrite_timestamp = 6 [(gogoproto.nullable) = false]; + bytes remote_synthetic_prefix = 7; + } + + LinkExternalSSTable link_external_sstable = 27 [(gogoproto.customname) = "LinkExternalSSTable"]; reserved 1, 5, 7, 9, 10, 14, 15, 16, 19, 10001 to 10013; } diff --git a/pkg/kv/kvserver/raftlog/payload.go b/pkg/kv/kvserver/raftlog/payload.go index 2b90c237e5aa..781282b2386f 100644 --- a/pkg/kv/kvserver/raftlog/payload.go +++ b/pkg/kv/kvserver/raftlog/payload.go @@ -51,8 +51,7 @@ func EncodeCommand( entryEncoding = EntryEncodingSideloadedWithAC } - if command.ReplicatedEvalResult.AddSSTable.Data == nil && - command.ReplicatedEvalResult.AddSSTable.RemoteFileLoc == "" { + if command.ReplicatedEvalResult.AddSSTable.Data == nil { return nil, errors.Errorf("cannot sideload empty SSTable") } } diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 385633cfceb9..0734b929e56f 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -288,6 +288,12 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly( } res.AddSSTable = nil } + if res.LinkExternalSSTable != nil { + // All watching rangefeeds should error until we teach clients how to + // process linked external ssts. + b.r.disconnectRangefeedSpanWithErr(res.LinkExternalSSTable.Span, kvpb.NewError(errors.New("LinkExternalSSTable not supported in rangefeeds"))) + res.LinkExternalSSTable = nil + } if res.Split != nil { // Splits require a new HardState to be written to the new RHS diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 1abeddaf1282..dda1063cea8e 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -647,71 +647,6 @@ func addSSTablePreApply( index kvpb.RaftIndex, sst kvserverpb.ReplicatedEvalResult_AddSSTable, ) bool { - if sst.RemoteFilePath != "" { - log.Infof(ctx, - "EXPERIMENTAL AddSSTABLE EXTERNAL %s (size %d, span %s) from %s (size %d) at rewrite ts %s, synth prefix %s", - sst.RemoteFilePath, - sst.ApproximatePhysicalSize, - sst.Span, - sst.RemoteFileLoc, - sst.BackingFileSize, - sst.RemoteRewriteTimestamp, - sst.RemoteSyntheticPrefix, - ) - - start := storage.EngineKey{Key: sst.Span.Key} - // NB: sst.Span.EndKey may be nil if the span represents a single key. In - // that case, we produce an inclusive end bound equal to the start. - // Otherwise we produce an exclusive end bound. - end := start - endInclusive := true - if sst.Span.EndKey != nil { - end = storage.EngineKey{Key: sst.Span.EndKey} - endInclusive = false - } - var syntheticSuffix []byte - if sst.RemoteRewriteTimestamp.IsSet() { - syntheticSuffix = storage.EncodeMVCCTimestampSuffix(sst.RemoteRewriteTimestamp) - } - var syntheticPrefix []byte - if len(sst.RemoteSyntheticPrefix) > 0 { - syntheticPrefix = sst.RemoteSyntheticPrefix - } - - externalFile := pebble.ExternalFile{ - Locator: remote.Locator(sst.RemoteFileLoc), - ObjName: sst.RemoteFilePath, - Size: sst.ApproximatePhysicalSize, - StartKey: start.Encode(), - EndKey: end.Encode(), - EndKeyIsInclusive: endInclusive, - - SyntheticSuffix: syntheticSuffix, - SyntheticPrefix: syntheticPrefix, - // TODO(dt): pass pebble the backing file size to avoid a stat call. - - // TODO(msbutler): I guess we need to figure out if the backing external - // file has point or range keys in the target span. - HasPointKey: true, - } - tBegin := timeutil.Now() - defer func() { - if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() { - log.Infof(ctx, - "ingesting SST of size %s at index %d took %.2fs", - humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(), - ) - } - }() - - _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile}) - if ingestErr != nil { - log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr) - } - // Adding without modification succeeded, no copy necessary. - log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath) - return false - } checksum := util.CRC32(sst.Data) if checksum != sst.CRC32 { @@ -765,6 +700,70 @@ func addSSTablePreApply( return false /* copied */ } +func linkExternalSStablePreApply( + ctx context.Context, + env postAddEnv, + term kvpb.RaftTerm, + index kvpb.RaftIndex, + sst kvserverpb.ReplicatedEvalResult_LinkExternalSSTable, +) { + log.Infof(ctx, + "EXPERIMENTAL AddSSTABLE EXTERNAL %s (size %d, span %s) from %s (size %d) at rewrite ts %s, synth prefix %s", + sst.RemoteFilePath, + sst.ApproximatePhysicalSize, + sst.Span, + sst.RemoteFileLoc, + sst.BackingFileSize, + sst.RemoteRewriteTimestamp, + sst.RemoteSyntheticPrefix, + ) + + start := storage.EngineKey{Key: sst.Span.Key} + // NB: sst.Span.EndKey may be nil if the span represents a single key. In + // that case, we produce an inclusive end bound equal to the start. + // Otherwise we produce an exclusive end bound. + end := start + endInclusive := true + if sst.Span.EndKey != nil { + end = storage.EngineKey{Key: sst.Span.EndKey} + endInclusive = false + } + var syntheticSuffix []byte + if sst.RemoteRewriteTimestamp.IsSet() { + syntheticSuffix = storage.EncodeMVCCTimestampSuffix(sst.RemoteRewriteTimestamp) + } + var syntheticPrefix []byte + if len(sst.RemoteSyntheticPrefix) > 0 { + syntheticPrefix = sst.RemoteSyntheticPrefix + } + + externalFile := pebble.ExternalFile{ + Locator: remote.Locator(sst.RemoteFileLoc), + ObjName: sst.RemoteFilePath, + Size: sst.ApproximatePhysicalSize, + StartKey: start.Encode(), + EndKey: end.Encode(), + EndKeyIsInclusive: endInclusive, + SyntheticSuffix: syntheticSuffix, + SyntheticPrefix: syntheticPrefix, + HasPointKey: true, + } + tBegin := timeutil.Now() + defer func() { + if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() { + log.Infof(ctx, + "ingesting External SST at index %d took %.2fs", index, dur.Seconds(), + ) + } + }() + + _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile}) + if ingestErr != nil { + log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr) + } + log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath) +} + // ingestViaCopy writes the SST to ingestPath (with rate limiting) and then ingests it // into the Engine. // diff --git a/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer/authorizer.go b/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer/authorizer.go index 458b489353b2..d9e653dca86f 100644 --- a/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer/authorizer.go +++ b/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer/authorizer.go @@ -187,34 +187,36 @@ var ( var reqMethodToCap = map[kvpb.Method]tenantcapabilities.ID{ // The following requests are authorized for all workloads. - kvpb.AddSSTable: noCapCheckNeeded, - kvpb.Barrier: noCapCheckNeeded, - kvpb.ClearRange: noCapCheckNeeded, - kvpb.ConditionalPut: noCapCheckNeeded, - kvpb.Delete: noCapCheckNeeded, - kvpb.DeleteRange: noCapCheckNeeded, - kvpb.EndTxn: noCapCheckNeeded, - kvpb.Export: noCapCheckNeeded, - kvpb.Get: noCapCheckNeeded, - kvpb.HeartbeatTxn: noCapCheckNeeded, - kvpb.Increment: noCapCheckNeeded, - kvpb.InitPut: noCapCheckNeeded, - kvpb.IsSpanEmpty: noCapCheckNeeded, - kvpb.LeaseInfo: noCapCheckNeeded, - kvpb.PushTxn: noCapCheckNeeded, - kvpb.Put: noCapCheckNeeded, - kvpb.QueryIntent: noCapCheckNeeded, - kvpb.QueryLocks: noCapCheckNeeded, - kvpb.QueryTxn: noCapCheckNeeded, - kvpb.RangeStats: noCapCheckNeeded, - kvpb.RecoverTxn: noCapCheckNeeded, - kvpb.Refresh: noCapCheckNeeded, - kvpb.RefreshRange: noCapCheckNeeded, - kvpb.ResolveIntent: noCapCheckNeeded, - kvpb.ResolveIntentRange: noCapCheckNeeded, - kvpb.ReverseScan: noCapCheckNeeded, - kvpb.RevertRange: noCapCheckNeeded, - kvpb.Scan: noCapCheckNeeded, + kvpb.AddSSTable: noCapCheckNeeded, + // TODO(dt): only allow system tenant to send LinkExternalSSTable requests. + kvpb.LinkExternalSSTable: noCapCheckNeeded, + kvpb.Barrier: noCapCheckNeeded, + kvpb.ClearRange: noCapCheckNeeded, + kvpb.ConditionalPut: noCapCheckNeeded, + kvpb.Delete: noCapCheckNeeded, + kvpb.DeleteRange: noCapCheckNeeded, + kvpb.EndTxn: noCapCheckNeeded, + kvpb.Export: noCapCheckNeeded, + kvpb.Get: noCapCheckNeeded, + kvpb.HeartbeatTxn: noCapCheckNeeded, + kvpb.Increment: noCapCheckNeeded, + kvpb.InitPut: noCapCheckNeeded, + kvpb.IsSpanEmpty: noCapCheckNeeded, + kvpb.LeaseInfo: noCapCheckNeeded, + kvpb.PushTxn: noCapCheckNeeded, + kvpb.Put: noCapCheckNeeded, + kvpb.QueryIntent: noCapCheckNeeded, + kvpb.QueryLocks: noCapCheckNeeded, + kvpb.QueryTxn: noCapCheckNeeded, + kvpb.RangeStats: noCapCheckNeeded, + kvpb.RecoverTxn: noCapCheckNeeded, + kvpb.Refresh: noCapCheckNeeded, + kvpb.RefreshRange: noCapCheckNeeded, + kvpb.ResolveIntent: noCapCheckNeeded, + kvpb.ResolveIntentRange: noCapCheckNeeded, + kvpb.ReverseScan: noCapCheckNeeded, + kvpb.RevertRange: noCapCheckNeeded, + kvpb.Scan: noCapCheckNeeded, // The following are authorized via specific capabilities. kvpb.AdminChangeReplicas: tenantcapabilities.CanAdminRelocateRange,