Merge #58349 #58456
58349: hlc: strongly type ClockTimestamp as specialization of Timestamp r=nvanbenschoten a=nvanbenschoten

Closes #57684 - no need to panic, the type system now protects us.

This PR splits off a new `hlc.ClockTimestamp` type from the existing `hlc.Timestamp` type, declared as a distinct type (not an alias) so that the compiler can tell the two apart. While the two types share the same memory and proto representation, they have different purposes and properties. `Timestamp` serves the role of representing an arbitrary timestamp, one that makes no claim about real time. `ClockTimestamp` serves the role of representing a real timestamp pulled from one of the HLC clocks in the system. Because of this, it has the added capability to update a peer's HLC clock. As such, a clock timestamp is a promise that some node in the system has a clock with a reading equal to or above its value.
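As a rough sketch of the shape of this split (the conversion names `ToTimestamp`, `UnsafeToClockTimestamp`, and `NowAsClockTimestamp` appear in the diffs below; the field layout shown here is illustrative, not the exact generated proto struct):

```go
package hlc

// Timestamp represents an arbitrary MVCC timestamp and makes no claim
// about real time. Fields are illustrative only.
type Timestamp struct {
	WallTime  int64
	Logical   int32
	Synthetic bool // set when the value does not come from an HLC clock
}

// ClockTimestamp is a specialization of Timestamp that is known to have
// been read from an HLC clock. It shares Timestamp's memory and proto
// representation, so conversions between the two are free.
type ClockTimestamp Timestamp

// ToTimestamp upcasts a clock reading into an arbitrary timestamp. This
// direction is always safe.
func (t ClockTimestamp) ToTimestamp() Timestamp { return Timestamp(t) }

// UnsafeToClockTimestamp downcasts without consulting the dynamic bit;
// callers assert that the value really originated from a clock.
func (t Timestamp) UnsafeToClockTimestamp() ClockTimestamp {
	t.Synthetic = false
	return ClockTimestamp(t)
}
```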

The PR also moves to a world where the `ClockTimestamp` specialization is maintained through a combination of static and dynamic typing. While the primary mechanisms that use `ClockTimestamp`s will rely on static typing, `Timestamp` will also carry a bit indicating whether it can be downcast to a `ClockTimestamp`. This bit will replace the current flag structure. So an interaction like the one introduced in #57077, instead of checking whether the value has an arbitrary "synthetic" flag set, will check whether the value has a "clock timestamp" (same mechanism, but with a slightly more consistent meaning).
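Continuing the types from the sketch above, the dynamic side of the scheme might look roughly like this; the checked-downcast name `TryToClockTimestamp` is an assumption for illustration, not necessarily the exact API:

```go
// TryToClockTimestamp attempts to downcast a Timestamp to a ClockTimestamp.
// (Method name assumed for illustration.) The downcast succeeds only if the
// dynamic bit says the value originated from an HLC clock (i.e. it is not
// synthetic), which is the same check callers previously expressed by
// inspecting the flags field.
func (t Timestamp) TryToClockTimestamp() (ClockTimestamp, bool) {
	if t.Synthetic {
		return ClockTimestamp{}, false
	}
	return ClockTimestamp(t), true
}

// A receiver that wants to update its local clock from an incoming
// timestamp would first perform the checked downcast:
//
//	if ct, ok := ts.TryToClockTimestamp(); ok {
//		clock.Update(ct)
//	}
```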

This serves as an alternative to an approach like the one in #58213, where we split the types in the other direction, keeping `Timestamp` to represent a clock timestamp and introducing a new `enginepb.TxnTimestamp` to represent an arbitrary MVCC timestamp. That change resulted in a significantly larger diff, because it misjudged how small a fraction of all `Timestamp` usages actually care about the capability of updating remote HLC clocks. It also forced all users of timestamps (tests, etc.) to address this question front and center, which had benefits but also placed a burden on every use of a timestamp.

The original intention of this change was to follow it up by inverting the synthetic flag on `Timestamp`, replacing an "is_synthetic" bit with a "from_clock" bit. While inverting the flag optimized the encoded size of non-clock (currently synthetic) timestamps at the expense of two extra bytes in the encoded size of clock timestamps, it came with major benefits. By making clock timestamps opt-in instead of opt-out, we more closely match the capability model we're trying to establish here with static typing, where a clock timestamp can do everything a normal timestamp can, but can also be used to update an HLC clock. The opt-in nature mitigated the risk of bugs that forget to set this flag correctly. Instead of risking a capability escalation, where a non-clock timestamp is incorrectly interpreted as a clock timestamp and used to update an HLC clock, we risked a much less harmful capability de-escalation, where a clock timestamp loses its ability to update an HLC clock. We could then much more carefully audit the cases where the flag needs to be unset, such as in the `Timestamp.Add` and `Timestamp.Forward` methods.
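Purely to illustrate the audit burden described above under the abandoned opt-in design (nothing below is shipped code; the `fromClockTimestamp` type, its `FromClock` field, and the method are hypothetical):

```go
// fromClockTimestamp is a hypothetical variant of Timestamp used only to
// illustrate the abandoned "from_clock" design.
type fromClockTimestamp struct {
	WallTime  int64
	Logical   int32
	FromClock bool // opt-in: true only for values read directly from a clock
}

// Add derives a new timestamp offset from the receiver. The result is no
// longer a direct clock reading, so the clock capability is deliberately
// dropped. Forgetting to drop it would be a capability escalation; dropping
// it when it could have been kept is only a harmless de-escalation, which is
// what made the opt-in design attractive despite its migration problems.
func (t fromClockTimestamp) Add(wallTime int64, logical int32) fromClockTimestamp {
	return fromClockTimestamp{
		WallTime: t.WallTime + wallTime,
		Logical:  t.Logical + logical,
		// FromClock intentionally left false.
	}
}
```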

Unfortunately, this "from_clock" approach (attempted in 4e34e20) came with serious complications as well, which became clear only after making the change. Chiefly, the approach made the mixed-version migration of this field significantly more difficult. With v20.2 nodes unaware of the flag, v21.1 nodes would need to find a way either to auto-assign it for all timestamps coming from v20.2 nodes, or to work around its absence. But this latter idea touches on a second complication: the absence of the field resulted (intentionally) in an MVCC encoding that v20.2 nodes would be unable to decode. So v21.1 nodes would need to be very careful to always set the "from_clock" flag on all timestamps while in a mixed-version cluster. But this all made the later migration towards not setting the flag, once in a v21.1-only cluster, even more difficult. In the end, opting into "is_synthetic" is a lot easier to work with than opting into "from_clock", so that's how the rest of this PR operates.

58456: build: fix Pebble nightly benchmarks r=jbowens a=jbowens



Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
3 people committed Jan 5, 2021
3 parents 99c97d0 + b96f85c + 72ed74e commit a148c4f
Showing 138 changed files with 3,882 additions and 3,670 deletions.
5 changes: 4 additions & 1 deletion build/teamcity-nightly-pebble.sh
@@ -34,7 +34,10 @@ make bin/roachprod bin/roachtest

rm -fr vendor/github.com/cockroachdb/pebble
git clone https://github.com/cockroachdb/pebble vendor/github.com/cockroachdb/pebble
GOOS=linux go build -v -mod=vendor -o pebble.linux github.com/cockroachdb/pebble/cmd/pebble
pushd vendor/github.com/cockroachdb/pebble
GOOS=linux go build -v -mod=vendor -o pebble.linux ./cmd/pebble
popd
mv vendor/github.com/cockroachdb/pebble/pebble.linux .
export PEBBLE_BIN=pebble.linux

# NB: We specify "true" for the --cockroach and --workload binaries to
1 change: 1 addition & 0 deletions docs/generated/redact_safe.md
@@ -31,6 +31,7 @@ pkg/sql/sem/tree/table_ref.go | `ColumnID`
pkg/storage/enginepb/mvcc3.go | `MVCCStatsDelta`
pkg/storage/enginepb/mvcc3.go | `*MVCCStats`
pkg/util/hlc/timestamp.go | `Timestamp`
pkg/util/hlc/timestamp.go | `ClockTimestamp`
pkg/util/log/redact.go | `reflect.TypeOf(true)`
pkg/util/log/redact.go | `reflect.TypeOf(123)`
pkg/util/log/redact.go | `reflect.TypeOf(int8(0))`
4 changes: 0 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/buffer.go
@@ -211,7 +211,6 @@ var memBufferColTypes = []*types.T{
types.Bytes, // span.EndKey
types.Int, // ts.WallTime
types.Int, // ts.Logical
types.Int, // ts.Flags
}

// memBuffer is an in-memory buffer for changed KV and Resolved timestamp
@@ -267,7 +266,6 @@ func (b *memBuffer) AddKV(
tree.DNull,
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -286,7 +284,6 @@ func (b *memBuffer) AddResolved(
b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -303,7 +300,6 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
ts := hlc.Timestamp{
WallTime: int64(*row[5].(*tree.DInt)),
Logical: int32(*row[6].(*tree.DInt)),
Flags: uint32(*row[7].(*tree.DInt)),
}
if row[2] != tree.DNull {
e.prevVal = roachpb.Value{
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
@@ -822,7 +822,7 @@ func (s *bufferSink) EmitRow(
{Datum: tree.DNull}, // resolved span
{Datum: s.alloc.NewDString(tree.DString(topic))}, // topic
{Datum: s.alloc.NewDBytes(tree.DBytes(key))}, // key
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, //value
{Datum: s.alloc.NewDBytes(tree.DBytes(value))}, // value
})
return nil
}
4 changes: 2 additions & 2 deletions pkg/kv/client_test.go
@@ -936,8 +936,8 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
for i, test := range directCases {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
db := setup(test.nodeID)
now := db.Clock().Now()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now, db.Clock().MaxOffset().Nanoseconds())
now := db.Clock().NowAsClockTimestamp()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now.ToTimestamp(), db.Clock().MaxOffset().Nanoseconds())
txn := kv.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -1190,7 +1190,7 @@ func TestMultiRangeScanReverseScanInconsistent(t *testing.T) {
t.Fatal(err)
}
ts[i] = s.Clock().Now()
log.Infof(ctx, "%d: %s %d", i, key, ts[i])
log.Infof(ctx, "%d: %s %s", i, key, ts[i])
if i == 0 {
testutils.SucceedsSoon(t, func() error {
// Enforce that when we write the second key, it's written
@@ -1535,8 +1535,8 @@ func TestPropagateTxnOnError(t *testing.T) {
// response that does not result in an error. Even though the batch as a
// whole results in an error, the transaction should still propagate this
// information.
ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}}
ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.Timestamp{WallTime: 16}}
ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.ClockTimestamp{WallTime: 15}}
ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.ClockTimestamp{WallTime: 16}}
containsObservedTSs := func(txn *roachpb.Transaction) bool {
contains := func(ot roachpb.ObservedTimestamp) bool {
for _, ts := range txn.ObservedTimestamps {
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -564,7 +564,7 @@ func TestImmutableBatchArgs(t *testing.T) {

// An optimization does copy-on-write if we haven't observed anything,
// so make sure we're not in that case.
txn.UpdateObservedTimestamp(1, hlc.MaxTimestamp)
txn.UpdateObservedTimestamp(1, hlc.MaxClockTimestamp)

put := roachpb.NewPut(roachpb.Key("don't"), roachpb.Value{})
if _, pErr := kv.SendWrappedWith(context.Background(), ds, roachpb.Header{
@@ -2180,20 +2180,20 @@ func TestClockUpdateOnResponse(t *testing.T) {

// Prepare the test function
put := roachpb.NewPut(roachpb.Key("a"), roachpb.MakeValueFromString("value"))
doCheck := func(sender kv.Sender, fakeTime hlc.Timestamp) {
doCheck := func(sender kv.Sender, fakeTime hlc.ClockTimestamp) {
ds.transportFactory = SenderTransportFactory(tracing.NewTracer(), sender)
_, err := kv.SendWrapped(context.Background(), ds, put)
if err != nil && err != expectedErr {
t.Fatal(err)
}
newTime := ds.clock.Now()
newTime := ds.clock.NowAsClockTimestamp()
if newTime.Less(fakeTime) {
t.Fatalf("clock was not advanced: expected >= %s; got %s", fakeTime, newTime)
}
}

// Test timestamp propagation on valid BatchResults.
fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0)
fakeTime := ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyNormal := kv.SenderFunc(
func(_ context.Context, args roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
rb := args.CreateReply()
@@ -2203,7 +2203,7 @@ func TestClockUpdateOnResponse(t *testing.T) {
doCheck(replyNormal, fakeTime)

// Test timestamp propagation on errors.
fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0)
fakeTime = ds.clock.Now().Add(10000000000 /*10s*/, 0).UnsafeToClockTimestamp()
replyError := kv.SenderFunc(
func(_ context.Context, _ roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
pErr := expectedErr
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -697,8 +697,8 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
origTS := makeTS(123, 0)
plus10 := origTS.Add(10, 10)
plus20 := origTS.Add(20, 0)
plus10 := origTS.Add(10, 10).WithSynthetic(false)
plus20 := origTS.Add(20, 0).WithSynthetic(false)
testCases := []struct {
// The test's name.
name string
@@ -726,7 +726,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
name: "ReadWithinUncertaintyIntervalError",
pErrGen: func(txn *roachpb.Transaction) *roachpb.Error {
const nodeID = 1
txn.UpdateObservedTimestamp(nodeID, plus10)
txn.UpdateObservedTimestamp(nodeID, plus10.UnsafeToClockTimestamp())
pErr := roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(
hlc.Timestamp{}, hlc.Timestamp{}, nil),
@@ -818,12 +818,12 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
)
db := kv.NewDB(ambient, tsf, clock, stopper)
key := roachpb.Key("test-key")
now := clock.Now()
now := clock.NowAsClockTimestamp()
origTxnProto := roachpb.MakeTransaction(
"test txn",
key,
roachpb.UserPriority(0),
now,
now.ToTimestamp(),
clock.MaxOffset().Nanoseconds(),
)
// TODO(andrei): I've monkeyed with the priorities on this initial
Expand Down Expand Up @@ -1270,7 +1270,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
const nodeID = 0
// ReadWithinUncertaintyIntervalErrors need a clock to have been
// recorded on the origin.
txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0))
txn.UpdateObservedTimestamp(nodeID, makeTS(123, 0).UnsafeToClockTimestamp())
return roachpb.NewErrorWithTxn(
roachpb.NewReadWithinUncertaintyIntervalError(hlc.Timestamp{}, hlc.Timestamp{}, nil),
&txn)
@@ -120,7 +120,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
defer log.Scope(t).Close(t)

txn := makeTxnProto()
txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0))
txn.UpdateObservedTimestamp(1, txn.WriteTimestamp.Add(20, 0).UnsafeToClockTimestamp())
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")

cases := []struct {
@@ -164,7 +164,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
return pErr
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 0), // see UpdateObservedTimestamp
expRefreshTS: txn.WriteTimestamp.Add(20, 0).WithSynthetic(false), // see UpdateObservedTimestamp
},
{
pErr: func() *roachpb.Error {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -118,7 +118,7 @@ func RequestLease(
// The bug prevented with this is unlikely to occur in practice
// since earlier commands usually apply before this lease will.
if ts := args.MinProposedTS; isExtension && ts != nil {
effectiveStart.Forward(*ts)
effectiveStart.Forward(ts.ToTimestamp())
}

} else if prevLease.Type() == roachpb.LeaseExpiration {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -170,9 +170,9 @@ func Subsume(

reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().Now()
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()

return result.Result{
Local: result.LocalResult{FreezeStart: reply.FreezeStart},
Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
}, nil
}
14 changes: 11 additions & 3 deletions pkg/kv/kvserver/below_raft_protos_test.go
@@ -63,11 +63,15 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
m := enginepb.NewPopulatedMVCCMetadata(r, false)
m.Txn = nil // never populated below Raft
m.Timestamp.Synthetic = nil // never populated below Raft
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
m.TxnDidNotUpdateMeta = nil // never populated below Raft
return m
},
emptySum: 7551962144604783939,
populatedSum: 11599955036265189084,
populatedSum: 12366000535951165621,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
@@ -124,10 +128,14 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
},
reflect.TypeOf(&enginepb.MVCCMetadataSubsetForMergeSerialization{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
m := enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
if m.MergeTimestamp != nil {
m.MergeTimestamp.Synthetic = nil // never populated below Raft
}
return m
},
emptySum: 14695981039346656037,
populatedSum: 834545685817460463,
populatedSum: 6109178572734990978,
},
}

114 changes: 104 additions & 10 deletions pkg/kv/kvserver/client_replica_test.go
@@ -62,10 +62,103 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
)

// TestRangeCommandClockUpdate verifies that followers update their
// clocks when executing a command, even if the lease holder's clock is far
// in the future.
func TestRangeCommandClockUpdate(t *testing.T) {
// TestReplicaClockUpdates verifies that the leaseholder and followers both
// update their clocks when executing a command to the command's timestamp, as
// long as the request timestamp is from a clock (i.e. is not synthetic).
func TestReplicaClockUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

run := func(t *testing.T, write bool, synthetic bool) {
const numNodes = 3
const maxOffset = 100 * time.Millisecond
var manuals []*hlc.ManualClock
var clocks []*hlc.Clock
for i := 0; i < numNodes; i++ {
manuals = append(manuals, hlc.NewManualClock(1))
clocks = append(clocks, hlc.NewClock(manuals[i].UnixNano, maxOffset))
}
ctx := context.Background()
cfg := kvserver.TestStoreConfig(nil)
cfg.TestingKnobs.DisableReplicateQueue = true
cfg.Clock = nil
mtc := &multiTestContext{
storeConfig: &cfg,
clocks: clocks,
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been updated to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, numNodes)
mtc.replicateRange(1, 1, 2)

// Pick a timestamp in the future of all nodes by less than the
// MaxOffset. Set the synthetic flag according to the test case.
reqTS := clocks[0].Now().Add(int64(maxOffset/2), 0).WithSynthetic(synthetic)
h := roachpb.Header{Timestamp: reqTS}

// Execute the command.
var req roachpb.Request
reqKey := roachpb.Key("a")
if write {
req = incrementArgs(reqKey, 5)
} else {
req = getArgs(reqKey)
}
if _, err := kv.SendWrappedWith(ctx, mtc.stores[0].TestSender(), h, req); err != nil {
t.Fatal(err)
}

// If writing, wait for that command to execute on all the replicas.
// Consensus is asynchronous outside of the majority quorum, and Raft
// application is asynchronous on all nodes.
if write {
testutils.SucceedsSoon(t, func() error {
var values []int64
for _, eng := range mtc.engines {
val, _, err := storage.MVCCGet(ctx, eng, reqKey, reqTS, storage.MVCCGetOptions{})
if err != nil {
return err
}
values = append(values, mustGetInt(val))
}
if !reflect.DeepEqual(values, []int64{5, 5, 5}) {
return errors.Errorf("expected (5, 5, 5), got %v", values)
}
return nil
})
}

// Verify that clocks were updated as expected. Check all clocks if we
// issued a write, but only the leaseholder's if we issued a read. In
// theory, we should be able to assert that _only_ the leaseholder's
// clock is updated by a read, but in practice an assertion against
// followers' clocks being updated is very difficult to make without
// being flaky because it's difficult to prevent other channels
// (background work, etc.) from carrying the clock update.
expUpdated := !synthetic
clocksToCheck := clocks
if !write {
clocksToCheck = clocks[:1]
}
for _, c := range clocksToCheck {
require.Equal(t, expUpdated, reqTS.Less(c.Now()))
}
}

testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) {
run(t, write, synthetic)
})
})
}

// TestFollowersDontRejectClockUpdateWithJump verifies that followers update
// their clocks when executing a command, even if the leaseholder's clock is
// far in the future.
func TestFollowersDontRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -128,9 +221,9 @@ func TestRangeCommandClockUpdate(t *testing.T) {
}
}

// TestRejectFutureCommand verifies that lease holders reject commands that
// would cause a large time jump.
func TestRejectFutureCommand(t *testing.T) {
// TestLeaseholdersRejectClockUpdateWithJump verifies that leaseholders reject
// commands that would cause a large time jump.
func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -151,7 +244,7 @@ func TestRejectFutureCommand(t *testing.T) {
const numCmds = 3
clockOffset := clock.MaxOffset() / numCmds
for i := int64(1); i <= numCmds; i++ {
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0)
ts := ts1.Add(i*clockOffset.Nanoseconds(), 0).WithSynthetic(false)
if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts}, incArgs); err != nil {
t.Fatal(err)
}
@@ -163,7 +256,8 @@ func TestRejectFutureCommand(t *testing.T) {
}

// Once the accumulated offset reaches MaxOffset, commands will be rejected.
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0)}, incArgs)
tsFuture := ts1.Add(clock.MaxOffset().Nanoseconds()+1, 0).WithSynthetic(false)
_, pErr := kv.SendWrappedWith(context.Background(), mtc.stores[0].TestSender(), roachpb.Header{Timestamp: tsFuture}, incArgs)
if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
t.Fatalf("unexpected error %v", pErr)
}
@@ -953,7 +1047,7 @@ func TestRangeLimitTxnMaxTimestamp(t *testing.T) {
// Start a transaction using node2 as a gateway.
txn := roachpb.MakeTransaction("test", keyA, 1, clock2.Now(), 250 /* maxOffsetNs */)
// Simulate a read to another range on node2 by setting the observed timestamp.
txn.UpdateObservedTimestamp(2, clock2.Now())
txn.UpdateObservedTimestamp(2, clock2.NowAsClockTimestamp())

defer mtc.Stop()
mtc.Start(t, 2)