Skip to content

Commit

Permalink
hlc: replace Timestamp.flags bit field with synthetic boolean
Browse files Browse the repository at this point in the history
This commit replaces Timestamp's (and LegacyTimestamp's) `flags` field
with a single boolean field called `synthetic`. This shaves a byte off
the encoded message when the field is set and makes synthetic timestamps
easier to work with. If we ever decide that we need to go back on this
because we need more flags (unlikely), we can do so without breaking anything.

The commit also starts the process of updating tests to handle the fact
that `Timestamp.Add` should set the synthetic flag. However, it stops
before completing the change because it turned out to be fairly large
and not a prerequisite for non-blocking transactions.
  • Loading branch information
nvanbenschoten committed Dec 31, 2020
1 parent a18b534 commit 086903e
Show file tree
Hide file tree
Showing 27 changed files with 435 additions and 440 deletions.
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ var memBufferColTypes = []*types.T{
types.Bytes, // span.EndKey
types.Int, // ts.WallTime
types.Int, // ts.Logical
types.Int, // ts.Flags
}

// memBuffer is an in-memory buffer for changed KV and Resolved timestamp
Expand Down Expand Up @@ -267,7 +266,6 @@ func (b *memBuffer) AddKV(
tree.DNull,
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
Expand All @@ -286,7 +284,6 @@ func (b *memBuffer) AddResolved(
b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
Expand All @@ -303,7 +300,6 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
ts := hlc.Timestamp{
WallTime: int64(*row[5].(*tree.DInt)),
Logical: int32(*row[6].(*tree.DInt)),
Flags: uint32(*row[7].(*tree.DInt)),
}
if row[2] != tree.DNull {
e.prevVal = roachpb.Value{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func (s *bufferSink) EmitRow(
{Datum: tree.DNull}, // resolved span
{Datum: s.alloc.NewDString(tree.DString(topic))}, // topic
{Datum: s.alloc.NewDBytes(tree.DBytes(key))}, // key
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, //value
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, // value
})
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
t.Fatal(err)
}
ts[i] = s.Clock().Now()
log.Infof(ctx, "%d: %s %d", i, key, ts[i])
log.Infof(ctx, "%d: %s %s", i, key, ts[i])
if i == 0 {
testutils.SucceedsSoon(t, func() error {
// Enforce that when we write the second key, it's written
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,8 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
origTS := makeTS(123, 0)
plus10 := origTS.Add(10, 10)
plus20 := origTS.Add(20, 0)
plus10 := origTS.Add(10, 10).SetSynthetic(false)
plus20 := origTS.Add(20, 0).SetSynthetic(false)
testCases := []struct {
// The test's name.
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
return pErr
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 0), // see UpdateObservedTimestamp
expRefreshTS: txn.WriteTimestamp.Add(20, 0).SetSynthetic(false), // see UpdateObservedTimestamp
},
{
pErr: func() *roachpb.Error {
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
m := enginepb.NewPopulatedMVCCMetadata(r, false)
m.Txn = nil // never populated below Raft
m.Timestamp.Synthetic = nil // never populated below Raft
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
m.TxnDidNotUpdateMeta = nil // never populated below Raft
return m
},
emptySum: 7551962144604783939,
populatedSum: 11599955036265189084,
populatedSum: 12366000535951165621,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down Expand Up @@ -124,10 +128,14 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
},
reflect.TypeOf(&enginepb.MVCCMetadataSubsetForMergeSerialization{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
m := enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
return m
},
emptySum: 14695981039346656037,
populatedSum: 834545685817460463,
populatedSum: 6109178572734990978,
},
}

Expand Down
112 changes: 103 additions & 9 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,103 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)

// TestRangeCommandClockUpdate verifies that followers update their
// clocks when executing a command, even if the lease holder's clock is far
// in the future.
func TestRangeCommandClockUpdate(t *testing.T) {
// TestReplicaClockUpdates verifies that the leaseholder and followers both
// update their clocks to a command's timestamp when executing that command,
// as long as the request timestamp is from a clock (i.e. is not synthetic).
//
// The test runs four cases: {write, read} x {synthetic, non-synthetic}.
func TestReplicaClockUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// run executes a single test case. write selects an increment request
// (true) or a get request (false); synthetic is the value passed to
// SetSynthetic on the request timestamp.
run := func(t *testing.T, write bool, synthetic bool) {
const numNodes = 3
const maxOffset = 100 * time.Millisecond
// Give every node its own manual clock source and an HLC clock built on
// top of it, all sharing the same max offset.
var manuals []*hlc.ManualClock
var clocks []*hlc.Clock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewManualClock(1))
clocks = append(clocks, hlc.NewClock(manuals[i].UnixNano, maxOffset))
}
ctx := context.Background()
cfg := kvserver.TestStoreConfig(nil)
cfg.TestingKnobs.DisableReplicateQueue = true
// NOTE(review): presumably cleared so that the per-node clocks passed to
// the multiTestContext below are used instead — confirm.
cfg.Clock = nil
mtc := &multiTestContext{
storeConfig: &cfg,
clocks: clocks,
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been updated to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, numNodes)
mtc.replicateRange(1, 1, 2)

// Pick a timestamp in the future of all nodes by less than the
// MaxOffset. Set the synthetic flag according to the test case.
reqTS := clocks[0].Now().Add(int64(maxOffset/2), 0).SetSynthetic(synthetic)
h := roachpb.Header{Timestamp: reqTS}

// Execute the command.
var req roachpb.Request
reqKey := roachpb.Key("a")
if write {
req = incrementArgs(reqKey, 5)
} else {
req = getArgs(reqKey)
}
if _, err := kv.SendWrappedWith(ctx, mtc.stores[0].TestSender(), h, req); err != nil {
t.Fatal(err)
}

// If writing, wait for that command to execute on all the replicas.
// Consensus is asynchronous outside of the majority quorum, and Raft
// application is asynchronous on all nodes.
if write {
testutils.SucceedsSoon(t, func() error {
// Read the key at reqTS from every engine and require that all three
// report the incremented value.
var values []int64
for _, eng := range mtc.engines {
val, _, err := storage.MVCCGet(ctx, eng, reqKey, reqTS, storage.MVCCGetOptions{})
if err != nil {
return err
}
values = append(values, mustGetInt(val))
}
if !reflect.DeepEqual(values, []int64{5, 5, 5}) {
return errors.Errorf("expected (5, 5, 5), got %v", values)
}
return nil
})
}

// Verify that clocks were updated as expected. Check all clocks if we
// issued a write, but only the leaseholder's if we issued a read. In
// theory, we should be able to assert that _only_ the leaseholder's
// clock is updated by a read, but in practice an assertion against
// followers' clocks being updated is very difficult to make without
// being flaky because it's difficult to prevent other channels
// (background work, etc.) from carrying the clock update.
expUpdated := !synthetic
clocksToCheck := clocks
if !write {
clocksToCheck = clocks[:1]
}
for _, c := range clocksToCheck {
// A clock counts as updated when its current reading is past reqTS.
require.Equal(t, expUpdated, reqTS.Less(c.Now()))
}
}

// Exercise every combination of write/read and synthetic/non-synthetic.
testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) {
run(t, write, synthetic)
})
})
}

