Skip to content

Commit

Permalink
kvserver: create LinkExternalSSTRequest rpc
Browse files Browse the repository at this point in the history
Previously, the AddSSTable rpc would process linking external files into
pebble, even though the linking logic is largely separate from a regular
AddSSTable request. This patch splits external file linking into a new rpc:
LinkExternalSSTable.

NB: after this patch, External SSTs will no longer sideload into each pebble
instance.

Fixes #120526

Release note: none
  • Loading branch information
msbutler committed Mar 22, 2024
1 parent 620aa02 commit 4d293fd
Show file tree
Hide file tree
Showing 20 changed files with 412 additions and 241 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@
<tr><td>STORAGE</td><td>rpc.method.initput.recv</td><td>Number of InitPut requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.isspanempty.recv</td><td>Number of IsSpanEmpty requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.leaseinfo.recv</td><td>Number of LeaseInfo requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.linkexternalsstable.recv</td><td>Number of LinkExternalSSTable requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.merge.recv</td><td>Number of Merge requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.migrate.recv</td><td>Number of Migrate requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.probe.recv</td><td>Number of Probe requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down Expand Up @@ -921,6 +922,7 @@
<tr><td>APPLICATION</td><td>distsender.rpc.initput.sent</td><td>Number of InitPut requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.isspanempty.sent</td><td>Number of IsSpanEmpty requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.leaseinfo.sent</td><td>Number of LeaseInfo requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.linkexternalsstable.sent</td><td>Number of LinkExternalSSTable requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.merge.sent</td><td>Number of Merge requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.migrate.sent</td><td>Number of Migrate requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.probe.sent</td><td>Number of Probe requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
21 changes: 11 additions & 10 deletions pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,6 @@ func sendRemoteAddSSTable(
return err
}

loc := kvpb.AddSSTableRequest_RemoteFile{
Locator: file.Dir.URI,
Path: file.Path,
ApproximatePhysicalSize: fileSize,
BackingFileSize: file.BackingFileSize,
SyntheticPrefix: syntheticPrefix,
}
// TODO(dt): see if KV has any better ideas for making these up.
fileStats := &enginepb.MVCCStats{
ContainsEstimates: 1,
Expand All @@ -309,14 +302,22 @@ func sendRemoteAddSSTable(
LiveCount: counts.Rows + counts.IndexEntries,
}

loc := kvpb.LinkExternalSSTableRequest_ExternalFile{
Locator: file.Dir.URI,
Path: file.Path,
ApproximatePhysicalSize: fileSize,
BackingFileSize: file.BackingFileSize,
SyntheticPrefix: syntheticPrefix,
MVCCStats: fileStats,
}

var batchTimestamp hlc.Timestamp
if writeAtBatchTS(ctx, file.BackupFileEntrySpan, fromSystemTenant) {
batchTimestamp = execCtx.ExecCfg().DB.Clock().Now()
}

_, _, err = execCtx.ExecCfg().DB.AddRemoteSSTable(
ctx, file.BackupFileEntrySpan, loc, fileStats, batchTimestamp)
return err
return execCtx.ExecCfg().DB.LinkExternalSSTable(
ctx, file.BackupFileEntrySpan, loc, batchTimestamp)
}

