Skip to content

Commit

Permalink
kv: pass explicit Now timestamp on BatchRequest, remove timestamp dow…
Browse files Browse the repository at this point in the history
…ncasting

This commit adds an explicit `ClockTimestamp` field called `Now` to the
`BatchRequest` header, which mirrors the `Now` field on the `BatchResponse`
header.

In doing so, it removes the last instance where we downcasted a `Timestamp` to a
`ClockTimestamp` using the `TryToClockTimestamp` method. With this change, MVCC
("operation") timestamps never flow back into HLC clocks as clock signals. This
was enabled by cockroachdb#80706 and sets the groundwork to remove synthetic timestamps in
v23.1 — the role they played in dynamic typing of clock timestamps is now
entirely fulfilled by statically typed `ClockTimestamp` channels.

This is an important step in separating out the MVCC timestamp domain from the
clock timestamp domain and clarifying the roles of the two layers. In turn, this
layering opens the door for CockroachDB to start thinking about dynamic clock
synchronization error bounds.
  • Loading branch information
nvanbenschoten committed Aug 11, 2022
1 parent 5845f4b commit 90e191d
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 30 deletions.
4 changes: 2 additions & 2 deletions docs/generated/swagger/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@
},
"ClockTimestamp": {
"description": "ClockTimestamp is the statically typed version of a Timestamp with its\nSynthetic flag set to false.",
"title": "ClockTimestamp is a Timestamp with the added capability of being able to\nupdate a peer's HLC clock. It possesses this capability because the clock\ntimestamp itself is guaranteed to have come from an HLC clock somewhere in\nthe system. As such, a clock timestamp is an promise that some node in the\nsystem has a clock with a reading equal to or above its value.",
"title": "ClockTimestamp is a Timestamp with the added capability of being able to\nupdate a peer's HLC clock. It possesses this capability because the clock\ntimestamp itself is guaranteed to have come from an HLC clock somewhere in\nthe system. As such, a clock timestamp is a promise that some node in the\nsystem has a clock with a reading equal to or above its value.",
"$ref": "#/definitions/Timestamp"
},
"Constraint": {
Expand Down Expand Up @@ -1713,7 +1713,7 @@
"x-go-name": "Logical"
},
"synthetic": {
"description": "Indicates that the Timestamp did not come from an HLC clock somewhere\nin the system and, therefore, does not have the ability to update a\npeer's HLC clock. If set to true, the \"synthetic timestamp\" may be\narbitrarily disconnected from real time.\n\nThe flag serves as the dynamically typed version of a ClockTimestamp\n(but inverted). Only Timestamps with this flag set to false can be\ndowncast to a ClockTimestamp successfully (see TryToClockTimestamp).\n\nSynthetic timestamps with this flag set to true are central to\nnon-blocking transactions, which write \"into the future\". Setting the\nflag to true is also used to disconnect some committed MVCC versions\nfrom observed timestamps by indicating that those versions were moved\nfrom the timestamp at which they were originally written. Committed\nMVCC versions with synthetic timestamps require observing the full\nuncertainty interval, whereas readings off the leaseholders's clock\ncan tighten the uncertainty interval that is applied to MVCC versions\nwith clock timestamp.\n\nThis flag does not affect the sort order of Timestamps. However, it\nis considered when performing structural equality checks (e.g. using\nthe == operator). Consider use of the EqOrdering method when testing\nfor equality.",
"description": "Indicates that the Timestamp did not come from an HLC clock somewhere\nin the system and, therefore, does not have the ability to update a\npeer's HLC clock. If set to true, the \"synthetic timestamp\" may be\narbitrarily disconnected from real time.\n\nThe flag serves as the dynamically typed version of a ClockTimestamp\n(but inverted). Only Timestamps with this flag set to false can be\ndowncast to a ClockTimestamp successfully (see\nDeprecatedTryToClockTimestamp).\n\nSynthetic timestamps with this flag set to true are central to\nnon-blocking transactions, which write \"into the future\". Setting the\nflag to true is also used to disconnect some committed MVCC versions\nfrom observed timestamps by indicating that those versions were moved\nfrom the timestamp at which they were originally written. Committed\nMVCC versions with synthetic timestamps require observing the full\nuncertainty interval, whereas readings off the leaseholders's clock\ncan tighten the uncertainty interval that is applied to MVCC versions\nwith clock timestamp.\n\nThis flag does not affect the sort order of Timestamps. However, it\nis considered when performing structural equality checks (e.g. using\nthe == operator). Consider use of the EqOrdering method when testing\nfor equality.",
"type": "boolean",
"x-go-name": "Synthetic"
},
Expand Down
10 changes: 7 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,14 +653,18 @@ func (ds *DistSender) initAndVerifyBatch(
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
// Attach the local node ID to each request.
if ba.Header.GatewayNodeID == 0 {
ba.Header.GatewayNodeID = ds.getNodeID()
if ba.GatewayNodeID == 0 {
ba.GatewayNodeID = ds.getNodeID()
}

// Attach a clock reading from the local node to help stabilize HLCs across
// the cluster. This is NOT required for correctness.
ba.Now = ds.clock.NowAsClockTimestamp()

// In the event that timestamp isn't set and read consistency isn't
// required, set the timestamp using the local clock.
if ba.ReadConsistency != roachpb.CONSISTENT && ba.Timestamp.IsEmpty() {
ba.Timestamp = ds.clock.Now()
ba.Timestamp = ba.Now.ToTimestamp()
}

if len(ba.Requests) < 1 {
Expand Down
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func TestReplicaClockUpdates(t *testing.T) {
// MaxOffset. Set the synthetic flag according to the test case.
reqTS := clocks[0].Now().Add(clocks[0].MaxOffset().Nanoseconds()/2, 0).WithSynthetic(synthetic)
h := roachpb.Header{Timestamp: reqTS}
if !reqTS.Synthetic {
h.Now = hlc.ClockTimestamp(reqTS)
}

// Execute the command.
var req roachpb.Request
Expand Down Expand Up @@ -178,8 +181,8 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
const numCmds = 3
clockOffset := s.Clock().MaxOffset() / numCmds
for i := int64(1); i <= numCmds; i++ {
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0).WithSynthetic(false)
if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
ts := hlc.ClockTimestamp(ts1.Add(i*clockOffset.Nanoseconds(), 0))
if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Now: ts}, incArgs); err != nil {
t.Fatal(err)
}
}
Expand All @@ -190,8 +193,8 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
}

