Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120570: kvserver: create LinkExternalSSTRequest kv rpc r=msbutler a=msbutler

Previously, the AddSStable rpc would process linking external files into
pebble, even though the linking logic is largely seperate from a regular
AddSSTable request. This patch splits external file linking into a new rpc:
LinkExternalSST.

After this patch, the only significant code overlap between the AddSStable and
LinkExternalSST rpcs is in raft sideloading.

Fixes cockroachdb#120526

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Mar 23, 2024
2 parents 4a9385c + 9320691 commit c950a9e
Show file tree
Hide file tree
Showing 20 changed files with 392 additions and 240 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@
<tr><td>STORAGE</td><td>rpc.method.initput.recv</td><td>Number of InitPut requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.isspanempty.recv</td><td>Number of IsSpanEmpty requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.leaseinfo.recv</td><td>Number of LeaseInfo requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.linkexternalsstable.recv</td><td>Number of LinkExternalSSTable requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.merge.recv</td><td>Number of Merge requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.migrate.recv</td><td>Number of Migrate requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>rpc.method.probe.recv</td><td>Number of Probe requests processed</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down Expand Up @@ -931,6 +932,7 @@
<tr><td>APPLICATION</td><td>distsender.rpc.initput.sent</td><td>Number of InitPut requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.isspanempty.sent</td><td>Number of IsSpanEmpty requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.leaseinfo.sent</td><td>Number of LeaseInfo requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.linkexternalsstable.sent</td><td>Number of LinkExternalSSTable requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.merge.sent</td><td>Number of Merge requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.migrate.sent</td><td>Number of Migrate requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>distsender.rpc.probe.sent</td><td>Number of Probe requests processed.<br/><br/>This counts the requests in batches handed to DistSender, not the RPCs<br/>sent to individual Ranges as a result.</td><td>RPCs</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
22 changes: 12 additions & 10 deletions pkg/ccl/backupccl/restore_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,6 @@ func sendRemoteAddSSTable(
return err
}

loc := kvpb.AddSSTableRequest_RemoteFile{
Locator: file.Dir.URI,
Path: file.Path,
ApproximatePhysicalSize: fileSize,
BackingFileSize: file.BackingFileSize,
SyntheticPrefix: syntheticPrefix,
}
// TODO(dt): see if KV has any better ideas for making these up.
fileStats := &enginepb.MVCCStats{
ContainsEstimates: 1,
Expand All @@ -314,9 +307,18 @@ func sendRemoteAddSSTable(
batchTimestamp = execCtx.ExecCfg().DB.Clock().Now()
}

_, _, err = execCtx.ExecCfg().DB.AddRemoteSSTable(
ctx, file.BackupFileEntrySpan, loc, fileStats, batchTimestamp)
return err
loc := kvpb.LinkExternalSSTableRequest_ExternalFile{
Locator: file.Dir.URI,
Path: file.Path,
ApproximatePhysicalSize: fileSize,
BackingFileSize: file.BackingFileSize,
SyntheticPrefix: syntheticPrefix,
UseSyntheticSuffix: batchTimestamp.IsSet(),
MVCCStats: fileStats,
}

return execCtx.ExecCfg().DB.LinkExternalSSTable(
ctx, file.BackupFileEntrySpan, loc, batchTimestamp)
}