// checkManifestsForOnlineCompat returns an error if the set of
Expand Down
20 changes: 18 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (b *Batch) fillResults(ctx context.Context) {
case *kvpb.MigrateRequest:
case *kvpb.QueryResolvedTimestampRequest:
case *kvpb.BarrierRequest:
case *kvpb.LinkExternalSSTableRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -1035,7 +1036,6 @@ func (b *Batch) adminRelocateRange(
func (b *Batch) addSSTable(
s, e interface{},
data []byte,
remoteFile kvpb.AddSSTableRequest_RemoteFile,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
Expand All @@ -1059,7 +1059,6 @@ func (b *Batch) addSSTable(
EndKey: end,
},
Data: data,
RemoteFile: remoteFile,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
Expand All @@ -1071,6 +1070,23 @@ func (b *Batch) addSSTable(
b.initResult(1, 0, notRaw, nil)
}

// linkExternalSSTable appends a LinkExternalSSTableRequest to the batch. The
// request header is built from span's Key and EndKey, and externalFile and
// sstTimestampToRequestTimestamp are passed through to the request unchanged.
func (b *Batch) linkExternalSSTable(
	span roachpb.Span,
	externalFile kvpb.LinkExternalSSTableRequest_ExternalFile,
	sstTimestampToRequestTimestamp hlc.Timestamp,
) {
	header := kvpb.RequestHeader{Key: span.Key, EndKey: span.EndKey}
	b.appendReqs(&kvpb.LinkExternalSSTableRequest{
		RequestHeader:                  header,
		ExternalFile:                   externalFile,
		SSTTimestampToRequestTimestamp: sstTimestampToRequestTimestamp,
	})
	b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
Expand Down
23 changes: 10 additions & 13 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,6 @@ func (db *DB) AdminRelocateRange(
return getOneErr(db.Run(ctx, b), b)
}

var noRemoteFile kvpb.AddSSTableRequest_RemoteFile

// AddSSTable links a file into the Pebble log-structured merge-tree.
//
// The disallowConflicts, disallowShadowingBelow parameters
Expand All @@ -801,7 +799,7 @@ func (db *DB) AddSSTable(
batchTs hlc.Timestamp,
) (roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, noRemoteFile, disallowConflicts, disallowShadowing, disallowShadowingBelow,
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
Expand All @@ -814,24 +812,23 @@ func (db *DB) AddSSTable(
return resp.RangeSpan, resp.AvailableBytes, nil
}

func (db *DB) AddRemoteSSTable(
// LinkExternalSSTable links an external sst into the Pebble log-structured merge-tree.
func (db *DB) LinkExternalSSTable(
ctx context.Context,
span roachpb.Span,
file kvpb.AddSSTableRequest_RemoteFile,
stats *enginepb.MVCCStats,
file kvpb.LinkExternalSSTableRequest_ExternalFile,
batchTimestamp hlc.Timestamp,
) (roachpb.Span, int64, error) {
) error {
b := &Batch{Header: kvpb.Header{Timestamp: batchTimestamp}}
b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, batchTimestamp)
b.linkExternalSSTable(span, file, batchTimestamp)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
return roachpb.Span{}, 0, err
return err
}
if l := len(b.response.Responses); l != 1 {
return roachpb.Span{}, 0, errors.AssertionFailedf("expected single response, got %d", l)
return errors.AssertionFailedf("expected single response, got %d", l)
}
resp := b.response.Responses[0].GetAddSstable()
return resp.RangeSpan, resp.AvailableBytes, nil
return nil
}

// AddSSTableAtBatchTimestamp links a file into the Pebble log-structured
Expand All @@ -852,7 +849,7 @@ func (db *DB) AddSSTableAtBatchTimestamp(
batchTs hlc.Timestamp,
) (hlc.Timestamp, roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, noRemoteFile,
b.addSSTable(begin, end, data,
disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, batchTs)
err := getOneErr(db.Run(ctx, b), b)
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,9 @@ func (*AdminScatterRequest) Method() Method { return AdminScatter }
// Method implements the Request interface; it reports AddSSTable.
func (*AddSSTableRequest) Method() Method {
	return AddSSTable
}

// Method implements the Request interface; it reports LinkExternalSSTable.
func (*LinkExternalSSTableRequest) Method() Method {
	return LinkExternalSSTable
}

// Method implements the Request interface; it reports Migrate.
func (*MigrateRequest) Method() Method {
	return Migrate
}

Expand Down Expand Up @@ -1215,6 +1218,12 @@ func (r *AddSSTableRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface. The returned request is a
// field-by-field copy of r, so any reference-typed fields are shared with the
// receiver rather than duplicated.
func (r *LinkExternalSSTableRequest) ShallowCopy() Request {
	clone := new(LinkExternalSSTableRequest)
	*clone = *r
	return clone
}

// ShallowCopy implements the Request interface.
func (r *MigrateRequest) ShallowCopy() Request {
shallowCopy := *r
Expand Down Expand Up @@ -1757,6 +1766,13 @@ func (r *AddSSTableRequest) flags() flag {
}
return flags
}
// flags returns the flag set for a LinkExternalSSTableRequest. The base set is
// always returned; appliesTSCache is added only when
// SSTTimestampToRequestTimestamp is set on the request.
func (r *LinkExternalSSTableRequest) flags() flag {
	base := isWrite | isRange | isAlone | isUnsplittable | canBackpressure | bypassesReplicaCircuitBreaker
	if !r.SSTTimestampToRequestTimestamp.IsSet() {
		return base
	}
	return base | appliesTSCache
}
// flags returns the flag set for a MigrateRequest: isWrite | isRange | isAlone.
func (*MigrateRequest) flags() flag {
	return isWrite | isRange | isAlone
}

// RefreshRequest and RefreshRangeRequest both determine which timestamp cache
Expand Down
88 changes: 51 additions & 37 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2127,50 +2127,15 @@ message AddSSTableRequest {
// is likely non-empty. See AddSSTableResponse.FollowingLikelyNonEmptySpanStart.
bool return_following_likely_non_empty_span_start = 9;

// RemoteFile indicates that the span indicated by the request start and end
// key of the file specified should be added. Data cannot be set if RemoteFile
// is non-empty, and many other request parameters such as collision/shadow
// checking, write-at-request-timestamp, or ingest-as-writes are unsupported.
//
// TODO(dt, msbutler, bilal): This is unsupported.
// TODO(msbutler): rename to ExternalFile.
message RemoteFile {
string locator = 1;
string path = 2;
uint64 backing_file_size = 3;
uint64 approximate_physical_size = 4;
bytes synthetic_prefix = 5;

}
RemoteFile remote_file = 10 [(gogoproto.nullable) = false];

// AddSStableRequest_PrefixReplacement is used to represent a prefix to be
// replaced and the prefix with which to replace it.
//
// TODO(msbutler): delete prefix replacement plumbing.
message PrefixReplacement {
bytes from = 1;
bytes to = 2;
}

// PrefixReplacement is used to indicate that keys in the sst with the prefix
// specified in "from" should instead appear to have that prefix replaced by
// "to" during iteration, seeking, or other reads. For example, an SST containing
// keys a1, a2, and a3 when added with PrefixReplacement={From: a, To:x} would
// produce the same results for subsequent operations as adding an SST with
// the keys x1, x2, x3 instead. The implementation may however elect to defer
// these replacements until the file is read.
//
// TODO(dt,msbutler,bilal): This is unsupported.
PrefixReplacement prefix_replacement = 11 [(gogoproto.nullable) = false];

// IgnoreKeysAboveTimestamp is used when ingesting an SSTable that contains
// keys, including mvcc revisions of keys, captured over a time range, to
// indicate that readers of that sstable should read it "as of" a fixed
// time, ignoring any keys with later timestamps.
//
// TODO(dt,msbutler,bilal): This is unsupported.
util.hlc.Timestamp ignore_keys_above_timestamp = 12 [(gogoproto.nullable) = false];

reserved 10, 11;
}

// AddSSTableResponse is the response to an AddSSTable() operation.
Expand Down Expand Up @@ -2200,6 +2165,53 @@ message AddSSTableResponse {
bytes following_likely_non_empty_span_start = 4 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
}

// LinkExternalSSTableRequest contains arguments to the LinkExternalSSTable
// method, which links an External SST file into the Pebble log-structured
// merge-tree. The External SST must point to a backup file which contains
// versioned values with non-zero MVCC timestamps (no intents or inline values).
// It cannot be used in a transaction, cannot be split across ranges, and must
// be alone in a batch.
message LinkExternalSSTableRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// SSTTimestampToRequestTimestamp gives the timestamp used for all MVCC keys
// in the provided SST. If this timestamp differs from the request timestamp
// (e.g. if the request gets pushed) then all MVCC keys in the SST will be
// rewritten to the request timestamp during request evaluation. This ensures
// the writes comply with the timestamp cache and closed timestamp. It also
// causes the External SSTable to be emitted via the range feed, since it
// respects the closed timestamp.
//
// Callers should always set this, except in very special circumstances when
// the timestamp cache and closed timestamp can safely be ignored (e.g.
// restore of a tenant).
//
// Note that this alone is not sufficient to guarantee MVCC correctness, since
// it can write below or replace existing committed versions (the tscache is
// only bumped when the values are subsequently read).
util.hlc.Timestamp sst_timestamp_to_request_timestamp = 2 [
(gogoproto.customname) = "SSTTimestampToRequestTimestamp", (gogoproto.nullable) = false];

// ExternalFile describes the external SST file to link.
//
// NOTE(review): the exact semantics of locator, path, the two size fields and
// synthetic_prefix are not documented here; confirm against the code that
// evaluates the request before relying on them.
message ExternalFile {
string locator = 1;
string path = 2;
uint64 backing_file_size = 3;
uint64 approximate_physical_size = 4;
bytes synthetic_prefix = 5;

// MVCCStats are estimated MVCCStats for the contents of this External SSTable
// and are used as-is during evaluation of the command to update the range
// MVCCStats, instead of computing the stats for the SSTable by iterating it.
storage.enginepb.MVCCStats mvcc_stats = 6 [(gogoproto.customname) = "MVCCStats"];
}
// ExternalFile is the external SST file that this request links.
ExternalFile external_file = 4 [(gogoproto.nullable) = false];
}

// LinkExternalSSTableResponse is the response to a LinkExternalSSTable() operation.
// It carries only the embedded response header.
message LinkExternalSSTableResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// RefreshRequest is arguments to the Refresh() method, which verifies that no
// write has occurred since the refresh_from timestamp to the specified key.
// The timestamp cache is updated. A transaction must be supplied with this
Expand Down Expand Up @@ -2544,6 +2556,7 @@ message RequestUnion {
BarrierRequest barrier = 53;
ProbeRequest probe = 54;
IsSpanEmptyRequest is_span_empty = 56;
LinkExternalSSTableRequest link_external_sstable = 57;
}
reserved 8, 15, 23, 25, 27, 31, 34, 52;
}
Expand Down Expand Up @@ -2599,6 +2612,7 @@ message ResponseUnion {
BarrierResponse barrier = 53;
ProbeResponse probe = 54;
IsSpanEmptyResponse is_span_empty = 56;
LinkExternalSSTableResponse link_external_sstable = 57;
}
reserved 8, 15, 23, 25, 27, 28, 31, 34, 52;
}
Expand Down
Loading

0 comments on commit 4d293fd

Please sign in to comment.