hlc: introduce synthetic flag on timestamps

Informs #52745.
Informs #36431.

This commit introduces an 8-bit flags field on the hlc timestamp struct.
The flags are used to provide details about the timestamp and its
meaning. They do not affect the sort order of Timestamps.

The commit then introduces the first flag: SYNTHETIC. As discussed
in #52745, a synthetic timestamp is defined as a timestamp that makes no
claim about the value of clocks in the system. While standard timestamps
are pulled from HLC clocks and indicate that some node in the system has
a clock with a reading equal to or above its value, a synthetic
timestamp makes no such indication. By avoiding a connection to "real
time", synthetic timestamps can be used to write values at a future time
and to indicate that observed timestamps do not apply to such writes for
the purposes of tracking causality between the write and its observers.
Observed timestamps will be a critical part of implementing non-blocking
transactions (#52745) and fixing the interaction between observed
timestamps and transaction refreshing (#36431).
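
For orientation before the diff, here is a minimal sketch of the new field and flag, written in Go. It is not the generated proto code: the `Flags` field, the `SetFlag` method, and the `hlc.TimestampFlag_SYNTHETIC` name mirror usages visible in the diff below (e.g. in pkg/roachpb/data_test.go), while the flag's numeric value and the `IsFlagSet` helper are assumptions.

```go
package hlc

// Timestamp sketches the HLC timestamp with the new flags field. The real
// struct is protobuf-generated; these are the fields the diff touches.
type Timestamp struct {
	WallTime int64  // nanoseconds of a wall clock reading
	Logical  int32  // breaks ties between equal wall times
	Flags    uint32 // metadata about the timestamp; never affects sort order
}

// TimestampFlag_SYNTHETIC marks a timestamp that makes no claim about the
// value of any clock in the system. Assumed value; only the name appears
// in the diff below.
const TimestampFlag_SYNTHETIC uint32 = 1 << 0

// SetFlag returns a copy of the timestamp with the given flag set.
func (t Timestamp) SetFlag(f uint32) Timestamp {
	t.Flags |= f
	return t
}

// IsFlagSet reports whether the given flag is set (hypothetical helper).
func (t Timestamp) IsFlagSet(f uint32) bool {
	return t.Flags&f != 0
}
```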

The original plan was to reserve the high-order bit in the logical
portion of a timestamp as a "synthetic bit". This is how I began
implementing things, but I was turned off the approach for a few
reasons. First, it was fairly subtle and seemed too easy to get wrong;
using a separate field is more explicit and avoids a class of bugs.
Second, I began to have serious concerns about how the synthetic bit
would impact timestamp ordering. Every timestamp comparison would need
to mask out the bit or risk being incorrect, and this was even true of
the LSM's custom comparator. That seemed difficult to get right, and
particularly concerning given that we plan to mark only some of a
transaction's committed values as synthetic to fix #36431, so a missed
mask could cause atomicity violations. There were also minor backwards
compatibility concerns.
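
To make the masking hazard concrete, here is a hypothetical sketch of the rejected design, reusing the `Timestamp` sketch above. None of this code exists in the commit; it only illustrates why every comparison site, including the engine's key comparator, would have needed to remember the mask.

```go
// syntheticBit is where the rejected design would have hidden the flag,
// in a high-order bit of the 32-bit logical field (illustration only).
const syntheticBit int32 = 1 << 30

// lessForgetsMask is the easy mistake: a synthetic timestamp sorts after
// an otherwise-equal regular one, silently changing key ordering.
func lessForgetsMask(a, b Timestamp) bool {
	return a.WallTime < b.WallTime ||
		(a.WallTime == b.WallTime && a.Logical < b.Logical)
}

// lessWithMask is what every comparison site would have needed instead.
func lessWithMask(a, b Timestamp) bool {
	al, bl := a.Logical&^syntheticBit, b.Logical&^syntheticBit
	return a.WallTime < b.WallTime ||
		(a.WallTime == b.WallTime && al < bl)
}
```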

A separate field is more expensive in theory, so we need to be
careful. In practice, however, it turns out to be mostly free in each
case that we care about. In memory, the separate field is
effectively free because the Timestamp struct was previously 12 bytes
but was always padded out to 16 bytes when included as a field in any
other struct. This means that the flags field is replacing existing
padding. Over the wire, the field will not be included when zero and
will use a varint encoding when not zero, so again, it is mostly free.
In the engine key encoding, the field is also not included when zero,
and takes up only 1 byte when non-zero, so it is mostly free.
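
The in-memory claim is easy to demonstrate with a sketch. The struct names below are illustrative stand-ins, not the generated proto types: a Go struct holding an int64 and an int32 is already padded to 16 bytes, so adding a uint32 flags field does not grow it.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Illustrative stand-ins for the old and new timestamp layouts.
type tsOld struct {
	WallTime int64
	Logical  int32
}

type tsNew struct {
	WallTime int64
	Logical  int32
	Flags    uint32
}

func main() {
	// Both print 16: the 8-byte alignment of WallTime pads the old
	// layout out to 16 bytes, so Flags occupies what was padding.
	fmt.Println(unsafe.Sizeof(tsOld{}), unsafe.Sizeof(tsNew{}))
}
```
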
nvanbenschoten committed Nov 12, 2020
1 parent df5e066 commit 11abdda
Showing 36 changed files with 552 additions and 146 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/buffer.go
@@ -211,6 +211,7 @@ var memBufferColTypes = []*types.T{
types.Bytes, // span.EndKey
types.Int, // ts.WallTime
types.Int, // ts.Logical
types.Int, // ts.Flags
}

// memBuffer is an in-memory buffer for changed KV and Resolved timestamp
@@ -266,6 +267,7 @@ func (b *memBuffer) AddKV(
tree.DNull,
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -284,6 +286,7 @@ func (b *memBuffer) AddResolved(
b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
}
b.allocMu.Unlock()
return b.addRow(ctx, row)
@@ -300,6 +303,7 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
ts := hlc.Timestamp{
WallTime: int64(*row[5].(*tree.DInt)),
Logical: int32(*row[6].(*tree.DInt)),
Flags: uint32(*row[7].(*tree.DInt)),
}
if row[2] != tree.DNull {
e.prevVal = roachpb.Value{
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/export_test.go
@@ -581,11 +581,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
}
batch.Close()

sort.Slice(timestamps, func(i, j int) bool {
return (timestamps[i].WallTime < timestamps[j].WallTime) ||
(timestamps[i].WallTime == timestamps[j].WallTime &&
timestamps[i].Logical < timestamps[j].Logical)
})
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i].Less(timestamps[j]) })
return keys, timestamps
}

4 changes: 2 additions & 2 deletions pkg/jobs/helpers.go
@@ -54,7 +54,7 @@ func NewFakeNodeLiveness(nodeCount int) *FakeNodeLiveness {
nodeID := roachpb.NodeID(i + 1)
nl.mu.livenessMap[nodeID] = &livenesspb.Liveness{
Epoch: 1,
Expiration: hlc.LegacyTimestamp(hlc.MaxTimestamp),
Expiration: hlc.MaxTimestamp.ToLegacyTimestamp(),
NodeID: nodeID,
}
}
@@ -113,7 +113,7 @@ func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID) {
func (nl *FakeNodeLiveness) FakeSetExpiration(id roachpb.NodeID, ts hlc.Timestamp) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.livenessMap[id].Expiration = hlc.LegacyTimestamp(ts)
nl.mu.livenessMap[id].Expiration = ts.ToLegacyTimestamp()
}

// ResetConstructors resets the registered Resumer constructors.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/below_raft_protos_test.go
@@ -66,7 +66,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return m
},
emptySum: 7551962144604783939,
populatedSum: 12720006657210437557,
populatedSum: 5737658018003400959,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
@@ -124,7 +124,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
},
emptySum: 14695981039346656037,
populatedSum: 7432412240713840291,
populatedSum: 834545685817460463,
},
}

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc.go
@@ -250,7 +250,7 @@ func processReplicatedKeyRange(
if meta.Txn != nil {
// Keep track of intent to resolve if older than the intent
// expiration threshold.
if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
if meta.Timestamp.ToTimestamp().Less(intentExp) {
txnID := meta.Txn.ID
if _, ok := txnMap[txnID]; !ok {
txnMap[txnID] = &roachpb.Transaction{
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc_old_test.go
@@ -101,7 +101,7 @@ func runGCOld(
if meta.Txn != nil {
// Keep track of intent to resolve if older than the intent
// expiration threshold.
if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
if meta.Timestamp.ToTimestamp().Less(intentExp) {
txnID := meta.Txn.ID
if _, ok := txnMap[txnID]; !ok {
txnMap[txnID] = &roachpb.Transaction{
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -825,8 +825,7 @@ func (nl *NodeLiveness) heartbeatInternal(
// [*]: see TODO below about how errNodeAlreadyLive handling does not
// enforce this guarantee.
beforeQueueTS := nl.clock.Now()
minExpiration := hlc.LegacyTimestamp(
beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
minExpiration := beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()

// Before queueing, record the heartbeat as in-flight.
nl.metrics.HeartbeatsInFlight.Inc(1)
@@ -873,8 +872,7 @@ func (nl *NodeLiveness) heartbeatInternal(
// Grab a new clock reading to compute the new expiration time,
// since we may have queued on the semaphore for a while.
afterQueueTS := nl.clock.Now()
newLiveness.Expiration = hlc.LegacyTimestamp(
afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
newLiveness.Expiration = afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()
// This guards against the system clock moving backwards. As long
// as the cockroach process is running, checks inside hlc.Clock
// will ensure that the clock never moves backwards, but these
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness_test.go
@@ -43,7 +43,7 @@ func TestShouldReplaceLiveness(t *testing.T) {
l := func(epo int64, expiration hlc.Timestamp, draining bool, membership string) Record {
liveness := livenesspb.Liveness{
Epoch: epo,
Expiration: hlc.LegacyTimestamp(expiration),
Expiration: expiration.ToLegacyTimestamp(),
Draining: draining,
Membership: toMembershipStatus(membership),
}
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/node_liveness_test.go
@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -196,7 +195,7 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
livenessAfter, found := nl.Self()
assert.True(t, found)
exp := livenessAfter.Expiration
minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0))
minExp := before.Add(nlActive.Nanoseconds(), 0).ToLegacyTimestamp()
if exp.Less(minExp) {
return errors.Errorf("expected min expiration %v, found %v", minExp, exp)
}
@@ -958,7 +957,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) {
if err := mtc.dbs[0].CPut(ctx, keys.NodeLivenessKey(goneNodeID), &livenesspb.Liveness{
NodeID: goneNodeID,
Epoch: 1,
Expiration: hlc.LegacyTimestamp(mtc.clock().Now()),
Expiration: mtc.clock().Now().ToLegacyTimestamp(),
Membership: livenesspb.MembershipStatus_ACTIVE,
}, nil); err != nil {
t.Fatal(err)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/registry.go
@@ -348,7 +348,7 @@ func (r *registration) maybeRunCatchupScan() error {
// immediately after) the provisional key.
catchupIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: hlc.Timestamp(meta.Timestamp).Prev(),
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
continue
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/task_test.go
@@ -105,7 +105,7 @@ func newTestIterator(kvs []storage.MVCCKeyValue) *testIterator {
}
expNextKey := storage.MVCCKey{
Key: kv.Key.Key,
Timestamp: hlc.Timestamp(meta.Timestamp),
Timestamp: meta.Timestamp.ToTimestamp(),
}
if !kvs[i].Key.Equal(expNextKey) {
panic(missingErr)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_consistency.go
@@ -612,7 +612,7 @@ func (r *Replica) sha512(
if _, err := hasher.Write(unsafeKey.Key); err != nil {
return err
}
legacyTimestamp = hlc.LegacyTimestamp(unsafeKey.Timestamp)
legacyTimestamp = unsafeKey.Timestamp.ToLegacyTimestamp()
if size := legacyTimestamp.Size(); size > cap(timestampBuf) {
timestampBuf = make([]byte, size)
} else {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
@@ -563,7 +563,7 @@ func (r *Replica) leaseStatus(
status.State = kvserverpb.LeaseState_EXPIRED
return status
}
expiration = hlc.Timestamp(status.Liveness.Expiration)
expiration = status.Liveness.Expiration.ToTimestamp()
}
maxOffset := r.store.Clock().MaxOffset()
stasis := expiration.Add(-int64(maxOffset), 0)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -2931,7 +2931,7 @@ func TestReplicaTSCacheForwardsIntentTS(t *testing.T) {
} else if err := iter.ValueProto(&keyMeta); err != nil {
t.Fatalf("failed to unmarshal metadata for %q", mvccKey)
}
if tsNext := tsNew.Next(); hlc.Timestamp(keyMeta.Timestamp) != tsNext {
if tsNext := tsNew.Next(); keyMeta.Timestamp.ToTimestamp() != tsNext {
t.Errorf("timestamp not forwarded for %q intent: expected %s but got %s",
key, tsNext, keyMeta.Timestamp)
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/tscache/interval_skl.go
@@ -821,6 +821,8 @@ func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) {
new++
}

// TODO(nvanbenschoten): propagate the timestamp synthetic bit through the
// page's max time.
for {
old := atomic.LoadInt64(&p.maxWallTime)
if new <= old {
@@ -1135,12 +1137,12 @@ func decodeValueSet(b []byte, meta uint16) (keyVal, gapVal cacheValue) {
}

func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint16) {
if keyVal.ts.WallTime != 0 || keyVal.ts.Logical != 0 {
if !keyVal.ts.IsEmpty() {
b = encodeValue(b, keyVal)
meta |= hasKey
}

if gapVal.ts.WallTime != 0 || gapVal.ts.Logical != 0 {
if !gapVal.ts.IsEmpty() {
b = encodeValue(b, gapVal)
meta |= hasGap
}
@@ -1150,6 +1152,7 @@ func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint1
}

func decodeValue(b []byte) (ret []byte, val cacheValue) {
// TODO(nvanbenschoten): decode the timestamp synthetic bit.
val.ts.WallTime = int64(binary.BigEndian.Uint64(b))
val.ts.Logical = int32(binary.BigEndian.Uint32(b[8:]))
var err error
@@ -1161,6 +1164,7 @@ func decodeValue(b []byte) (ret []byte, val cacheValue) {
}

func encodeValue(b []byte, val cacheValue) []byte {
// TODO(nvanbenschoten): encode the timestamp synthetic bit.
l := len(b)
b = b[:l+encodedValSize]
binary.BigEndian.PutUint64(b[l:], uint64(val.ts.WallTime))
2 changes: 1 addition & 1 deletion pkg/roachpb/data.go
@@ -1098,7 +1098,7 @@ func (t *Transaction) Update(o *Transaction) {
// Nothing to do.
}

if t.ReadTimestamp.Equal(o.ReadTimestamp) {
if t.ReadTimestamp == o.ReadTimestamp {
// If neither of the transactions has a bumped ReadTimestamp, then the
// WriteTooOld flag is cumulative.
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
16 changes: 10 additions & 6 deletions pkg/roachpb/data_test.go
@@ -46,6 +46,10 @@ func makeTS(walltime int64, logical int32) hlc.Timestamp {
}
}

func makeTSWithFlag(walltime int64, logical int32) hlc.Timestamp {
return makeTS(walltime, logical).SetFlag(hlc.TimestampFlag_SYNTHETIC)
}

// TestKeyNext tests that the method for creating lexicographic
// successors to byte slices works as expected.
func TestKeyNext(t *testing.T) {
@@ -457,17 +461,17 @@ var nonZeroTxn = Transaction{
Key: Key("foo"),
ID: uuid.MakeV4(),
Epoch: 2,
WriteTimestamp: makeTS(20, 21),
MinTimestamp: makeTS(10, 11),
WriteTimestamp: makeTSWithFlag(20, 21),
MinTimestamp: makeTSWithFlag(10, 11),
Priority: 957356782,
Sequence: 123,
},
Name: "name",
Status: COMMITTED,
LastHeartbeat: makeTS(1, 2),
ReadTimestamp: makeTS(20, 22),
MaxTimestamp: makeTS(40, 41),
ObservedTimestamps: []ObservedTimestamp{{NodeID: 1, Timestamp: makeTS(1, 2)}},
LastHeartbeat: makeTSWithFlag(1, 2),
ReadTimestamp: makeTSWithFlag(20, 22),
MaxTimestamp: makeTSWithFlag(40, 41),
ObservedTimestamps: []ObservedTimestamp{{NodeID: 1, Timestamp: makeTSWithFlag(1, 2)}},
WriteTooOld: true,
LockSpans: []Span{{Key: []byte("a"), EndKey: []byte("b")}},
InFlightWrites: []SequencedWrite{{Key: []byte("c"), Sequence: 1}},
2 changes: 1 addition & 1 deletion pkg/server/admin_test.go
@@ -1370,7 +1370,7 @@ func TestHealthAPI(t *testing.T) {
defer ts.nodeLiveness.PauseAllHeartbeatsForTest()()
self, ok := ts.nodeLiveness.Self()
assert.True(t, ok)
s.Clock().Update(hlc.Timestamp(self.Expiration).Add(1, 0))
s.Clock().Update(self.Expiration.ToTimestamp().Add(1, 0))

var resp serverpb.HealthResponse
testutils.SucceedsSoon(t, func() error {
22 changes: 14 additions & 8 deletions pkg/storage/batch.go
@@ -77,15 +77,16 @@ const (
// The keys encoded into the batch are MVCC keys: a string key with a timestamp
// suffix. MVCC keys are encoded as:
//
// <key>[<wall_time>[<logical>]]<#timestamp-bytes>
// <key>[<wall_time>[<logical>[<flags>]]]<#timestamp-bytes>
//
// The <wall_time> and <logical> portions of the key are encoded as 64 and
// 32-bit big-endian integers. A custom RocksDB comparator is used to maintain
// the desired ordering as these keys do not sort lexicographically correctly.
// The <wall_time>, <logical>, and <flags> portions of the key are encoded as
// 64-bit, 32-bit, and 8-bit big-endian integers, respectively. A custom RocksDB
// comparator is used to maintain the desired ordering as these keys do not sort
// lexicographically correctly.
//
// TODO(bilal): This struct exists mostly as a historic artifact. Transition
// the remaining few test uses of this struct over to pebble.Batch, and remove
// it entirely.
// TODO(bilal): This struct exists mostly as a historic artifact. Transition the
// remaining few test uses of this struct over to pebble.Batch, and remove it
// entirely.
type RocksDBBatchBuilder struct {
batch pebble.Batch
}
@@ -144,6 +145,7 @@ func encodeKeyToBuf(buf []byte, key MVCCKey, keyLen int) {
timestampSentinelLen = 1
walltimeEncodedLen = 8
logicalEncodedLen = 4
flagsEncodedLen = 1
)

copy(buf, key.Key)
@@ -155,10 +157,14 @@ func encodeKeyToBuf(buf []byte, key MVCCKey, keyLen int) {
pos += timestampSentinelLen
binary.BigEndian.PutUint64(buf[pos:], uint64(key.Timestamp.WallTime))
pos += walltimeEncodedLen
if key.Timestamp.Logical != 0 {
if key.Timestamp.Logical != 0 || key.Timestamp.Flags != 0 {
binary.BigEndian.PutUint32(buf[pos:], uint32(key.Timestamp.Logical))
pos += logicalEncodedLen
}
if key.Timestamp.Flags != 0 {
buf[pos] = uint8(key.Timestamp.Flags)
pos += flagsEncodedLen
}
}
buf[len(buf)-1] = byte(timestampLength)
}
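
As a hand-worked reading aid for the encoding above (an assumption-laden sketch, not a test vector from the commit: it assumes the trailing length byte counts the sentinel plus the timestamp bytes, as the constants in encodeKeyToBuf suggest), the key "foo" at {WallTime: 1, Logical: 1, Flags: 3} from the batch_test.go case below would encode to:

```go
// Hand-assembled encoding of MVCCKey{Key: "foo", Timestamp: {1, 1, 3}},
// following encodeKeyToBuf above. Illustration only.
encoded := []byte{
	'f', 'o', 'o', // key
	0x00,                   // timestamp sentinel
	0, 0, 0, 0, 0, 0, 0, 1, // WallTime = 1, 64-bit big-endian
	0, 0, 0, 1, // Logical = 1, 32-bit big-endian
	3,  // Flags = 3, one byte
	14, // #timestamp-bytes = 1 (sentinel) + 8 + 4 + 1
}
_ = encoded
```
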
1 change: 1 addition & 0 deletions pkg/storage/batch_test.go
@@ -1206,6 +1206,7 @@ func TestDecodeKey(t *testing.T) {
{Key: []byte("foo")},
{Key: []byte("foo"), Timestamp: hlc.Timestamp{WallTime: 1}},
{Key: []byte("foo"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1}},
{Key: []byte("foo"), Timestamp: hlc.Timestamp{WallTime: 1, Logical: 1, Flags: 3}},
}
for _, test := range tests {
t.Run(test.String(), func(t *testing.T) {
19 changes: 13 additions & 6 deletions pkg/storage/engine_key.go
@@ -40,10 +40,11 @@ type EngineKey struct {
}

const (
engineKeyNoVersion = 0
engineKeyVersionWallTimeLen = 8
engineKeyVersionWallAndLogicalTimeLen = 12
engineKeyVersionLockTableLen = 17
engineKeyNoVersion = 0
engineKeyVersionWallTimeLen = 8
engineKeyVersionWallAndLogicalTimeLen = 12
engineKeyVersionWallLogicalAndFlagsTimeLen = 13
engineKeyVersionLockTableLen = 17
)

// Format implements the fmt.Formatter interface
@@ -141,8 +142,10 @@ func (k EngineKey) encodeToSizedBuf(buf []byte) {
// This includes the case of an empty timestamp.
func (k EngineKey) IsMVCCKey() bool {
l := len(k.Version)
return l == engineKeyNoVersion || l == engineKeyVersionWallTimeLen ||
l == engineKeyVersionWallAndLogicalTimeLen
return l == engineKeyNoVersion ||
l == engineKeyVersionWallTimeLen ||
l == engineKeyVersionWallAndLogicalTimeLen ||
l == engineKeyVersionWallLogicalAndFlagsTimeLen
}

// IsLockTableKey returns true if the key can be decoded as a LockTableKey.
@@ -161,6 +164,10 @@ func (k EngineKey) ToMVCCKey() (MVCCKey, error) {
case engineKeyVersionWallAndLogicalTimeLen:
key.Timestamp.WallTime = int64(binary.BigEndian.Uint64(k.Version[0:8]))
key.Timestamp.Logical = int32(binary.BigEndian.Uint32(k.Version[8:12]))
case engineKeyVersionWallLogicalAndFlagsTimeLen:
key.Timestamp.WallTime = int64(binary.BigEndian.Uint64(k.Version[0:8]))
key.Timestamp.Logical = int32(binary.BigEndian.Uint32(k.Version[8:12]))
key.Timestamp.Flags = uint32(k.Version[12])
default:
return MVCCKey{}, errors.Errorf("version is not an encoded timestamp %x", k.Version)
}
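
And a hedged usage sketch of the new 13-byte case, assuming EngineKey's exported Key and Version fields and the methods shown above:

```go
// Build the 13-byte version suffix for {WallTime: 1, Logical: 1, Flags: 3}
// and round-trip it through the new decoding case. Illustration only.
version := make([]byte, engineKeyVersionWallLogicalAndFlagsTimeLen)
binary.BigEndian.PutUint64(version[0:8], 1)  // WallTime
binary.BigEndian.PutUint32(version[8:12], 1) // Logical
version[12] = 3                              // Flags
k := EngineKey{Key: roachpb.Key("foo"), Version: version}
fmt.Println(k.IsMVCCKey()) // true: 13 bytes is now a valid MVCC version length
mvccKey, err := k.ToMVCCKey()
_, _ = mvccKey, err
```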