// checkManifestsForOnlineCompat returns an error if the set of
Expand Down
17 changes: 15 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (b *Batch) fillResults(ctx context.Context) {
case *kvpb.MigrateRequest:
case *kvpb.QueryResolvedTimestampRequest:
case *kvpb.BarrierRequest:
case *kvpb.LinkExternalSSTableRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
Expand Down Expand Up @@ -1035,7 +1036,6 @@ func (b *Batch) adminRelocateRange(
func (b *Batch) addSSTable(
s, e interface{},
data []byte,
remoteFile kvpb.AddSSTableRequest_RemoteFile,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
Expand All @@ -1059,7 +1059,6 @@ func (b *Batch) addSSTable(
EndKey: end,
},
Data: data,
RemoteFile: remoteFile,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
Expand All @@ -1071,6 +1070,20 @@ func (b *Batch) addSSTable(
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) linkExternalSSTable(
span roachpb.Span, externalFile kvpb.LinkExternalSSTableRequest_ExternalFile,
) {
req := &kvpb.LinkExternalSSTableRequest{
RequestHeader: kvpb.RequestHeader{
Key: span.Key,
EndKey: span.EndKey,
},
ExternalFile: externalFile,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
Expand Down
23 changes: 10 additions & 13 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,6 @@ func (db *DB) AdminRelocateRange(
return getOneErr(db.Run(ctx, b), b)
}

var noRemoteFile kvpb.AddSSTableRequest_RemoteFile

// AddSSTable links a file into the Pebble log-structured merge-tree.
//
// The disallowConflicts, disallowShadowingBelow parameters
Expand All @@ -801,7 +799,7 @@ func (db *DB) AddSSTable(
batchTs hlc.Timestamp,
) (roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, noRemoteFile, disallowConflicts, disallowShadowing, disallowShadowingBelow,
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, hlc.Timestamp{} /* sstTimestampToRequestTimestamp */)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
Expand All @@ -814,24 +812,23 @@ func (db *DB) AddSSTable(
return resp.RangeSpan, resp.AvailableBytes, nil
}

func (db *DB) AddRemoteSSTable(
// LinkExternalSSTable links an external sst into the Pebble log-structured merge-tree.
func (db *DB) LinkExternalSSTable(
ctx context.Context,
span roachpb.Span,
file kvpb.AddSSTableRequest_RemoteFile,
stats *enginepb.MVCCStats,
file kvpb.LinkExternalSSTableRequest_ExternalFile,
batchTimestamp hlc.Timestamp,
) (roachpb.Span, int64, error) {
) error {
b := &Batch{Header: kvpb.Header{Timestamp: batchTimestamp}}
b.addSSTable(span.Key, span.EndKey, nil, file, false, false, hlc.Timestamp{}, stats, false, batchTimestamp)
b.linkExternalSSTable(span, file)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
return roachpb.Span{}, 0, err
return err
}
if l := len(b.response.Responses); l != 1 {
return roachpb.Span{}, 0, errors.AssertionFailedf("expected single response, got %d", l)
return errors.AssertionFailedf("expected single response, got %d", l)
}
resp := b.response.Responses[0].GetAddSstable()
return resp.RangeSpan, resp.AvailableBytes, nil
return nil
}

// AddSSTableAtBatchTimestamp links a file into the Pebble log-structured
Expand All @@ -852,7 +849,7 @@ func (db *DB) AddSSTableAtBatchTimestamp(
batchTs hlc.Timestamp,
) (hlc.Timestamp, roachpb.Span, int64, error) {
b := &Batch{Header: kvpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, noRemoteFile,
b.addSSTable(begin, end, data,
disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, batchTs)
err := getOneErr(db.Run(ctx, b), b)
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,9 @@ func (*AdminScatterRequest) Method() Method { return AdminScatter }
// Method implements the Request interface.
func (*AddSSTableRequest) Method() Method { return AddSSTable }

// Method implements the Request interface.
func (*LinkExternalSSTableRequest) Method() Method { return LinkExternalSSTable }

// Method implements the Request interface.
func (*MigrateRequest) Method() Method { return Migrate }

Expand Down Expand Up @@ -1215,6 +1218,12 @@ func (r *AddSSTableRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *LinkExternalSSTableRequest) ShallowCopy() Request {
shallowCopy := *r
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (r *MigrateRequest) ShallowCopy() Request {
shallowCopy := *r
Expand Down Expand Up @@ -1757,6 +1766,13 @@ func (r *AddSSTableRequest) flags() flag {
}
return flags
}
func (r *LinkExternalSSTableRequest) flags() flag {
flags := isWrite | isRange | isAlone | isUnsplittable | canBackpressure | bypassesReplicaCircuitBreaker
if r.ExternalFile.UseSyntheticSuffix {
flags |= appliesTSCache
}
return flags
}
func (*MigrateRequest) flags() flag { return isWrite | isRange | isAlone }

// RefreshRequest and RefreshRangeRequest both determine which timestamp cache
Expand Down
71 changes: 34 additions & 37 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2127,50 +2127,15 @@ message AddSSTableRequest {
// is likely non-empty. See AddSSTableResponse.FollowingLikelyNonEmptySpanStart.
bool return_following_likely_non_empty_span_start = 9;

// RemoteFile indicates that the span indicated by the request start and end
// key of the file specified should be added. Data cannot be set if RemoteFile
// is non-empty, and many other request parameters such as collision/shadow
// checking, write-at-request-timestamp, or ingest-as-writes are unsupported.
//
// TODO(dt, msbutler, bilal): This is unsupported.
// TODO(msbutler): rename to ExternalFile.
message RemoteFile {
string locator = 1;
string path = 2;
uint64 backing_file_size = 3;
uint64 approximate_physical_size = 4;
bytes synthetic_prefix = 5;

}
RemoteFile remote_file = 10 [(gogoproto.nullable) = false];

// AddSStableRequest_PrefixReplacement is used to represent a prefix to be
// replaced and the prefix with which to replace it.
//
// TODO(msbutler): delete prefix replacement plumbing.
message PrefixReplacement {
bytes from = 1;
bytes to = 2;
}

// PrefixReplacement is used to that keys in the sst with the prefix specified
// in "from" should instead appear to have that prefix replaced by "to", when
// during iteration, seeking, or other reads. For example, an SST containing
// keys a1, a2, and a3 when added with PrefixReplacement={From: a, To:x} would
// produce the same results for subsequent operations as adding an SST with
// the keys x1, x2, x3 instead. The implementaiton may however elect to defer
// these replacements until the file is read.
//
// TODO(dt,msbutler,bilal): This is unsupported.
PrefixReplacement prefix_replacement = 11 [(gogoproto.nullable) = false];

// IgnoreKeysAboveTimestamp is used when ingesting an SSTable that contains
// keys, including mvcc revisions of keys, captured over a time range, to
// indicate that readers of that sstable should read it "as of" a the fixed
// time, ignoring any keys with later timestamps.
//
// TODO(dt,msbutler,bilal): This is unsupported.
util.hlc.Timestamp ignore_keys_above_timestamp = 12 [(gogoproto.nullable) = false];

reserved 10, 11;
}

// AddSSTableResponse is the response to a AddSSTable() operation.
Expand Down Expand Up @@ -2200,6 +2165,36 @@ message AddSSTableResponse {
bytes following_likely_non_empty_span_start = 4 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.Key"];
}

// LinkExternalSSTableRequest contains arguments to the LinkExternalSSTable
// method, which links an External SST file into the Pebble log-structured
// merge-tree. The External SST must point to a backup file which contains
// versioned values with non-zero MVCC timestamps (no intents or inline values).
// It cannot be used in a transaction, cannot be split across ranges, and must
// be alone in a batch.
message LinkExternalSSTableRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

message ExternalFile {
string locator = 1;
string path = 2;
uint64 backing_file_size = 3;
uint64 approximate_physical_size = 4;
bytes synthetic_prefix = 5;
bool use_synthetic_suffix = 6;

// MVCCStats are estimated MVCCStats for the contents of this Extrenal SSTable
// and is used as-is during evaluation of the command to update the range
// MVCCStats, instead of computing the stats for the SSTable by iterating it.
storage.enginepb.MVCCStats mvcc_stats = 7 [(gogoproto.customname) = "MVCCStats"];
}
ExternalFile external_file = 4 [(gogoproto.nullable) = false];
}

// LinkExternalSSTableResponse is the response to a LinkExternalSSTable() operation.
message LinkExternalSSTableResponse {
ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// RefreshRequest is arguments to the Refresh() method, which verifies that no
// write has occurred since the refresh_from timestamp to the specified key.
// The timestamp cache is updated. A transaction must be supplied with this
Expand Down Expand Up @@ -2544,6 +2539,7 @@ message RequestUnion {
BarrierRequest barrier = 53;
ProbeRequest probe = 54;
IsSpanEmptyRequest is_span_empty = 56;
LinkExternalSSTableRequest link_external_sstable = 57;
}
reserved 8, 15, 23, 25, 27, 31, 34, 52;
}
Expand Down Expand Up @@ -2599,6 +2595,7 @@ message ResponseUnion {
BarrierResponse barrier = 53;
ProbeResponse probe = 54;
IsSpanEmptyResponse is_span_empty = 56;
LinkExternalSSTableResponse link_external_sstable = 57;
}
reserved 8, 15, 23, 25, 27, 28, 31, 34, 52;
}
Expand Down
Loading

0 comments on commit c950a9e

Please sign in to comment.