hlc: strongly type ClockTimestamp as specialization of Timestamp
This commit splits off a new hlc.ClockTimestamp type from the existing
hlc.Timestamp type, defined as a distinct named type. While the two types share the
same memory and proto representation, they have different purposes and
properties. Timestamp serves the role of representing an arbitrary
timestamp, one that makes no claim about real time. ClockTimestamp
serves the role of representing a real timestamp pulled from one of the
HLC clocks in the system. Because of this, it has the added capability
to update a peer's HLC clock. As such, a clock timestamp is a promise
that some node in the system has a clock with a reading equal to or
above its value.
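
In code, the relationship looks roughly like the sketch below. This is
simplified: the real definitions in pkg/util/hlc/timestamp.go are
proto-generated and the current flag structure is collapsed here into a
single Synthetic bool, but the conversion methods shown (ToTimestamp,
UnsafeToClockTimestamp) are the ones used throughout this diff.

    // Timestamp makes no claim about real time. The Synthetic flag
    // (introduced in #57077) marks values that no clock ever emitted.
    type Timestamp struct {
        WallTime  int64
        Logical   int32
        Synthetic bool
    }

    // ClockTimestamp shares Timestamp's memory and proto representation
    // but is a distinct type, so the capability to update a peer's HLC
    // clock can be tracked by the compiler.
    type ClockTimestamp Timestamp

    // ToTimestamp upcasts. Always safe: it only forgets the capability.
    func (t ClockTimestamp) ToTimestamp() Timestamp { return Timestamp(t) }

    // UnsafeToClockTimestamp downcasts without any dynamic check; the
    // caller asserts that the value really was pulled from an HLC clock.
    func (t Timestamp) UnsafeToClockTimestamp() ClockTimestamp {
        return ClockTimestamp(t)
    }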

The commit also pushes towards a world where the ClockTimestamp
specialization is maintained through a combination of static and dynamic
typing. While the primary mechanisms that use ClockTimestamps will use
static typing, Timestamp will also carry a bit indicating whether it can
be downcast to a ClockTimestamp. This bit will replace the current flag
structure. So instead of an interaction like the one introduced in #57077
checking whether the value has a "synthetic" flag set, the interaction
will instead check whether the value carries a "clock timestamp".
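
For illustration, the checked downcast guarded by that bit might look
like the following sketch; the method name TryToClockTimestamp and the
single Synthetic bool from the sketch above are assumptions, not part
of this commit.

    // TryToClockTimestamp is a sketch of the dynamic downcast: a
    // synthetic value makes no claim about real time, so it must never
    // be used to update a peer's HLC clock.
    func (t Timestamp) TryToClockTimestamp() (ClockTimestamp, bool) {
        if t.Synthetic {
            return ClockTimestamp{}, false
        }
        return ClockTimestamp(t), true
    }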

This serves as an alternative to an approach like the one in #58213,
where we split the types in the other direction, keeping Timestamp to
represent a clock timestamp and introducing a new enginepb.TxnTimestamp
to represent an arbitrary MVCC timestamp. That change resulted in a
significantly larger diff, because it misjudged how small a fraction of
all Timestamp usages actually care about the capability of updating
remote HLC clocks. It also forced all users (tests, etc.) of timestamps
to confront this question front-and-center, which had benefits but also
placed a burden on every use of a timestamp.

The original intention of this change was to follow it up by inverting
the synthetic flag on Timestamps, replacing an "is_synthetic" bit with a
"from_clock" bit. While inverting the flag optimized the encoded size of
non-clock (currently synthetic) timestamps at the cost of 2 extra bytes
in the encoded size of clock timestamps, it came with major benefits. By
making clock timestamps opt-in instead of opt-out, we more closely match
the capability model we're trying to establish here with static typing,
where a clock timestamp can do everything a normal timestamp can, but
can also be used to update an HLC clock. The opt-in nature mitigated the
risk of bugs that forget to set this flag correctly. Instead of risking
a capability escalation where a non-clock timestamp is incorrectly
interpreted as a clock timestamp and used to update an HLC clock, we
risked a much less harmful capability de-escalation where a clock
timestamp loses its ability to update an HLC clock. We could then much
more carefully audit the cases where the flag needs to be unset, such as
in the Timestamp.Add and Timestamp.Forward methods.
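
Concretely, under that inverted scheme (with a hypothetical FromClock
field in place of Synthetic), a method like Add would have been one of
those audit points, since its result is a value no clock ever emitted:

    // Sketch of the abandoned "from_clock" design. Add constructs a
    // timestamp that was never read from an HLC clock, so the
    // capability bit must end up unset. Writing FromClock: false marks
    // the audit point; it is also the zero value, which is exactly why
    // opt-in was the safer default.
    func (t Timestamp) Add(wallTime int64, logical int32) Timestamp {
        return Timestamp{
            WallTime:  t.WallTime + wallTime,
            Logical:   t.Logical + logical,
            FromClock: false,
        }
    }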

Unfortunately, this "from_clock" approach (attempted in 4e34e20) came with
serious complications as well, which became clear only after making the
change. Chiefly, the approach made the mixed-version migration of this
field significantly more difficult. With v20.2 nodes unaware of the
flag, v21.1 nodes would need to find a way to either auto-assign it for
all timestamps coming from v21.1 nodes, or to work around its absence.
But this latter idea touches on a second complication – the absence of
the field resulted (intentionally) in an MVCC encoding that v20.2 nodes
would be unable to decode. So v21.1 nodes would need to be very careful
to always set the "from_clock" flag on all timestamps when in a mixed
version cluster. But this all made the eventual migration toward never
setting the flag, once a cluster runs v21.1 exclusively, even more
difficult. In the end,
opting into "is_synthetic" is a lot easier to work with than opting into
"from_clock", so that's how the rest of this PR will operate.
nvanbenschoten committed Dec 30, 2020
1 parent 20bb46e commit abeda9c
Showing 50 changed files with 1,536 additions and 1,336 deletions.
1 change: 1 addition & 0 deletions docs/generated/redact_safe.md
@@ -31,6 +31,7 @@ pkg/sql/sem/tree/table_ref.go | `ColumnID`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/util/hlc/timestamp.go | `Timestamp`
+ pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/log/redact.go | `reflect.TypeOf(true)`
pkg/util/log/redact.go | `reflect.TypeOf(123)`
pkg/util/log/redact.go | `reflect.TypeOf(int8(0))`
4 changes: 2 additions & 2 deletions pkg/kv/client_test.go
@@ -936,8 +936,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
db := setup(test.nodeID)
- now := db.Clock().Now()
- kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now, db.Clock().MaxOffset().Nanoseconds())
+ now := db.Clock().NowAsClockTimestamp()
+ kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now.ToTimestamp(), db.Clock().MaxOffset().Nanoseconds())
txn := kv.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -1535,8 +1535,8 @@ func TestPropagateTxnOnError(t *testing.T) {
// response that does not result in an error. Even though the batch as a
// whole results in an error, the transaction should still propagate this
// information.
- ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}}
- ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.Timestamp{WallTime: 16}}
+ ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.ClockTimestamp{WallTime: 15}}
+ ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.ClockTimestamp{WallTime: 16}}
containsObservedTSs := func(txn *roachpb.Transaction) bool {
contains := func(ot roachpb.ObservedTimestamp) bool {
for _, ts := range txn.ObservedTimestamps {
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -564,7 +564,7 @@ func TestImmutableBatchArgs(t *testing.T) {

// An optimization does copy-on-write if we haven't observed anything,
// so make sure we're not in that case.
- txn.UpdateObservedTimestamp(1, hlc.MaxTimestamp)
+ txn.UpdateObservedTimestamp(1, hlc.MaxClockTimestamp)

put := roachpb.NewPut(roachpb.Key("don't"), roachpb.Value{})
if _, pErr := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{
@@ -2180,20 +2180,20 @@ func TestClockUpdateOnResponse(t *testing.T) {

// Prepare the test function
put := roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))
- doCheck := func(sender kv.Sender, fakeTime hlc.Timestamp) {
+ doCheck := func(sender kv.Sender, fakeTime hlc.ClockTimestamp) {
ds.transportFactory = SenderTransportFactory(tracing.NewTracer(), sender)
_, err := kv.SendWrapped(context.Background(), ds, put)
if err != nil && err != expectedErr {
t.Fatal(err)
}
- newTime := ds.clock.Now()
+ newTime := ds.clock.NowAsClockTimestamp()
if newTime.Less(fakeTime) {
t.Fatalf("clock was not advanced: expected >= %s; got %s", fakeTime, newTime)
}
}

// Test timestamp propagation on valid BatchResults.
- fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0)
+ fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyNormal := kv.SenderFunc(
func(_ context.Context, args roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
rb := args.CreateReply()
@@ -2203,7 +2203,7 @@ func TestClockUpdateOnResponse(t *testing.T) {
doCheck(replyNormal, fakeTime)

// Test timestamp propagation on errors.
- fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0)
+ fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyError := kv.SenderFunc(
func(_ context.Context, _ roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
pErr := expectedErr
8 changes: 4 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -726,7 +726,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
name: "ReadWithinUncertaintyIntervalError",
pErrGen: func(txn *roachpb.Transaction) *roachpb.Error {
const nodeID = 1
- txn.UpdateObservedTimestamp(nodeID, plus10)
+ txn.UpdateObservedTimestamp(nodeID, plus10.UnsafeToClockTimestamp())
pErr := roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(
hlc.Timestamp{}, hlc.Timestamp{}, nil),
@@ -818,12 +818,12 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
)
db := kv.NewDB(ambient, tsf, clock, stopper)
key := roachpb.Key("test-key")
- now := clock.Now()
+ now := clock.NowAsClockTimestamp()
origTxnProto := roachpb.MakeTransaction(
"test txn",
key,
roachpb.UserPriority(0),
- now,
+ now.ToTimestamp(),
clock.MaxOffset().Nanoseconds(),
)
// TODO(andrei): I've monkeyed with the priorities on this initial
@@ -1270,7 +1270,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
const nodeID = 0
// ReadWithinUncertaintyIntervalErrors need a clock to have been
// recorded on the origin.
- txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0))
+ txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0).UnsafeToClockTimestamp())
return roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(hlc.Timestamp{}, hlc.Timestamp{}, nil),
&txn)
@@ -120,7 +120,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
defer log.Scope(t).Close(t)

txn := makeTxnProto()
- txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0))
+ txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0).UnsafeToClockTimestamp())
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

cases := []struct {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -118,7 +118,7 @@ func RequestLease(
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
- effectiveStart.Forward(*ts)
+ effectiveStart.Forward(ts.ToTimestamp())
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -170,9 +170,9 @@ func Subsume(

reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
- reply.FreezeStart = cArgs.EvalCtx.Clock().Now()
+ reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()

return result.Result{
- Local: result.LocalResult{FreezeStart: reply.FreezeStart},
+ Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
}, nil
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -951,7 +951,7 @@ func TestRangeLimitTxnMaxTimestamp(t *testing.T) {
// Start a transaction using node2 as a gateway.
txn := roachpb.MakeTransaction("test", keyA, 1, clock2.Now(), 250 /* maxOffsetNs */)
// Simulate a read to another range on node2 by setting the observed timestamp.
- txn.UpdateObservedTimestamp(2, clock2.Now())
+ txn.UpdateObservedTimestamp(2, clock2.NowAsClockTimestamp())

defer mtc.Stop()
mtc.Start(t, 2)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
@@ -842,7 +842,7 @@ func (filter *mergeFilter) SuspendMergeTrigger(
// We block the LHS leaseholder from applying the merge trigger. Note
// that RHS followers will have already caught up to the leaseholder
// well before this point.
- blocker.signal(freezeStart)
+ blocker.signal(freezeStart.ToTimestamp())
// Wait for the merge to be unblocked.
<-blocker.unblockCh
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -119,7 +119,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
ReadTimestamp: ts,
MaxTimestamp: maxTS,
}
- txn.UpdateObservedTimestamp(c.nodeDesc.NodeID, ts)
+ txn.UpdateObservedTimestamp(c.nodeDesc.NodeID, ts.UnsafeToClockTimestamp())
c.registerTxn(txnName, txn)
return ""

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/intentresolver/intent_resolver_test.go
@@ -72,7 +72,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// Txn1 is in the pending state but is expired.
txn1 := newTransaction("txn1", key, 1, clock)
txn1.ReadTimestamp.WallTime -= int64(100 * time.Second)
- txn1.LastHeartbeat = txn1.ReadTimestamp
+ txn1.LastHeartbeat = txn1.ReadTimestamp.UnsafeToClockTimestamp().ToTimestamp()
// Txn2 is in the staging state and is not old enough to have expired so the
// code ought to send nothing.
txn2 := newTransaction("txn2", key, 1, clock)
