Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/storage: introduce local timestamps for MVCC versions in MVCCValue #80706

Merged
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1 set the active cluster version in the format '<major>.<minor>'
version version 22.1-4 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-4</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func writeDescsToMetadata(ctx context.Context, sst storage.SSTWriter, m *BackupM
b = bytes
}
}
if err := sst.PutMVCC(storage.MVCCKey{Key: k, Timestamp: i.Time}, b); err != nil {
if err := sst.PutRawMVCC(storage.MVCCKey{Key: k, Timestamp: i.Time}, b); err != nil {
return err
}

Expand All @@ -214,7 +214,7 @@ func writeDescsToMetadata(ctx context.Context, sst storage.SSTWriter, m *BackupM
return err
}
} else {
if err := sst.PutMVCC(storage.MVCCKey{Key: k, Timestamp: m.StartTime}, b); err != nil {
if err := sst.PutRawMVCC(storage.MVCCKey{Key: k, Timestamp: m.StartTime}, b); err != nil {
return err
}
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func writeNamesToMetadata(ctx context.Context, sst storage.SSTWriter, m *BackupM
}
k := encodeNameSSTKey(rev.parent, rev.parentSchema, rev.name)
v := encoding.EncodeUvarintAscending(nil, uint64(rev.id))
if err := sst.PutMVCC(storage.MVCCKey{Key: k, Timestamp: rev.ts}, v); err != nil {
if err := sst.PutRawMVCC(storage.MVCCKey{Key: k, Timestamp: rev.ts}, v); err != nil {
return err
}
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func writeSpansToMetadata(ctx context.Context, sst storage.SSTWriter, m *BackupM
}
} else {
k := storage.MVCCKey{Key: encodeSpanSSTKey(sp), Timestamp: ts}
if err := sst.PutMVCC(k, nil); err != nil {
if err := sst.PutRawMVCC(k, nil); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func (s *sstSink) write(ctx context.Context, resp returnedSST) error {
return err
}
} else {
if err := s.sst.PutMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
if err := s.sst.PutRawMVCC(sst.UnsafeKey(), sst.UnsafeValue()); err != nil {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
continue
}

keyScratch = append(keyScratch[:0], iter.UnsafeKey().Key...)
key := iter.UnsafeKey()
keyScratch = append(keyScratch[:0], key.Key...)
key.Key = keyScratch
valueScratch = append(valueScratch[:0], iter.UnsafeValue()...)
key := storage.MVCCKey{Key: keyScratch, Timestamp: iter.UnsafeKey().Timestamp}
value := roachpb.Value{RawBytes: valueScratch}
iter.NextKey()

Expand Down
31 changes: 19 additions & 12 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,23 @@ func slurpSSTablesLatestKey(
if !sst.UnsafeKey().Less(end) {
break
}
var ok bool
var newKv storage.MVCCKeyValue
key := sst.UnsafeKey()
newKv.Value = append(newKv.Value, sst.UnsafeValue()...)
newKv.Key.Key = append(newKv.Key.Key, key.Key...)
newKv.Key.Timestamp = key.Timestamp
newKv.Key.Key, ok = kr.rewriteKey(newKv.Key.Key)
value, err := storage.DecodeMVCCValue(sst.UnsafeValue())
if err != nil {
t.Fatal(err)
}
newKey := key
newKey.Key = append([]byte(nil), newKey.Key...)
var ok bool
newKey.Key, ok = kr.rewriteKey(newKey.Key)
if !ok {
t.Fatalf("could not rewrite key: %s", newKv.Key.Key)
t.Fatalf("could not rewrite key: %s", newKey.Key)
}
v := roachpb.Value{RawBytes: newKv.Value}
v.ClearChecksum()
v.InitChecksum(newKv.Key.Key)
if err := batch.PutMVCC(newKv.Key, v.RawBytes); err != nil {
newValue := value
newValue.Value.RawBytes = append([]byte(nil), newValue.Value.RawBytes...)
newValue.Value.ClearChecksum()
newValue.Value.InitChecksum(newKey.Key)
if err := batch.PutMVCC(newKey, newValue); err != nil {
t.Fatal(err)
}
sst.Next()
Expand All @@ -110,7 +113,11 @@ func slurpSSTablesLatestKey(
} else if !ok || !it.UnsafeKey().Less(end) {
break
}
kvs = append(kvs, storage.MVCCKeyValue{Key: it.Key(), Value: it.Value()})
val, err := storage.DecodeMVCCValue(it.Value())
if err != nil {
t.Fatal(err)
}
kvs = append(kvs, storage.MVCCKeyValue{Key: it.Key(), Value: val.Value.RawBytes})
}
return kvs
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/engineccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func loadTestData(
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, timestamp, value, nil); err != nil {
if err := storage.MVCCPut(ctx, batch, nil, key, timestamp, hlc.ClockTimestamp{}, value, nil); err != nil {
return nil, err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/storageccl/engineccl/encrypted_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func TestPebbleEncryption2(t *testing.T) {
nil, /* ms */
roachpb.Key(key),
hlc.Timestamp{},
hlc.ClockTimestamp{},
roachpb.MakeValueFromBytes([]byte(val)),
nil, /* txn */
)
Expand Down
7 changes: 3 additions & 4 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@ func removeDeadReplicas(
if err != nil {
return nil, errors.Wrap(err, "loading MVCCStats")
}
err = storage.MVCCPutProto(ctx, batch, &ms, key, clock.Now(), nil /* txn */, &desc)
err = storage.MVCCPutProto(ctx, batch, &ms, key, clock.Now(), hlc.ClockTimestamp{}, nil, &desc)
if wiErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &wiErr) {
if len(wiErr.Intents) != 1 {
return nil, errors.Errorf("expected 1 intent, found %d: %s", len(wiErr.Intents), wiErr)
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func removeDeadReplicas(
// A crude form of the intent resolution process: abort the
// transaction by deleting its record.
txnKey := keys.TransactionKey(intent.Txn.Key, intent.Txn.ID)
if err := storage.MVCCDelete(ctx, batch, &ms, txnKey, hlc.Timestamp{}, nil); err != nil {
if err := storage.MVCCDelete(ctx, batch, &ms, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
return nil, err
}
update := roachpb.LockUpdate{
Expand All @@ -1340,8 +1340,7 @@ func removeDeadReplicas(
return nil, err
}
// With the intent resolved, we can try again.
if err := storage.MVCCPutProto(ctx, batch, &ms, key, clock.Now(),
nil /* txn */, &desc); err != nil {
if err := storage.MVCCPutProto(ctx, batch, &ms, key, clock.Now(), hlc.ClockTimestamp{}, nil, &desc); err != nil {
return nil, err
}
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug_check_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestDebugCheckStore(t *testing.T) {
// Should not error out randomly.
for _, dir := range storePaths {
out, err := check(dir)
require.NoError(t, err, dir)
require.NoError(t, err, "dir=%s\nout=%s\n", dir, out)
require.Contains(t, out, "total stats", dir)
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ const (
// V22_1 is CockroachDB v22.1. It's used for all v22.1.x patch releases.
V22_1

// v22.2 versions.
//
// Start22_2 demarcates work towards CockroachDB v22.2.
Start22_2

// LocalTimestamps enables the use of local timestamps in MVCC values.
LocalTimestamps

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -604,6 +612,16 @@ var versionsSingleton = keyedVersions{
Version: roachpb.Version{Major: 22, Minor: 1},
},

// v22.2 versions. Internal versions must be even.
{
Key: Start22_2,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 2},
},
{
Key: LocalTimestamps,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 4},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
key := testutils.MakeKey(keys.Meta1Prefix, roachpb.KeyMax)
now := s.Clock().Now()
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0, int32(s.SQLInstanceID()))
if err := storage.MVCCPutProto(
context.Background(), s.Engines()[0],
nil, key, now, &txn, &roachpb.RangeDescriptor{}); err != nil {
if err := storage.MVCCPutProto(context.Background(), s.Engines()[0], nil, key, now, hlc.ClockTimestamp{}, &txn, &roachpb.RangeDescriptor{}); err != nil {
t.Fatal(err)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,9 @@ func (tcf *TxnCoordSenderFactory) TestingSetLinearizable(linearizable bool) {
// TestingSetMetrics allows tests to override the factory's transaction
// metrics with a custom TxnMetrics instance.
func (tcf *TxnCoordSenderFactory) TestingSetMetrics(metrics TxnMetrics) {
tcf.metrics = metrics
}

// TestingSetCommitWaitFilter allows tests to instrument the beginning of a
// transaction commit wait sleep. The filter, when set, is invoked just
// before the TxnCoordSender starts waiting out the commit-wait period.
func (tcf *TxnCoordSenderFactory) TestingSetCommitWaitFilter(filter func()) {
tcf.testingKnobs.CommitWaitFilter = filter
}
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type ClientTestingKnobs struct {
// DisableCommitSanityCheck allows "setting" the DisableCommitSanityCheck to
// true without actually overriding the variable.
DisableCommitSanityCheck bool

// CommitWaitFilter allows tests to instrument the beginning of a transaction
// commit wait sleep.
CommitWaitFilter func()
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
19 changes: 11 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,8 @@ func (tc *TxnCoordSender) Send(
// clock to read below that future-time value, violating "monotonic reads".
//
// In practice, most transactions do not need to wait at all, because their
// commit timestamps were pulled from an HLC clock (i.e. are not synthetic)
// commit timestamps were pulled from an HLC clock (either the local clock
// or a remote clock on a node whom the local node has communicated with)
// and so they will be guaranteed to lead the local HLC's clock, assuming
// proper HLC time propagation. Only transactions whose commit timestamps
// were pushed into the future will need to wait, like those who wrote to a
Expand Down Expand Up @@ -640,18 +641,20 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
commitTS := tc.mu.txn.WriteTimestamp
readOnly := tc.mu.txn.Sequence == 0
linearizable := tc.linearizable
needWait := commitTS.Synthetic || (linearizable && !readOnly)
if !needWait {
// No need to wait. If !Synthetic then we know the commit timestamp
// leads the local HLC clock, and since that's all we'd need to wait
// for, we can short-circuit.
return nil
}

waitUntil := commitTS
if linearizable && !readOnly {
waitUntil = waitUntil.Add(tc.clock.MaxOffset().Nanoseconds(), 0)
}
if waitUntil.LessEq(tc.clock.Now()) {
// No wait fast-path. This is the common case for most transactions. Only
// transactions who have their commit timestamp bumped into the future will
// need to wait.
return nil
}
if fn := tc.testingKnobs.CommitWaitFilter; fn != nil {
fn()
}

before := tc.clock.PhysicalTime()
est := waitUntil.GoTime().Sub(before)
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,8 +1349,12 @@ func TestTxnCommitWait(t *testing.T) {
//
testFn := func(t *testing.T, linearizable, commit, readOnly, futureTime, deferred bool) {
s, metrics, cleanupFn := setupMetricsTest(t)
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetLinearizable(linearizable)
defer cleanupFn()
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetLinearizable(linearizable)
commitWaitC := make(chan struct{})
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetCommitWaitFilter(func() {
close(commitWaitC)
})

// maxClockOffset defines the maximum clock offset between nodes in the
// cluster. When in linearizable mode, all writing transactions must
Expand All @@ -1361,7 +1365,7 @@ func TestTxnCommitWait(t *testing.T) {
// gateway they use. This ensures that all causally dependent
// transactions commit with higher timestamps, even if their read and
// writes sets do not conflict with the original transaction's. This, in
// turn, prevents the "causal reverse" anamoly which can be observed by
// turn, prevents the "causal reverse" anomaly which can be observed by
// a third, concurrent transaction. See the following blog post for
// more: https://www.cockroachlabs.com/blog/consistency-model/.
maxClockOffset := s.Clock.MaxOffset()
Expand Down Expand Up @@ -1468,8 +1472,10 @@ func TestTxnCommitWait(t *testing.T) {

// Advance the manual clock slowly. If the commit-wait sleep completes
// too early, we'll catch it with the require.Empty. If it completes too
// late, we'll stall when pulling from the channel.
// late, we'll stall when pulling from the channel. Before doing so, wait
// until the transaction has begun its commit-wait.
for expWait > 0 {
<-commitWaitC
require.Empty(t, errC)

adv := futureOffset / 5
Expand Down Expand Up @@ -2297,6 +2303,7 @@ func TestTxnRequestTxnTimestamp(t *testing.T) {
return err
}
}
manual.Set(txn.ProvisionalCommitTimestamp().WallTime)
return nil
}); err != nil {
t.Fatal(err)
Expand Down
23 changes: 17 additions & 6 deletions pkg/kv/kvnemesis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,18 @@ func (e *Engine) Get(key roachpb.Key, ts hlc.Timestamp) roachpb.Value {
if !mvccKey.Key.Equal(key) {
return roachpb.Value{}
}
if len(iter.Value()) == 0 {
return roachpb.Value{}
}
var valCopy []byte
e.b, valCopy = e.b.Copy(iter.Value(), 0 /* extraCap */)
return roachpb.Value{RawBytes: valCopy, Timestamp: mvccKey.Timestamp}
mvccVal, err := storage.DecodeMVCCValue(valCopy)
if err != nil {
panic(err)
}
if mvccVal.IsTombstone() {
return roachpb.Value{}
}
val := mvccVal.Value
val.Timestamp = mvccKey.Timestamp
return val
}

// Put inserts a key/value/timestamp tuple. If an exact key/timestamp pair is
Expand Down Expand Up @@ -124,8 +130,13 @@ func (e *Engine) DebugPrint(indent string) string {
if err != nil {
fmt.Fprintf(&buf, "(err:%s)", err)
} else {
fmt.Fprintf(&buf, "%s%s %s -> %s",
indent, key.Key, key.Timestamp, roachpb.Value{RawBytes: value}.PrettyPrint())
v, err := storage.DecodeMVCCValue(value)
if err != nil {
fmt.Fprintf(&buf, "(err:%s)", err)
} else {
fmt.Fprintf(&buf, "%s%s %s -> %s",
indent, key.Key, key.Timestamp, v.Value.PrettyPrint())
}
}
})
return buf.String()
Expand Down
Loading