From c7c154af8cf34c69aff26ce2b8e985fa8fd2e891 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 8 Aug 2022 14:47:55 -0400 Subject: [PATCH] kv: pass explicit Now timestamp on BatchRequest, remove timestamp downcasting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds an explicit `ClockTimestamp` field called `Now` to the `BatchRequest` header, which mirrors the `Now` field on the `BatchResponse` header. In doing so, it removes the last instance where we downcast a `Timestamp` to a `ClockTimestamp` using the `TryToClockTimestamp` method. With this change, MVCC ("operation") timestamps never flow back into HLC clocks as clock signals. This was enabled by #80706 and lays the groundwork to remove synthetic timestamps in v23.1 — the role they played in dynamic typing of clock timestamps is now entirely fulfilled by statically typed `ClockTimestamp` channels. This is an important step in separating out the MVCC timestamp domain from the clock timestamp domain and clarifying the roles of the two layers. In turn, this layering opens the door for CockroachDB to start thinking about dynamic clock synchronization error bounds. 
--- docs/generated/swagger/spec.json | 4 ++-- pkg/kv/kvclient/kvcoord/dist_sender.go | 10 +++++++--- pkg/kv/kvserver/client_replica_test.go | 11 +++++++---- pkg/kv/kvserver/replica_closedts_internal_test.go | 3 ++- pkg/kv/kvserver/replica_test.go | 15 ++++----------- pkg/kv/kvserver/store_send.go | 15 ++++++++++++++- pkg/kv/kvserver/store_test.go | 12 ++++++------ pkg/roachpb/api.proto | 6 ++++++ pkg/util/hlc/timestamp.go | 11 +++++++---- pkg/util/hlc/timestamp.proto | 3 ++- 10 files changed, 57 insertions(+), 33 deletions(-) diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index a2bde3976710..7ec2c125eab6 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -878,7 +878,7 @@ }, "ClockTimestamp": { "description": "ClockTimestamp is the statically typed version of a Timestamp with its\nSynthetic flag set to false.", - "title": "ClockTimestamp is a Timestamp with the added capability of being able to\nupdate a peer's HLC clock. It possesses this capability because the clock\ntimestamp itself is guaranteed to have come from an HLC clock somewhere in\nthe system. As such, a clock timestamp is an promise that some node in the\nsystem has a clock with a reading equal to or above its value.", + "title": "ClockTimestamp is a Timestamp with the added capability of being able to\nupdate a peer's HLC clock. It possesses this capability because the clock\ntimestamp itself is guaranteed to have come from an HLC clock somewhere in\nthe system. As such, a clock timestamp is a promise that some node in the\nsystem has a clock with a reading equal to or above its value.", "$ref": "#/definitions/Timestamp" }, "Constraint": { @@ -1713,7 +1713,7 @@ "x-go-name": "Logical" }, "synthetic": { - "description": "Indicates that the Timestamp did not come from an HLC clock somewhere\nin the system and, therefore, does not have the ability to update a\npeer's HLC clock. 
If set to true, the \"synthetic timestamp\" may be\narbitrarily disconnected from real time.\n\nThe flag serves as the dynamically typed version of a ClockTimestamp\n(but inverted). Only Timestamps with this flag set to false can be\ndowncast to a ClockTimestamp successfully (see TryToClockTimestamp).\n\nSynthetic timestamps with this flag set to true are central to\nnon-blocking transactions, which write \"into the future\". Setting the\nflag to true is also used to disconnect some committed MVCC versions\nfrom observed timestamps by indicating that those versions were moved\nfrom the timestamp at which they were originally written. Committed\nMVCC versions with synthetic timestamps require observing the full\nuncertainty interval, whereas readings off the leaseholders's clock\ncan tighten the uncertainty interval that is applied to MVCC versions\nwith clock timestamp.\n\nThis flag does not affect the sort order of Timestamps. However, it\nis considered when performing structural equality checks (e.g. using\nthe == operator). Consider use of the EqOrdering method when testing\nfor equality.", + "description": "Indicates that the Timestamp did not come from an HLC clock somewhere\nin the system and, therefore, does not have the ability to update a\npeer's HLC clock. If set to true, the \"synthetic timestamp\" may be\narbitrarily disconnected from real time.\n\nThe flag serves as the dynamically typed version of a ClockTimestamp\n(but inverted). Only Timestamps with this flag set to false can be\ndowncast to a ClockTimestamp successfully (see\nDeprecatedTryToClockTimestamp).\n\nSynthetic timestamps with this flag set to true are central to\nnon-blocking transactions, which write \"into the future\". Setting the\nflag to true is also used to disconnect some committed MVCC versions\nfrom observed timestamps by indicating that those versions were moved\nfrom the timestamp at which they were originally written. 
Committed\nMVCC versions with synthetic timestamps require observing the full\nuncertainty interval, whereas readings off the leaseholders's clock\ncan tighten the uncertainty interval that is applied to MVCC versions\nwith clock timestamp.\n\nThis flag does not affect the sort order of Timestamps. However, it\nis considered when performing structural equality checks (e.g. using\nthe == operator). Consider use of the EqOrdering method when testing\nfor equality.", "type": "boolean", "x-go-name": "Synthetic" }, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index acdda1f99dc0..23d873e9eadd 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -653,14 +653,18 @@ func (ds *DistSender) initAndVerifyBatch( ctx context.Context, ba *roachpb.BatchRequest, ) *roachpb.Error { // Attach the local node ID to each request. - if ba.Header.GatewayNodeID == 0 { - ba.Header.GatewayNodeID = ds.getNodeID() + if ba.GatewayNodeID == 0 { + ba.GatewayNodeID = ds.getNodeID() } + // Attach a clock reading from the local node to help stabilize HLCs across + // the cluster. This is NOT required for correctness. + ba.Now = ds.clock.NowAsClockTimestamp() + // In the event that timestamp isn't set and read consistency isn't // required, set the timestamp using the local clock. if ba.ReadConsistency != roachpb.CONSISTENT && ba.Timestamp.IsEmpty() { - ba.Timestamp = ds.clock.Now() + ba.Timestamp = ba.Now.ToTimestamp() } if len(ba.Requests) < 1 { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 8c860c22d8c7..e17bd407638f 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -118,6 +118,9 @@ func TestReplicaClockUpdates(t *testing.T) { // MaxOffset. Set the synthetic flag according to the test case. 
reqTS := clocks[0].Now().Add(clocks[0].MaxOffset().Nanoseconds()/2, 0).WithSynthetic(synthetic) h := roachpb.Header{Timestamp: reqTS} + if !reqTS.Synthetic { + h.Now = hlc.ClockTimestamp(reqTS) + } // Execute the command. var req roachpb.Request @@ -178,8 +181,8 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) { const numCmds = 3 clockOffset := s.Clock().MaxOffset() / numCmds for i := int64(1); i <= numCmds; i++ { - ts := ts1.Add(i*clockOffset.Nanoseconds(), 0).WithSynthetic(false) - if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil { + ts := hlc.ClockTimestamp(ts1.Add(i*clockOffset.Nanoseconds(), 0)) + if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Now: ts}, incArgs); err != nil { t.Fatal(err) } } @@ -190,8 +193,8 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) { } // Once the accumulated offset reaches MaxOffset, commands will be rejected. - tsFuture := ts1.Add(s.Clock().MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false) - _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: tsFuture}, incArgs) + tsFuture := hlc.ClockTimestamp(ts1.Add(s.Clock().MaxOffset().Nanoseconds()+1, 0)) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: tsFuture}, incArgs) if !testutils.IsPError(pErr, "remote wall time is too far ahead") { t.Fatalf("unexpected error %v", pErr) } diff --git a/pkg/kv/kvserver/replica_closedts_internal_test.go b/pkg/kv/kvserver/replica_closedts_internal_test.go index 6255ec19ff7e..a7e1e9b5bd73 100644 --- a/pkg/kv/kvserver/replica_closedts_internal_test.go +++ b/pkg/kv/kvserver/replica_closedts_internal_test.go @@ -728,7 +728,8 @@ func TestQueryResolvedTimestampResolvesAbandonedIntents(t *testing.T) { require.Nil(t, pErr) require.True(t, intentExists()) - // Inject a closed timestamp. + // Bump the clock and inject a closed timestamp. 
+ tc.manualClock.AdvanceTo(ts20.GoTime()) tc.repl.mu.Lock() tc.repl.mu.state.RaftClosedTimestamp = ts20 tc.repl.mu.Unlock() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 99d7057dab44..73bc379b1038 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -231,9 +231,7 @@ func (tc *testContext) Sender() kv.Sender { tc.Fatal(err) } } - if baClockTS, ok := ba.Timestamp.TryToClockTimestamp(); ok { - tc.Clock().Update(baClockTS) - } + tc.Clock().Update(ba.Now) return ba }) } @@ -5996,10 +5994,6 @@ func TestPushTxnPushTimestamp(t *testing.T) { if reply.PusheeTxn.Status != roachpb.PENDING { t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status) } - - // Sanity check clock update, or lack thereof. - after := tc.Clock().Now() - require.Equal(t, synthetic, after.Less(expTS)) }) } @@ -8677,7 +8671,7 @@ func TestRefreshFromBelowGCThreshold(t *testing.T) { t.Run(fmt.Sprintf("gcThreshold=%s", testCase.gc), func(t *testing.T) { if !testCase.gc.IsEmpty() { gcr := roachpb.GCRequest{Threshold: testCase.gc} - _, pErr := tc.SendWrapped(&gcr) + _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: testCase.gc}, &gcr) require.Nil(t, pErr) } @@ -8883,12 +8877,11 @@ func BenchmarkMVCCGCWithForegroundTraffic(b *testing.B) { tc.Start(ctx, b, stopper) key := roachpb.Key("test") - now := func() hlc.Timestamp { return hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} } // send sends the Request with a present-time batch timestamp. send := func(args roachpb.Request) *roachpb.BatchResponse { var header roachpb.Header - header.Timestamp = now() + header.Timestamp = tc.Clock().Now() ba := roachpb.BatchRequest{} ba.Header = header ba.Add(args) @@ -8928,7 +8921,7 @@ func BenchmarkMVCCGCWithForegroundTraffic(b *testing.B) { go func() { defer wg.Done() for { - gc(key, now()) // NB: These are no-op GC requests. + gc(key, tc.Clock().Now()) // NB: These are no-op GC requests. 
time.Sleep(10 * time.Microsecond) select { diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index aa5bd628cc2d..21618f188f70 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -88,7 +89,19 @@ func (s *Store) SendWithWriteBytes( // Update our clock with the incoming request timestamp. This advances the // local node's clock to a high water mark from all nodes with which it has // interacted. - if baClockTS, ok := ba.Timestamp.TryToClockTimestamp(); ok { + baClockTS := ba.Now + if baClockTS.IsEmpty() && !s.ClusterSettings().Version.IsActive(ctx, clusterversion.LocalTimestamps) { + // TODO(nvanbenschoten): remove this in v23.1. v21.2 nodes will still send + // requests without a Now field. This is not necessary for correctness now + // that local timestamps pulled from the leaseholder's own HLC are used in + // conjunction with observed timestamps to prevent stale reads, but using + // this timestamp when available can help stabilize HLCs. + // + // NOTE: we version gate this so that no test hits this branch and relies on + // this behavior. 
+ baClockTS, _ = ba.Timestamp.DeprecatedTryToClockTimestamp() + } + if !baClockTS.IsEmpty() { if s.cfg.TestingKnobs.DisableMaxOffsetCheck { s.cfg.Clock.Update(baClockTS) } else { diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 99df55f83d15..b868d803c672 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -1087,14 +1087,14 @@ func TestStoreSendUpdateTime(t *testing.T) { defer stopper.Stop(ctx) store, _ := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper) args := getArgs([]byte("a")) - reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0).WithSynthetic(false) - _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args) + now := hlc.ClockTimestamp(store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0)) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: now}, &args) if pErr != nil { t.Fatal(pErr) } ts := store.cfg.Clock.Now() - if ts.WallTime != reqTS.WallTime || ts.Logical <= reqTS.Logical { - t.Errorf("expected store clock to advance to %s; got %s", reqTS, ts) + if ts.WallTime != now.WallTime || ts.Logical <= now.Logical { + t.Errorf("expected store clock to advance to %s; got %s", now, ts) } } @@ -1135,8 +1135,8 @@ func TestStoreSendWithClockOffset(t *testing.T) { store, _ := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper) args := getArgs([]byte("a")) // Set args timestamp to exceed max offset. 
- reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false) - _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args) + now := hlc.ClockTimestamp(store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0)) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: now}, &args) if !testutils.IsPError(pErr, "remote wall time is too far ahead") { t.Errorf("unexpected error: %v", pErr) } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 27c3244c725b..07247e0fa119 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2258,6 +2258,12 @@ message Header { // request's uncertainty interval remains fixed across retries. util.hlc.Timestamp timestamp_from_server_clock = 27 [ (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; + // now is a clock reading from the sender of the request. It can be used by + // the receiver to update its local HLC, which can help stabilize HLCs across + // the cluster. The receiver is NOT required to use the timestamp to update + // its local clock, and the sender is NOT required to attach a timestamp. + util.hlc.Timestamp now = 28 [(gogoproto.nullable) = false, + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"]; // replica specifies the destination of the request. ReplicaDescriptor replica = 2 [(gogoproto.nullable) = false]; // range_id specifies the ID of the Raft consensus group which the key diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index 22aa6262852d..b7cc112cff4d 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -386,16 +386,19 @@ func (LegacyTimestamp) SafeValue() {} // ClockTimestamp is a Timestamp with the added capability of being able to // update a peer's HLC clock. 
It possesses this capability because the clock // timestamp itself is guaranteed to have come from an HLC clock somewhere in -// the system. As such, a clock timestamp is an promise that some node in the +// the system. As such, a clock timestamp is a promise that some node in the // system has a clock with a reading equal to or above its value. // // ClockTimestamp is the statically typed version of a Timestamp with its // Synthetic flag set to false. type ClockTimestamp Timestamp -// TryToClockTimestamp attempts to downcast a Timestamp into a ClockTimestamp. -// Returns the result and a boolean indicating whether the cast succeeded. -func (t Timestamp) TryToClockTimestamp() (ClockTimestamp, bool) { +// DeprecatedTryToClockTimestamp attempts to downcast a Timestamp into a +// ClockTimestamp. Returns the result and a boolean indicating whether the cast +// succeeded. +// TODO(nvanbenschoten): remove this in v23.1 when we remove the synthetic +// timestamp bit. +func (t Timestamp) DeprecatedTryToClockTimestamp() (ClockTimestamp, bool) { if t.Synthetic { return ClockTimestamp{}, false } diff --git a/pkg/util/hlc/timestamp.proto b/pkg/util/hlc/timestamp.proto index 0ee60edcaba4..06b8a91581e3 100644 --- a/pkg/util/hlc/timestamp.proto +++ b/pkg/util/hlc/timestamp.proto @@ -43,7 +43,8 @@ message Timestamp { // // The flag serves as the dynamically typed version of a ClockTimestamp // (but inverted). Only Timestamps with this flag set to false can be - // downcast to a ClockTimestamp successfully (see TryToClockTimestamp). + // downcast to a ClockTimestamp successfully (see + // DeprecatedTryToClockTimestamp). // // Synthetic timestamps with this flag set to true are central to // non-blocking transactions, which write "into the future". Setting the