// Once the accumulated offset reaches MaxOffset, commands will be rejected.
tsFuture := ts1.Add(s.Clock().MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false)
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: tsFuture}, incArgs)
tsFuture := hlc.ClockTimestamp(ts1.Add(s.Clock().MaxOffset().Nanoseconds()+1, 0))
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: tsFuture}, incArgs)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Fatalf("unexpected error %v", pErr)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_closedts_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ func TestQueryResolvedTimestampResolvesAbandonedIntents(t *testing.T) {
require.Nil(t, pErr)
require.True(t, intentExists())

// Inject a closed timestamp.
// Bump the clock and inject a closed timestamp.
tc.manualClock.AdvanceTo(ts20.GoTime())
tc.repl.mu.Lock()
tc.repl.mu.state.RaftClosedTimestamp = ts20
tc.repl.mu.Unlock()
Expand Down
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ func (tc *testContext) Sender() kv.Sender {
tc.Fatal(err)
}
}
if baClockTS, ok := ba.Timestamp.TryToClockTimestamp(); ok {
tc.Clock().Update(baClockTS)
}
tc.Clock().Update(ba.Now)
return ba
})
}
Expand Down Expand Up @@ -5996,10 +5994,6 @@ func TestPushTxnPushTimestamp(t *testing.T) {
if reply.PusheeTxn.Status != roachpb.PENDING {
t.Errorf("expected pushed txn to have status PENDING; got %s", reply.PusheeTxn.Status)
}

// Sanity check clock update, or lack thereof.
after := tc.Clock().Now()
require.Equal(t, synthetic, after.Less(expTS))
})
}

