Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: add MVCC-compliant AddSSTable variant #72085

Merged
merged 4 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-14 set the active cluster version in the format '<major>.<minor>'
version version 21.2-16 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-14</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-16</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
6 changes: 4 additions & 2 deletions pkg/ccl/importccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func benchmarkAddSSTable(b *testing.B, dir string, tables []tableSSTable) {
b.StartTimer()
for _, t := range tables {
totalBytes += int64(len(t.sstData))
require.NoError(b, kvDB.AddSSTable(
ctx, t.span.Key, t.span.EndKey, t.sstData, true /* disallowShadowing */, nil /* stats */, false /*ingestAsWrites */, hlc.Timestamp{},
require.NoError(b, kvDB.AddSSTable(ctx, t.span.Key, t.span.EndKey, t.sstData,
false /* disallowConflicts */, true, /* disallowShadowing */
hlc.Timestamp{} /* disallowShadowingBelow */, nil, /* stats */
false /*ingestAsWrites */, hlc.Timestamp{}, false, /* writeAtBatchTS */
))
}
b.StopTimer()
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ const (
// system.statement_diagnostics_requests table to support collecting stmt
// bundles when the query latency exceeds the user provided threshold.
AlterSystemStmtDiagReqs
// MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new
// WriteAtRequestTimestamp and DisallowConflicts parameters.
MVCCAddSSTable

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -497,6 +500,10 @@ var versionsSingleton = keyedVersions{
Key: AlterSystemStmtDiagReqs,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 14},
},
{
Key: MVCCAddSSTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 16},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,12 @@ func (b *Batch) adminRelocateRange(
func (b *Batch) addSSTable(
s, e interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
writeAtRequestTimestamp bool,
) {
begin, err := marshalKey(s)
if err != nil {
Expand All @@ -783,10 +786,13 @@ func (b *Batch) addSSTable(
Key: begin,
EndKey: end,
},
Data: data,
DisallowShadowing: disallowShadowing,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
Data: data,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
DisallowShadowingBelow: disallowShadowingBelow,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
WriteAtRequestTimestamp: writeAtRequestTimestamp,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,13 @@ type SSTSender interface {
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
batchTs hlc.Timestamp,
writeAtBatchTs bool,
) error
SplitAndScatter(ctx context.Context, key roachpb.Key, expirationTime hlc.Timestamp) error
}
Expand Down Expand Up @@ -457,7 +460,9 @@ func AddSSTable(
ingestAsWriteBatch = true
}
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes, item.disallowShadowing, &item.stats, ingestAsWriteBatch, batchTs)
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes, false, /* disallowConflicts */
item.disallowShadowing, hlc.Timestamp{} /* disallowShadowingBelow */, &item.stats,
ingestAsWriteBatch, batchTs, false /* writeAtBatchTs */)
if err == nil {
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, timeutil.Since(before))
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,13 @@ func (m mockSender) AddSSTable(
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
_ *enginepb.MVCCStats,
ingestAsWrites bool,
batchTS hlc.Timestamp,
writeAtBatchTS bool,
) error {
return m(roachpb.Span{Key: begin.(roachpb.Key), EndKey: end.(roachpb.Key)})
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,17 +664,24 @@ func (db *DB) AdminRelocateRange(

// AddSSTable links a file into the RocksDB log-structured merge-tree. Existing
// data in the range is cleared.
//
// The disallowConflicts, disallowShadowingBelow, and writeAtBatchTs parameters
// require the MVCCAddSSTable version gate, as they are new in 22.1.
func (db *DB) AddSSTable(
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
disallowShadowingBelow hlc.Timestamp,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
batchTs hlc.Timestamp,
writeAtBatchTs bool,
) error {
b := &Batch{Header: roachpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowShadowing, stats, ingestAsWrites)
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, disallowShadowingBelow,
stats, ingestAsWrites, writeAtBatchTs)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ go_test(
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
Expand Down
Loading