Skip to content

Commit

Permalink
kvserver: add MVCC-compliant AddSSTable variant
Browse files Browse the repository at this point in the history
`AddSSTable` does not comply with MVCC, the timestamp cache, nor the
closed timestamp, since the SST MVCC timestamps are written exactly as
given and thus can rewrite history.

This patch adds two new parameters that can make `AddSSTable` fully
MVCC-compliant, with a corresponding `MVCCAddSSTable` version gate:

* `WriteAtRequestTimestamp`: rewrites the MVCC timestamps to the request
  timestamp, complying with the timestamp cache and closed timestamp.

* `DisallowConflicts`: checks for any conflicting keys and intents at or
  above the SST's MVCC timestamps, complying with MVCC.

The existing `DisallowShadowing` parameter is an extension of
`DisallowConflicts` which also errors on any existing visible keys below
the SST key's timestamp (but not tombstones). For backwards
compatibility, this also allows idempotent writes, but it no longer
allows replacing a tombstone with a value at the exact same timestamp.

Additionally, even blind `AddSSTable` requests that do not check for
conflicts now take out lock spans and scan for existing intents,
returning a `WriteIntentError` to resolve them. This should be cheap
in the common case, since the caller is expected to ensure there are no
concurrent writes over the span, and so there should be no or few
intents.

The `WriteAtRequestTimestamp` SST rewrite implementation here is correct
but slow. Optimizations will be explored later.

Release note: None
  • Loading branch information
erikgrinaker committed Nov 8, 2021
1 parent 14fcd04 commit e75afdc
Show file tree
Hide file tree
Showing 26 changed files with 2,118 additions and 1,866 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-8 set the active cluster version in the format '<major>.<minor>'
version version 21.2-10 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-8</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
5 changes: 3 additions & 2 deletions pkg/ccl/importccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ func benchmarkAddSSTable(b *testing.B, dir string, tables []tableSSTable) {
b.StartTimer()
for _, t := range tables {
totalBytes += int64(len(t.sstData))
require.NoError(b, kvDB.AddSSTable(
ctx, t.span.Key, t.span.EndKey, t.sstData, true /* disallowShadowing */, nil /* stats */, false /*ingestAsWrites */, hlc.Timestamp{},
require.NoError(b, kvDB.AddSSTable(ctx, t.span.Key, t.span.EndKey, t.sstData,
false /* disallowConflicts */, true /* disallowShadowing */, nil, /* stats */
false /*ingestAsWrites */, hlc.Timestamp{}, false, /* writeAtBatchTS */
))
}
b.StopTimer()
Expand Down
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ const (
// descriptors have draining names.
DrainingNamesMigration

// MVCCAddSSTable supports MVCC-compliant AddSSTable requests via the new
// WriteAtRequestTimestamp and DisallowConflicts parameters.
MVCCAddSSTable

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -473,6 +477,10 @@ var versionsSingleton = keyedVersions{
Key: DrainingNamesMigration,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 8},
},
{
Key: MVCCAddSSTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 10},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,11 @@ func (b *Batch) adminRelocateRange(
func (b *Batch) addSSTable(
s, e interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
writeAtRequestTimestamp bool,
) {
begin, err := marshalKey(s)
if err != nil {
Expand All @@ -783,10 +785,12 @@ func (b *Batch) addSSTable(
Key: begin,
EndKey: end,
},
Data: data,
DisallowShadowing: disallowShadowing,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
Data: data,
DisallowConflicts: disallowConflicts,
DisallowShadowing: disallowShadowing,
MVCCStats: stats,
IngestAsWrites: ingestAsWrites,
WriteAtRequestTimestamp: writeAtRequestTimestamp,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,12 @@ type SSTSender interface {
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
batchTs hlc.Timestamp,
writeAtBatchTs bool,
) error
SplitAndScatter(ctx context.Context, key roachpb.Key, expirationTime hlc.Timestamp) error
}
Expand Down Expand Up @@ -451,7 +453,8 @@ func AddSSTable(
ingestAsWriteBatch = true
}
// This will fail if the range has split but we'll check for that below.
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes, item.disallowShadowing, &item.stats, ingestAsWriteBatch, batchTs)
err = db.AddSSTable(ctx, item.start, item.end, item.sstBytes, false, /* disallowConflicts */
item.disallowShadowing, &item.stats, ingestAsWriteBatch, batchTs, false /* writeAtBatchTs */)
if err == nil {
log.VEventf(ctx, 3, "adding %s AddSSTable [%s,%s) took %v", sz(len(item.sstBytes)), item.start, item.end, timeutil.Since(before))
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,12 @@ func (m mockSender) AddSSTable(
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
_ *enginepb.MVCCStats,
ingestAsWrites bool,
batchTS hlc.Timestamp,
writeAtBatchTS bool,
) error {
return m(roachpb.Span{Key: begin.(roachpb.Key), EndKey: end.(roachpb.Key)})
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,17 +664,23 @@ func (db *DB) AdminRelocateRange(

// AddSSTable links a file into the RocksDB log-structured merge-tree. Existing
// data in the range is cleared.
//
// The disallowConflicts and writeAtBatchTs parameters require the
// MVCCAddSSTable version gate, as they are new in 22.1.
func (db *DB) AddSSTable(
ctx context.Context,
begin, end interface{},
data []byte,
disallowConflicts bool,
disallowShadowing bool,
stats *enginepb.MVCCStats,
ingestAsWrites bool,
batchTs hlc.Timestamp,
writeAtBatchTs bool,
) error {
b := &Batch{Header: roachpb.Header{Timestamp: batchTs}}
b.addSSTable(begin, end, data, disallowShadowing, stats, ingestAsWrites)
b.addSSTable(begin, end, data, disallowConflicts, disallowShadowing, stats,
ingestAsWrites, writeAtBatchTs)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ go_test(
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
Expand Down
Loading

0 comments on commit e75afdc

Please sign in to comment.