hlc: strongly type ClockTimestamp as specialization of Timestamp #58349

Merged
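
For orientation before the file-by-file diff: the relationship the new type establishes, inferred from the call sites below. ClockTimestamp, ToTimestamp, UnsafeToClockTimestamp, NowAsClockTimestamp, WithSynthetic, and MaxClockTimestamp all appear in this PR; everything else in this sketch is an assumption, not the actual code in pkg/util/hlc/timestamp.go.

package hlc

// ClockTimestamp is a specialization of Timestamp that is known to have been
// read off an HLC clock, i.e. it never carries a synthetic component. Sketch
// only; the real definition and method set live in pkg/util/hlc/timestamp.go.
type ClockTimestamp Timestamp

// ToTimestamp widens a ClockTimestamp back into a general Timestamp. Always
// safe, since every clock reading is a valid timestamp.
func (t ClockTimestamp) ToTimestamp() Timestamp { return Timestamp(t) }

// UnsafeToClockTimestamp narrows a Timestamp into a ClockTimestamp without
// proving that the value actually came from a clock (the real method
// presumably also clears the synthetic flag). The "Unsafe" prefix marks the
// call sites, mostly tests in this PR, where the type system is being
// overridden deliberately.
func (t Timestamp) UnsafeToClockTimestamp() ClockTimestamp { return ClockTimestamp(t) }
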
1 change: 1 addition & 0 deletions docs/generated/redact_safe.md
@@ -31,6 +31,7 @@ pkg/sql/sem/tree/table_ref.go | `ColumnID`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/util/hlc/timestamp.go | `Timestamp`
pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/log/redact.go | `reflect.TypeOf(true)`
pkg/util/log/redact.go | `reflect.TypeOf(123)`
pkg/util/log/redact.go | `reflect.TypeOf(int8(0))`
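
The new entry records that ClockTimestamp, like Timestamp, is safe to print unredacted in logs. Since docs/generated/redact_safe.md is a generated file, the substantive change is presumably a marker method on the new type, along these lines (an assumption mirroring Timestamp's existing marker, not a quote from the PR):

// SafeValue implements the redact.SafeValue interface, declaring that
// ClockTimestamp carries no sensitive user data and may appear unredacted
// in logs and crash reports.
func (ClockTimestamp) SafeValue() {}
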
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/buffer.go
@@ -211,7 +211,6 @@ var memBufferColTypes = []*types.T{
types.Bytes, // span.EndKey
types.Int, // ts.WallTime
types.Int, // ts.Logical
types.Int, // ts.Flags
}

// memBuffer is an in-memory buffer for changed KV and Resolved timestamp
@@ -267,7 +266,6 @@ func (b *memBuffer) AddKV(
tree.DNull,
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -286,7 +284,6 @@ func (b *memBuffer) AddResolved(
b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -303,7 +300,6 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
ts := hlc.Timestamp{
WallTime: int64(*row[5].(*tree.DInt)),
Logical: int32(*row[6].(*tree.DInt)),
Flags: uint32(*row[7].(*tree.DInt)),
}
if row[2] != tree.DNull {
e.prevVal = roachpb.Value{
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
@@ -822,7 +822,7 @@ func (s *bufferSink) EmitRow(
{Datum: tree.DNull}, // resolved span
{Datum: s.alloc.NewDString(tree.DString(topic))}, // topic
{Datum: s.alloc.NewDBytes(tree.DBytes(key))}, // key
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, //value
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, // value
})
return nil
}
4 changes: 2 additions & 2 deletions pkg/kv/client_test.go
@@ -936,8 +936,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
db := setup(test.nodeID)
now := db.Clock().Now()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now, db.Clock().MaxOffset().Nanoseconds())
now := db.Clock().NowAsClockTimestamp()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now.ToTimestamp(), db.Clock().MaxOffset().Nanoseconds())
txn := kv.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
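
The pattern in this test recurs throughout the PR: read the clock as a ClockTimestamp, pass the stronger type where it is now expected, and widen it explicitly with ToTimestamp() where a plain Timestamp is still required. A condensed, hedged sketch of that flow (signatures inferred from the call sites above; ctx and nodeID are assumed to be in scope):

now := db.Clock().NowAsClockTimestamp() // hlc.ClockTimestamp: a real clock reading
proto := roachpb.MakeTransaction(
	"unnamed", nil /* baseKey */, roachpb.NormalUserPriority,
	now.ToTimestamp(), // MakeTransaction still takes a general hlc.Timestamp
	db.Clock().MaxOffset().Nanoseconds(),
)
// NewTxnFromProto now takes the ClockTimestamp itself, presumably so it can be
// recorded as the gateway's observed timestamp, which is what
// TestNodeIDAndObservedTimestamps asserts.
txn := kv.NewTxnFromProto(ctx, db, nodeID, now, kv.RootTxn, &proto)
_ = txn
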
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -1190,7 +1190,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
t.Fatal(err)
}
ts[i] = s.Clock().Now()
log.Infof(ctx, "%d: %s %d", i, key, ts[i])
log.Infof(ctx, "%d: %s %s", i, key, ts[i])
if i == 0 {
testutils.SucceedsSoon(t, func() error {
// Enforce that when we write the second key, it's written
@@ -1535,8 +1535,8 @@ func TestPropagateTxnOnError(t *testing.T) {
// response that does not result in an error. Even though the batch as a
// whole results in an error, the transaction should still propagate this
// information.
ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}}
ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.Timestamp{WallTime: 16}}
ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.ClockTimestamp{WallTime: 15}}
ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.ClockTimestamp{WallTime: 16}}
containsObservedTSs := func(txn *roachpb.Transaction) bool {
contains := func(ot roachpb.ObservedTimestamp) bool {
for _, ts := range txn.ObservedTimestamps {
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -564,7 +564,7 @@ func TestImmutableBatchArgs(t *testing.T) {

// An optimization does copy-on-write if we haven't observed anything,
// so make sure we're not in that case.
txn.UpdateObservedTimestamp(1, hlc.MaxTimestamp)
txn.UpdateObservedTimestamp(1, hlc.MaxClockTimestamp)

put := roachpb.NewPut(roachpb.Key("don't"), roachpb.Value{})
if _, pErr := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{
@@ -2180,20 +2180,20 @@ func TestClockUpdateOnResponse(t *testing.T) {

// Prepare the test function
put := roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))
doCheck := func(sender kv.Sender, fakeTime hlc.Timestamp) {
doCheck := func(sender kv.Sender, fakeTime hlc.ClockTimestamp) {
ds.transportFactory = SenderTransportFactory(tracing.NewTracer(), sender)
_, err := kv.SendWrapped(context.Background(), ds, put)
if err != nil && err != expectedErr {
t.Fatal(err)
}
newTime := ds.clock.Now()
newTime := ds.clock.NowAsClockTimestamp()
if newTime.Less(fakeTime) {
t.Fatalf("clock was not advanced: expected >= %s; got %s", fakeTime, newTime)
}
}

// Test timestamp propagation on valid BatchResults.
fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0)
fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyNormal := kv.SenderFunc(
func(_ context.Context, args roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
rb := args.CreateReply()
@@ -2203,7 +2203,7 @@
doCheck(replyNormal, fakeTime)

// Test timestamp propagation on errors.
fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0)
fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyError := kv.SenderFunc(
func(_ context.Context, _ roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
pErr := expectedErr
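
Two changes of note in this file: comparisons against the local clock now go through NowAsClockTimestamp, and a fabricated future timestamp must be narrowed with UnsafeToClockTimestamp before it can be compared, because it never came from a clock. A hedged sketch of the check, not the test verbatim:

// Build a timestamp 10s ahead of the clock, hand it to the sender under test,
// then assert that processing the response pulled the local clock past it.
fakeTime := clock.Now().Add((10 * time.Second).Nanoseconds(), 0).UnsafeToClockTimestamp()
// ... send a request whose response carries fakeTime ...
if clock.NowAsClockTimestamp().Less(fakeTime) {
	t.Fatalf("clock was not advanced: expected >= %s", fakeTime)
}
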
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -697,8 +697,8 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
origTS := makeTS(123, 0)
plus10 := origTS.Add(10, 10)
plus20 := origTS.Add(20, 0)
plus10 := origTS.Add(10, 10).WithSynthetic(false)
plus20 := origTS.Add(20, 0).WithSynthetic(false)
testCases := []struct {
// The test's name.
name string
@@ -726,7 +726,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
name: "ReadWithinUncertaintyIntervalError",
pErrGen: func(txn *roachpb.Transaction) *roachpb.Error {
const nodeID = 1
txn.UpdateObservedTimestamp(nodeID, plus10)
txn.UpdateObservedTimestamp(nodeID, plus10.UnsafeToClockTimestamp())
pErr := roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(
hlc.Timestamp{}, hlc.Timestamp{}, nil),
@@ -818,12 +818,12 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
)
db := kv.NewDB(ambient, tsf, clock, stopper)
key := roachpb.Key("test-key")
now := clock.Now()
now := clock.NowAsClockTimestamp()
origTxnProto := roachpb.MakeTransaction(
"test txn",
key,
roachpb.UserPriority(0),
now,
now.ToTimestamp(),
clock.MaxOffset().Nanoseconds(),
)
// TODO(andrei): I've monkeyed with the priorities on this initial
@@ -1270,7 +1270,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
const nodeID = 0
// ReadWithinUncertaintyIntervalErrors need a clock to have been
// recorded on the origin.
txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0))
txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0).UnsafeToClockTimestamp())
return roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(hlc.Timestamp{}, hlc.Timestamp{}, nil),
&txn)
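
Observed timestamps are recordings of other nodes' clocks, so Transaction.UpdateObservedTimestamp now takes an hlc.ClockTimestamp. Tests that fabricate these values have to narrow them explicitly, and the expected timestamps above are pinned to their non-synthetic form with WithSynthetic(false). A sketch of the new call shapes (makeTS is the diff's own test helper; the rest is illustrative):

// Production-style usage: record a real clock reading from the node just visited.
txn.UpdateObservedTimestamp(nodeID, clock.NowAsClockTimestamp())

// Test usage: a made-up timestamp never came from a clock, so the conversion
// has to be spelled out, visibly, as unsafe.
txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0).UnsafeToClockTimestamp())
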
@@ -120,7 +120,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
defer log.Scope(t).Close(t)

txn := makeTxnProto()
txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0))
txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0).UnsafeToClockTimestamp())
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

cases := []struct {
@@ -164,7 +164,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
return pErr
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 0), // see UpdateObservedTimestamp
expRefreshTS: txn.WriteTimestamp.Add(20, 0).WithSynthetic(false), // see UpdateObservedTimestamp
},
{
pErr: func() *roachpb.Error {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -118,7 +118,7 @@ func RequestLease(
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
effectiveStart.Forward(*ts)
effectiveStart.Forward(ts.ToTimestamp())
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -170,9 +170,9 @@ func Subsume(

reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().Now()
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()

return result.Result{
Local: result.LocalResult{FreezeStart: reply.FreezeStart},
Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
}, nil
}
14 changes: 11 additions & 3 deletions pkg/kv/kvserver/below_raft_protos_test.go
@@ -63,11 +63,15 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
m := enginepb.NewPopulatedMVCCMetadata(r, false)
m.Txn = nil // never populated below Raft
m.Timestamp.Synthetic = nil // never populated below Raft
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
m.TxnDidNotUpdateMeta = nil // never populated below Raft
return m
},
emptySum: 7551962144604783939,
populatedSum: 11599955036265189084,
populatedSum: 12366000535951165621,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
@@ -124,10 +128,14 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
},
reflect.TypeOf(&enginepb.MVCCMetadataSubsetForMergeSerialization{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
m := enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
return m
},
emptySum: 14695981039346656037,
populatedSum: 834545685817460463,
populatedSum: 6109178572734990978,
},
}

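
These golden-proto fixtures exist to catch accidental changes to the encoding of messages written below Raft: each message is randomly populated and hashed, and the hash is pinned in the test. Adding the Synthetic field to the timestamp protos therefore changes the populated checksums, and the fixtures also pin the invariant that the new field is never set below Raft, just like Txn and TxnDidNotUpdateMeta. Schematically (a sketch of the mechanism, not the test's actual fixture type or checksum values):

// A fixture pairs a populated-proto constructor with a pinned checksum, so any
// change to the message's wire encoding forces a deliberate golden update.
type fixtureSketch struct {
	populatedConstructor func(*rand.Rand) protoutil.Message
	populatedSum         uint64 // golden value; updated intentionally, as in this diff
}
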
114 changes: 104 additions & 10 deletions pkg/kv/kvserver/client_replica_test.go
@@ -62,10 +62,103 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)

// TestRangeCommandClockUpdate verifies that followers update their
// clocks when executing a command, even if the lease holder's clock is far
// in the future.
func TestRangeCommandClockUpdate(t *testing.T) {
// TestReplicaClockUpdates verifies that the leaseholder and followers both
// update their clocks when executing a command to the command's timestamp, as
// long as the request timestamp is from a clock (i.e. is not synthetic).
func TestReplicaClockUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(t *testing.T, write bool, synthetic bool) {
const numNodes = 3
const maxOffset = 100 * time.Millisecond
var manuals []*hlc.ManualClock
var clocks []*hlc.Clock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewManualClock(1))
clocks = append(clocks, hlc.NewClock(manuals[i].UnixNano, maxOffset))
}
ctx := context.Background()
cfg := kvserver.TestStoreConfig(nil)
cfg.TestingKnobs.DisableReplicateQueue = true
cfg.Clock = nil
mtc := &multiTestContext{
storeConfig: &cfg,
clocks: clocks,
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been updated to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, numNodes)
mtc.replicateRange(1, 1, 2)

// Pick a timestamp in the future of all nodes by less than the
// MaxOffset. Set the synthetic flag according to the test case.
reqTS := clocks[0].Now().Add(int64(maxOffset/2), 0).WithSynthetic(synthetic)
h := roachpb.Header{Timestamp: reqTS}

// Execute the command.
var req roachpb.Request
reqKey := roachpb.Key("a")
if write {
req = incrementArgs(reqKey, 5)
} else {
req = getArgs(reqKey)
}
if _, err := kv.SendWrappedWith(ctx, mtc.stores[0].TestSender(), h, req); err != nil {
t.Fatal(err)
}

// If writing, wait for that command to execute on all the replicas.
// Consensus is asynchronous outside of the majority quorum, and Raft
// application is asynchronous on all nodes.
if write {
testutils.SucceedsSoon(t, func() error {
var values []int64
for _, eng := range mtc.engines {
val, _, err := storage.MVCCGet(ctx, eng, reqKey, reqTS, storage.MVCCGetOptions{})
if err != nil {
return err
}
values = append(values, mustGetInt(val))
}
if !reflect.DeepEqual(values, []int64{5, 5, 5}) {
return errors.Errorf("expected (5, 5, 5), got %v", values)
}
return nil
})
}

// Verify that clocks were updated as expected. Check all clocks if we
// issued a write, but only the leaseholder's if we issued a read. In
// theory, we should be able to assert that _only_ the leaseholder's
// clock is updated by a read, but in practice an assertion against
// followers' clocks being updated is very difficult to make without
// being flaky because it's difficult to prevent other channels
// (background work, etc.) from carrying the clock update.
expUpdated := !synthetic
clocksToCheck := clocks
if !write {
clocksToCheck = clocks[:1]
}
for _, c := range clocksToCheck {
require.Equal(t, expUpdated, reqTS.Less(c.Now()))
}
}

testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) {
run(t, write, synthetic)
})
})
}

// TestFollowersDontRejectClockUpdateWithJump verifies that followers update
// their clocks when executing a command, even if the leaseholder's clock is
// far in the future.
func TestFollowersDontRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -128,9 +221,9 @@ func TestRangeCommandClockUpdate(t *testing.T) {
}
}

// TestRejectFutureCommand verifies that lease holders reject commands that
// would cause a large time jump.
func TestRejectFutureCommand(t *testing.T) {
// TestLeaseholdersRejectClockUpdateWithJump verifies that leaseholders reject
// commands that would cause a large time jump.
func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -151,7 +244,7 @@ func TestRejectFutureCommand(t *testing.T) {
const numCmds = 3
clockOffset := clock.MaxOffset() / numCmds
for i := int64(1); i <= numCmds; i++ {
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0)
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0).WithSynthetic(false)
if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
t.Fatal(err)
}
@@ -163,7 +256,8 @@ func TestRejectFutureCommand(t *testing.T) {
}

// Once the accumulated offset reaches MaxOffset, commands will be rejected.
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0)}, incArgs)
tsFuture := ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false)
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: tsFuture}, incArgs)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Fatalf("unexpected error %v", pErr)
}
@@ -953,7 +1047,7 @@ func TestRangeLimitTxnMaxTimestamp(t *testing.T) {
// Start a transaction using node2 as a gateway.
txn := roachpb.MakeTransaction("test", keyA, 1, clock2.Now(), 250 /* maxOffsetNs */)
// Simulate a read to another range on node2 by setting the observed timestamp.
txn.UpdateObservedTimestamp(2, clock2.Now())
txn.UpdateObservedTimestamp(2, clock2.NowAsClockTimestamp())

defer mtc.Stop()
mtc.Start(t, 2)
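
The new TestReplicaClockUpdates pins down the behavioral payoff of the stronger type: a request timestamp only ratchets a node's HLC if it represents a real clock reading, i.e. is not synthetic. In sketch form (the helper name and receiver-side plumbing below are assumptions; the PR's actual enforcement is spread across the server paths, not contained in one function):

// maybeAdvanceClock illustrates the rule the test exercises: synthetic
// timestamps must not move the local HLC forward.
func maybeAdvanceClock(clock *hlc.Clock, reqTS hlc.Timestamp) {
	if !reqTS.Synthetic { // assumption: Synthetic is the flag WithSynthetic() sets
		clock.Update(reqTS.UnsafeToClockTimestamp()) // assumption: Clock.Update now takes a ClockTimestamp
	}
}
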
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/closed_timestamp_test.go
@@ -842,7 +842,7 @@ func (filter *mergeFilter) SuspendMergeTrigger(
// We block the LHS leaseholder from applying the merge trigger. Note
// that RHS followers will have already caught up to the leaseholder
// well before this point.
blocker.signal(freezeStart)
blocker.signal(freezeStart.ToTimestamp())
// Wait for the merge to be unblocked.
<-blocker.unblockCh
}