Expand Down Expand Up @@ -8677,7 +8671,7 @@ func TestRefreshFromBelowGCThreshold(t *testing.T) {
t.Run(fmt.Sprintf("gcThreshold=%s", testCase.gc), func(t *testing.T) {
if !testCase.gc.IsEmpty() {
gcr := roachpb.GCRequest{Threshold: testCase.gc}
_, pErr := tc.SendWrapped(&gcr)
_, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: testCase.gc}, &gcr)
require.Nil(t, pErr)
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -88,7 +89,19 @@ func (s *Store) SendWithWriteBytes(
// Update our clock with the incoming request timestamp. This advances the
// local node's clock to a high water mark from all nodes with which it has
// interacted.
if baClockTS, ok := ba.Timestamp.TryToClockTimestamp(); ok {
baClockTS := ba.Now
if baClockTS.IsEmpty() && !s.ClusterSettings().Version.IsActive(ctx, clusterversion.LocalTimestamps) {
// TODO(nvanbenschoten): remove this in v23.1. v21.2 nodes will still send
// requests without a Now field. This is not necessary for correctness now
// that local timestamps pulled from the leaseholder's own HLC are used in
// conjunction with observed timestamps to prevent stale reads, but using
// this timestamp when available can help stabilize HLCs.
//
// NOTE: we version gate this so that no test hits this branch and relies on
// this behavior.
baClockTS, _ = ba.Timestamp.DeprecatedTryToClockTimestamp()
}
if !baClockTS.IsEmpty() {
if s.cfg.TestingKnobs.DisableMaxOffsetCheck {
s.cfg.Clock.Update(baClockTS)
} else {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,14 +1087,14 @@ func TestStoreSendUpdateTime(t *testing.T) {
defer stopper.Stop(ctx)
store, _ := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper)
args := getArgs([]byte("a"))
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0).WithSynthetic(false)
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args)
now := hlc.ClockTimestamp(store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0))
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: now}, &args)
if pErr != nil {
t.Fatal(pErr)
}
ts := store.cfg.Clock.Now()
if ts.WallTime != reqTS.WallTime || ts.Logical <= reqTS.Logical {
t.Errorf("expected store clock to advance to %s; got %s", reqTS, ts)
if ts.WallTime != now.WallTime || ts.Logical <= now.Logical {
t.Errorf("expected store clock to advance to %s; got %s", now, ts)
}
}

Expand Down Expand Up @@ -1135,8 +1135,8 @@ func TestStoreSendWithClockOffset(t *testing.T) {
store, _ := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper)
args := getArgs([]byte("a"))
// Set args timestamp to exceed max offset.
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false)
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args)
now := hlc.ClockTimestamp(store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0))
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Now: now}, &args)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Errorf("unexpected error: %v", pErr)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,12 @@ message Header {
// request's uncertainty interval remains fixed across retries.
util.hlc.Timestamp timestamp_from_server_clock = 27 [
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
// now is a clock reading from the sender of the request. It can be used by
// the receiver to update its local HLC, which can help stabilize HLCs across
// the cluster. The receiver is NOT required to use the timestamp to update
// its local clock, and the sender is NOT required to attach a timestamp.
util.hlc.Timestamp now = 28 [(gogoproto.nullable) = false,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];
// replica specifies the destination of the request.
ReplicaDescriptor replica = 2 [(gogoproto.nullable) = false];
// range_id specifies the ID of the Raft consensus group which the key
Expand Down
11 changes: 7 additions & 4 deletions pkg/util/hlc/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,16 +386,19 @@ func (LegacyTimestamp) SafeValue() {}
// ClockTimestamp is a Timestamp with the added capability of being able to
// update a peer's HLC clock. It possesses this capability because the clock
// timestamp itself is guaranteed to have come from an HLC clock somewhere in
// the system. As such, a clock timestamp is an promise that some node in the
// the system. As such, a clock timestamp is a promise that some node in the
// system has a clock with a reading equal to or above its value.
//
// ClockTimestamp is the statically typed version of a Timestamp with its
// Synthetic flag set to false.
type ClockTimestamp Timestamp

// TryToClockTimestamp attempts to downcast a Timestamp into a ClockTimestamp.
// Returns the result and a boolean indicating whether the cast succeeded.
func (t Timestamp) TryToClockTimestamp() (ClockTimestamp, bool) {
// DeprecatedTryToClockTimestamp attempts to downcast a Timestamp into a
// ClockTimestamp. Returns the result and a boolean indicating whether the cast
// succeeded.
// TODO(nvanbenschoten): remove this in v23.1 when we remove the synthetic
// timestamp bit.
func (t Timestamp) DeprecatedTryToClockTimestamp() (ClockTimestamp, bool) {
if t.Synthetic {
return ClockTimestamp{}, false
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/hlc/timestamp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ message Timestamp {
//
// The flag serves as the dynamically typed version of a ClockTimestamp
// (but inverted). Only Timestamps with this flag set to false can be
// downcast to a ClockTimestamp successfully (see TryToClockTimestamp).
// downcast to a ClockTimestamp successfully (see
// DeprecatedTryToClockTimestamp).
//
// Synthetic timestamps with this flag set to true are central to
// non-blocking transactions, which write "into the future". Setting the
Expand Down

0 comments on commit 90e191d

Please sign in to comment.