// TestFollowersDontRejectClockUpdateWithJump verifies that followers update
// their clocks when executing a command, even if the leaseholder's clock is
// far in the future.
func TestFollowersDontRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -126,9 +219,9 @@ func TestRangeCommandClockUpdate(t *testing.T) {
}
}

// TestRejectFutureCommand verifies that lease holders reject commands that
// would cause a large time jump.
func TestRejectFutureCommand(t *testing.T) {
// TestLeaseholdersRejectClockUpdateWithJump verifies that leaseholders reject
// commands that would cause a large time jump.
func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -149,7 +242,7 @@ func TestRejectFutureCommand(t *testing.T) {
const numCmds = 3
clockOffset := clock.MaxOffset() / numCmds
for i := int64(1); i <= numCmds; i++ {
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0)
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0).SetSynthetic(false)
if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
t.Fatal(err)
}
Expand All @@ -161,7 +254,8 @@ func TestRejectFutureCommand(t *testing.T) {
}

// Once the accumulated offset reaches MaxOffset, commands will be rejected.
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0)}, incArgs)
tsFuture := ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0).SetSynthetic(false)
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: tsFuture}, incArgs)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Fatalf("unexpected error %v", pErr)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (rts *resolvedTimestamp) recompute() bool {
func (rts *resolvedTimestamp) assertNoChange() {
before := rts.resolvedTS
changed := rts.recompute()
if changed || (before != rts.resolvedTS) {
if changed || !before.EqOrdering(rts.resolvedTS) {
panic(fmt.Sprintf("unexpected resolved timestamp change on recomputation, "+
"was %s, recomputed as %s", before, rts.resolvedTS))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func verifyRDReplicatedOnlyMVCCIter(
if key := iter.Key(); !key.Equal(expectedKeys[i]) {
k1, ts1 := key.Key, key.Timestamp
k2, ts2 := expectedKeys[i].Key, expectedKeys[i].Timestamp
t.Errorf("%d: expected %q(%d); got %q(%d)", i, k2, ts2, k1, ts1)
t.Errorf("%d: expected %q(%s); got %q(%s)", i, k2, ts2, k1, ts1)
}
if reverse {
i--
Expand Down Expand Up @@ -220,7 +220,7 @@ func verifyRDEngineIter(
if !k.Equal(expectedKeys[i]) {
k1, ts1 := k.Key, k.Timestamp
k2, ts2 := expectedKeys[i].Key, expectedKeys[i].Timestamp
t.Errorf("%d: expected %q(%d); got %q(%d)", i, k2, ts2, k1, ts1)
t.Errorf("%d: expected %q(%s); got %q(%s)", i, k2, ts2, k1, ts1)
}
i++
iter.Next()
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ func TestStoreSendUpdateTime(t *testing.T) {
defer stopper.Stop(context.Background())
store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper)
args := getArgs([]byte("a"))
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0)
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds(), 0).SetSynthetic(false)
_, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args)
if pErr != nil {
t.Fatal(pErr)
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func TestStoreSendWithClockOffset(t *testing.T) {
store, _ := createTestStore(t, testStoreOpts{createSystemRanges: true}, stopper)
args := getArgs([]byte("a"))
// Set args timestamp to exceed max offset.
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0)
reqTS := store.cfg.Clock.Now().Add(store.cfg.Clock.MaxOffset().Nanoseconds()+1, 0).SetSynthetic(false)
_, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Timestamp: reqTS}, &args)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Errorf("unexpected error: %v", pErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/tscache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestTimestampCacheImplsIdentical(t *testing.T) {
to = nil
}

ts := start.Add(int64(j), 100)
ts := start.Add(int64(j), 100).SetSynthetic(false)
if useClock {
ts = clock.Now()
}
Expand Down
40 changes: 23 additions & 17 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,19 @@ func makeClockTS(walltime int64, logical int32) hlc.ClockTimestamp {
}
}

func makeClockTSWithFlag(walltime int64, logical int32) hlc.ClockTimestamp {
return hlc.ClockTimestamp{
func makeTS(walltime int64, logical int32) hlc.Timestamp {
return hlc.Timestamp{
WallTime: walltime,
Logical: logical,
Flags: uint32(hlc.TimestampFlag_SYNTHETIC),
}
}

func makeTS(walltime int64, logical int32) hlc.Timestamp {
return makeClockTS(walltime, logical).ToTimestamp()
}

func makeTSWithFlag(walltime int64, logical int32) hlc.Timestamp {
return makeClockTSWithFlag(walltime, logical).ToTimestamp()
// makeSynTS returns an hlc.Timestamp with the given wall time and logical
// component and with its Synthetic field set to true.
func makeSynTS(walltime int64, logical int32) hlc.Timestamp {
return hlc.Timestamp{
WallTime: walltime,
Logical: logical,
Synthetic: true,
}
}

// TestKeyNext tests that the method for creating lexicographic
Expand Down Expand Up @@ -474,17 +473,24 @@ var nonZeroTxn = Transaction{
Key: Key("foo"),
ID: uuid.MakeV4(),
Epoch: 2,
WriteTimestamp: makeTSWithFlag(20, 21),
MinTimestamp: makeTSWithFlag(10, 11),
WriteTimestamp: makeSynTS(20, 21),
MinTimestamp: makeSynTS(10, 11),
Priority: 957356782,
Sequence: 123,
},
Name: "name",
Status: COMMITTED,
LastHeartbeat: makeTSWithFlag(1, 2),
ReadTimestamp: makeTSWithFlag(20, 22),
MaxTimestamp: makeTSWithFlag(40, 41),
ObservedTimestamps: []ObservedTimestamp{{NodeID: 1, Timestamp: makeClockTSWithFlag(1, 2)}},
Name: "name",
Status: COMMITTED,
LastHeartbeat: makeSynTS(1, 2),
ReadTimestamp: makeSynTS(20, 22),
MaxTimestamp: makeSynTS(40, 41),
ObservedTimestamps: []ObservedTimestamp{{
NodeID: 1,
Timestamp: hlc.ClockTimestamp{
WallTime: 1,
Logical: 2,
Synthetic: true, // normally not set, but needed for zerofields.NoZeroField
},
}},
WriteTooOld: true,
LockSpans: []Span{{Key: []byte("a"), EndKey: []byte("b")}},
InFlightWrites: []SequencedWrite{{Key: []byte("c"), Sequence: 1}},
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func TestTransactionString(t *testing.T) {
Status: roachpb.COMMITTED,
LastHeartbeat: hlc.Timestamp{WallTime: 10, Logical: 11},
ReadTimestamp: hlc.Timestamp{WallTime: 30, Logical: 31},
MaxTimestamp: hlc.Timestamp{WallTime: 40, Logical: 41},
MaxTimestamp: hlc.Timestamp{WallTime: 40, Logical: 41, Synthetic: true},
}
expStr := `"name" meta={id=d7aa0f5e key="foo" pri=44.58039917 epo=2 ts=0.000000020,21 min=0.000000010,11 seq=15}` +
` lock=true stat=COMMITTED rts=0.000000030,31 wto=false max=0.000000040,41`
` lock=true stat=COMMITTED rts=0.000000030,31 wto=false max=0.000000040,41?`

if str := txn.String(); str != expStr {
t.Errorf(
Expand Down
Loading

0 comments on commit 086903e

Please sign in to comment.