From cc61e09f579ad479bda695f60b0610d99a391fb6 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 10 Dec 2020 22:09:14 -0500 Subject: [PATCH] kv/tscache: propagate synthetic timestamps through timestamp cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #57683. Before this change, a read with a synthetic timestamp would not pass the synthetic flag through the timestamp cache to conflicting writes. This could cause issues where a write that was bumped by one of these future timestamps would not notice that its write timestamp was now synthetic. This change fixes this by propagating the synthetic flag through the timestamp cache. We have to play a few tricks to make this efficient around `sklPage.ratchetMaxTimestamp`, which attempts to keeping a running maximum of the timestamps in a given skiplist using atomics. The added complexity here is encapsulated in a new `ratchetingTime` type. The testing around here is actually pretty solid. `TestIntervalSklConcurrency` and `TestIntervalSklConcurrentVsSequential` both perform long series of randomized operations and assert against ratchet inversions. This change adds some randomization around synthetic timestamps to the tests and introduces synthetic timestamps to the ratcheting policy (which wasn't otherwise necessary) to get solid randomized test coverage. This made encoding and decoding timestamp cache values slightly slower, but on the order of nanoseconds. ``` name old time/op new time/op delta IntervalSklDecodeValue/logical=false,synthetic=false,txnID=false-16 11.7ns ± 2% 12.1ns ± 2% +3.52% (p=0.000 n=9+10) IntervalSklDecodeValue/logical=false,synthetic=false,txnID=true-16 11.7ns ± 1% 12.0ns ± 1% +2.66% (p=0.000 n=10+9) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=false-16 11.8ns ± 1% 12.1ns ± 2% +3.27% (p=0.000 n=10+9) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=true-16 11.7ns ± 0% 12.0ns ± 1% +2.82% (p=0.000 n=6+10) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=false-16 11.7ns ± 2% 12.1ns ± 1% +3.08% (p=0.000 n=10+10) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=true-16 11.7ns ± 1% 12.1ns ± 2% +3.59% (p=0.000 n=10+10) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=false-16 11.7ns ± 1% 12.2ns ± 4% +4.11% (p=0.000 n=10+10) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=true-16 11.7ns ± 2% 12.1ns ± 1% +3.42% (p=0.000 n=10+10) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=false-16 6.25ns ± 1% 6.52ns ± 1% +4.38% (p=0.000 n=9+10) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=true-16 6.25ns ± 1% 6.56ns ± 1% +5.00% (p=0.000 n=9+9) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=false-16 6.23ns ± 2% 6.55ns ± 2% +5.07% (p=0.000 n=10+10) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=true-16 6.23ns ± 2% 6.56ns ± 4% +5.28% (p=0.000 n=10+10) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=false-16 6.26ns ± 3% 6.55ns ± 1% +4.66% (p=0.000 n=10+10) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=true-16 6.24ns ± 1% 6.52ns ± 1% +4.53% (p=0.000 n=9+10) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=false-16 6.29ns ± 3% 6.53ns ± 2% +3.71% (p=0.000 n=9+10) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=true-16 6.23ns ± 2% 6.56ns ± 0% +5.26% (p=0.000 n=10+8) name old alloc/op new alloc/op delta IntervalSklDecodeValue/logical=false,synthetic=false,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=false,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=true-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=false-16 0.00B 0.00B ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=true-16 0.00B 0.00B ~ (all equal) name old allocs/op new allocs/op delta IntervalSklDecodeValue/logical=false,synthetic=false,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=false,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=false,synthetic=true,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=false,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklDecodeValue/logical=true,synthetic=true,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=false,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=false,synthetic=true,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=false,txnID=true-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=false-16 0.00 0.00 ~ (all equal) IntervalSklEncodeValue/logical=true,synthetic=true,txnID=true-16 0.00 0.00 ~ (all equal) ``` --- pkg/kv/kvserver/tscache/cache.go | 24 ++- pkg/kv/kvserver/tscache/interval_skl.go | 144 +++++++++---- pkg/kv/kvserver/tscache/interval_skl_test.go | 204 ++++++++++++------- 3 files changed, 247 insertions(+), 125 deletions(-) diff --git a/pkg/kv/kvserver/tscache/cache.go b/pkg/kv/kvserver/tscache/cache.go index f2872f614a6c..b9c38878e834 100644 --- a/pkg/kv/kvserver/tscache/cache.go +++ b/pkg/kv/kvserver/tscache/cache.go @@ -114,20 +114,26 @@ func (v cacheValue) String() string { // // This ratcheting policy is shared across all Cache implementations, even if // they do not use this function directly. -func ratchetValue(old, new cacheValue) (cacheValue, bool) { +func ratchetValue(old, new cacheValue) (res cacheValue, updated bool) { if old.ts.Less(new.ts) { - // Ratchet to new value. + // New value newer. Ratchet to new value. return new, true } else if new.ts.Less(old.ts) { - // Nothing to update. + // Old value newer. Nothing to update. return old, false - } else if new.txnID != old.txnID { + } + + // Equal times. + if new.ts.Synthetic != old.ts.Synthetic { + // old.ts == new.ts but the values have different synthetic flags. + // Remove the synthetic flag from the resulting value. + new.ts.Synthetic = false + } + if new.txnID != old.txnID { // old.ts == new.ts but the values have different txnIDs. Remove the - // transaction ID from the value so that it is no longer owned by any - // transaction. + // transaction ID from the resulting value so that it is no longer owned + // by any transaction. new.txnID = noTxnID - return new, old.txnID != noTxnID } - // old == new. - return old, false + return new, new != old } diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index 88d5217c4bbb..f51f0b174bd3 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -19,7 +19,6 @@ import ( "fmt" "sync/atomic" "time" - "unsafe" "github.com/andy-kimball/arenaskl" "github.com/cockroachdb/cockroach/pkg/util" @@ -90,8 +89,8 @@ const ( ) const ( - encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0))) - encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{})) + encodedTsSize = 8 + 4 + 1 // walltime + logical + flags + encodedTxnIDSize = uuid.Size encodedValSize = encodedTsSize + encodedTxnIDSize // initialSklPageSize is the initial size of each page in the sklImpl's @@ -202,7 +201,7 @@ func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics) minPages: defaultMinSklPages, metrics: metrics, } - s.pushNewPage(0 /* maxWallTime */, nil /* arena */) + s.pushNewPage(0 /* maxTime */, nil /* arena */) s.metrics.Pages.Update(1) return &s } @@ -301,9 +300,10 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue s.rotMutex.RLock() defer s.rotMutex.RUnlock() - // If floor ts is >= requested timestamp, then no need to perform a search - // or add any records. - if val.ts.LessEq(s.floorTS) { + // If floor ts is greater than the requested timestamp, then no need to + // perform a search or add any records. We don't return early when the + // timestamps are equal, because their flags may differ. + if val.ts.Less(s.floorTS) { return nil } @@ -385,7 +385,7 @@ func (s *intervalSkl) frontPage() *sklPage { // pushNewPage prepends a new empty page to the front of the pages list. It // accepts an optional arena argument to facilitate re-use. -func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) { +func (s *intervalSkl) pushNewPage(maxTime ratchetingTime, arena *arenaskl.Arena) { size := s.nextPageSize() if arena != nil && arena.Cap() == size { // Re-use the provided arena, if possible. @@ -395,7 +395,7 @@ func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) { arena = arenaskl.NewArena(size) } p := newSklPage(arena) - p.maxWallTime = maxWallTime + p.maxTime = maxTime s.pages.PushFront(p) } @@ -455,7 +455,7 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) { var oldArena *arenaskl.Arena for s.pages.Len() >= s.minPages { bp := back.Value.(*sklPage) - bpMaxTS := hlc.Timestamp{WallTime: bp.maxWallTime} + bpMaxTS := bp.getMaxTimestamp() if minTSToRetain.LessEq(bpMaxTS) { // The back page's maximum timestamp is within the time // window we've promised to retain, so we can't evict it. @@ -473,12 +473,12 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) { } // Push a new empty page on the front of the pages list. We give this page - // the maxWallTime of the old front page. This assures that the maxWallTime - // for a page is always equal to or greater than that for all earlier pages. - // In other words, it assures that the maxWallTime for a page is not only - // the maximum timestamp for all values it contains, but also for all values - // any earlier pages contain. - s.pushNewPage(fp.maxWallTime, oldArena) + // the maxTime of the old front page. This assures that the maxTime for a + // page is always equal to or greater than that for all earlier pages. In + // other words, it assures that the maxTime for a page is not only the + // maximum timestamp for all values it contains, but also for all values any + // earlier pages contain. + s.pushNewPage(fp.maxTime, oldArena) // Update metrics. s.metrics.Pages.Update(int64(s.pages.Len())) @@ -526,7 +526,7 @@ func (s *intervalSkl) LookupTimestampRange(from, to []byte, opt rangeOptions) ca // different txnID than our current cacheValue result (val), then we // need to remove the txnID from our result, per the ratcheting policy // for cacheValues. This is tested in TestIntervalSklMaxPageTS. - maxTS := hlc.Timestamp{WallTime: atomic.LoadInt64(&p.maxWallTime)} + maxTS := p.getMaxTimestamp() if maxTS.Less(val.ts) { break } @@ -554,9 +554,9 @@ func (s *intervalSkl) FloorTS() hlc.Timestamp { // filled up, it returns arenaskl.ErrArenaFull. At that point, a new fixed page // must be allocated and used instead. type sklPage struct { - list *arenaskl.Skiplist - maxWallTime int64 // accessed atomically - isFull int32 // accessed atomically + list *arenaskl.Skiplist + maxTime ratchetingTime // accessed atomically + isFull int32 // accessed atomically } func newSklPage(arena *arenaskl.Arena) *sklPage { @@ -802,6 +802,54 @@ func (p *sklPage) ensureFloorValue(it *arenaskl.Iterator, to []byte, val cacheVa } func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) { + new := makeRatchetingTime(ts) + for { + old := ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))) + if new <= old { + break + } + + if atomic.CompareAndSwapInt64((*int64)(&p.maxTime), int64(old), int64(new)) { + break + } + } +} + +func (p *sklPage) getMaxTimestamp() hlc.Timestamp { + return ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))).get() +} + +// ratchetingTime is a compressed representation of an hlc.Timestamp, reduced +// down to 64 bits to support atomic access. +// +// ratchetingTime implements compression such that any loss of information when +// passing through the type results in the resulting Timestamp being ratcheted +// to a larger value. This provides the guarantee that the following relation +// holds, regardless of the value of x: +// +// x.LessEq(makeRatchetingTime(x).get()) +// +// It also provides the guarantee that if the synthetic flag is set on the +// initial timestamp, then this flag is set on the resulting Timestamp. So the +// following relation is guaranteed to hold, regardless of the value of x: +// +// x.IsFlagSet(SYNTHETIC) == makeRatchetingTime(x).get().IsFlagSet(SYNTHETIC) +// +// Compressed ratchetingTime values compare such that taking the maximum of any +// two ratchetingTime values and converting that back to a Timestamp is always +// equal to or larger than the equivalent call through the Timestamp.Forward +// method. So the following relation is guaranteed to hold, regardless of the +// value of x or y: +// +// z := max(makeRatchetingTime(x), makeRatchetingTime(y)).get() +// x.Forward(y).LessEq(z) +// +// Bit layout (LSB to MSB): +// bits 0: inverted synthetic flag +// bits 1 - 63: upper 63 bits of wall time +type ratchetingTime int64 + +func makeRatchetingTime(ts hlc.Timestamp) ratchetingTime { // Cheat and just use the max wall time portion of the timestamp, since it's // fine for the max timestamp to be a bit too large. This is the case // because it's always safe to increase the timestamp in a range. It's also @@ -815,23 +863,38 @@ func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) { // We could use an atomic.Value to store a "MaxValue" cacheValue for a given // page, but this would be more expensive and it's not clear that it would // be worth it. - new := ts.WallTime + rt := ratchetingTime(ts.WallTime) if ts.Logical > 0 { - new++ + rt++ } - // TODO(nvanbenschoten): propagate the timestamp synthetic bit through the - // page's max time. - for { - old := atomic.LoadInt64(&p.maxWallTime) - if new <= old { - break - } + // Similarly, cheat and use the last bit in the wall time to indicate + // whether the timestamp is synthetic or not. Do so by first rounding up the + // last bit of the wall time so that it is empty. This is safe for the same + // reason that rounding up the logical portion of the timestamp in the wall + // time is safe (see above). + // + // We use the last bit to indicate that the flag is NOT set. This ensures + // that if two timestamps have the same ordering but different values for + // the synthetic flag, the timestamp without the synthetic flag has a larger + // ratchetingTime value. This follows how Timestamp.Forward treats the flag. + if rt&1 == 1 { + rt++ + } + if !ts.Synthetic { + rt |= 1 + } - if atomic.CompareAndSwapInt64(&p.maxWallTime, old, new) { - break - } + return rt +} + +func (rt ratchetingTime) get() hlc.Timestamp { + var ts hlc.Timestamp + ts.WallTime = int64(rt &^ 1) + if rt&1 == 0 { + ts.Synthetic = true } + return ts } // ratchetPolicy defines the behavior a ratcheting attempt should take when @@ -899,12 +962,9 @@ func (p *sklPage) ratchetValueSet( // must handle with care. // Ratchet the max timestamp. - keyTs, gapTs := keyVal.ts, gapVal.ts - if gapTs.Less(keyTs) { - p.ratchetMaxTimestamp(keyTs) - } else { - p.ratchetMaxTimestamp(gapTs) - } + maxTs := keyVal.ts + maxTs.Forward(gapVal.ts) + p.ratchetMaxTimestamp(maxTs) // Remove the hasKey and hasGap flags from the meta. These will be // replaced below. @@ -1151,9 +1211,9 @@ func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint1 } func decodeValue(b []byte) (ret []byte, val cacheValue) { - // TODO(nvanbenschoten): decode the timestamp synthetic bit. val.ts.WallTime = int64(binary.BigEndian.Uint64(b)) val.ts.Logical = int32(binary.BigEndian.Uint32(b[8:])) + val.ts.Synthetic = b[12] != 0 var err error if val.txnID, err = uuid.FromBytes(b[encodedTsSize:encodedValSize]); err != nil { panic(err) @@ -1163,11 +1223,15 @@ func decodeValue(b []byte) (ret []byte, val cacheValue) { } func encodeValue(b []byte, val cacheValue) []byte { - // TODO(nvanbenschoten): encode the timestamp synthetic bit. l := len(b) b = b[:l+encodedValSize] binary.BigEndian.PutUint64(b[l:], uint64(val.ts.WallTime)) binary.BigEndian.PutUint32(b[l+8:], uint32(val.ts.Logical)) + syn := byte(0) + if val.ts.Synthetic { + syn = 1 + } + b[l+12] = syn if _, err := val.txnID.MarshalTo(b[l+encodedTsSize:]); err != nil { panic(err) } diff --git a/pkg/kv/kvserver/tscache/interval_skl_test.go b/pkg/kv/kvserver/tscache/interval_skl_test.go index 603a4a01706b..1134d1a74ad4 100644 --- a/pkg/kv/kvserver/tscache/interval_skl_test.go +++ b/pkg/kv/kvserver/tscache/interval_skl_test.go @@ -69,7 +69,7 @@ func (s *intervalSkl) setFixedPageSize(pageSize uint32) { s.pageSize = pageSize s.pageSizeFixed = true s.pages.Init() // clear - s.pushNewPage(0 /* maxWallTime */, nil /* arena */) + s.pushNewPage(0 /* maxTime */, nil /* arena */) } // setMinPages sets the minimum number of pages intervalSkl will evict down to. @@ -80,27 +80,29 @@ func (s *intervalSkl) setMinPages(minPages int) { } func TestIntervalSklAdd(t *testing.T) { - ts1 := makeTS(200, 0) - ts2 := makeTS(200, 201) - ts3Ceil := makeTS(201, 0) + testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { + ts1 := makeTS(200, 0).WithSynthetic(synthetic) + ts2 := makeTS(200, 201).WithSynthetic(synthetic) + ts2Ceil := makeTS(202, 0).WithSynthetic(synthetic) - val1 := makeVal(ts1, "1") - val2 := makeVal(ts2, "2") + val1 := makeVal(ts1, "1") + val2 := makeVal(ts2, "2") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - - s.Add([]byte("apricot"), val1) - require.Equal(t, ts1.WallTime, s.frontPage().maxWallTime) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) - require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - s.Add([]byte("banana"), val2) - require.Equal(t, ts3Ceil.WallTime, s.frontPage().maxWallTime) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) - require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, val2, s.LookupTimestamp([]byte("banana"))) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("cherry"))) + s.Add([]byte("apricot"), val1) + require.Equal(t, ts1, s.frontPage().maxTime.get()) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) + require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) + + s.Add([]byte("banana"), val2) + require.Equal(t, ts2Ceil, s.frontPage().maxTime.get()) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) + require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, val2, s.LookupTimestamp([]byte("banana"))) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("cherry"))) + }) } func TestIntervalSklSingleRange(t *testing.T) { @@ -450,7 +452,7 @@ func TestIntervalSklSingleKeyRanges(t *testing.T) { // Don't allow inverted ranges. require.Panics(t, func() { s.AddRange([]byte("kiwi"), []byte("apple"), 0, val1) }) - require.Equal(t, int64(0), s.frontPage().maxWallTime) + require.Equal(t, ratchetingTime(0), s.frontPage().maxTime) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("kiwi"))) @@ -734,71 +736,76 @@ func TestIntervalSklLookupRangeSingleKeyRanges(t *testing.T) { }) } -// TestIntervalSklLookupEqualsEarlierMaxWallTime tests that we properly handle +// TestIntervalSklLookupEqualsEarlierMaxTime tests that we properly handle // the lookup when the timestamp for a range found in the later page is equal to -// the maxWallTime of the earlier page. -func TestIntervalSklLookupEqualsEarlierMaxWallTime(t *testing.T) { +// the maxTime of the earlier page. +func TestIntervalSklLookupEqualsEarlierMaxTime(t *testing.T) { ts1 := makeTS(200, 0) // without Logical part ts2 := makeTS(200, 1) // with Logical part - ts2Ceil := makeTS(201, 0) + ts2Ceil := makeTS(202, 0) txnID1 := "1" txnID2 := "2" - testutils.RunTrueAndFalse(t, "tsWithLogicalPart", func(t *testing.T, logicalPart bool) { - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - s.floorTS = floorTS + testutils.RunTrueAndFalse(t, "logical", func(t *testing.T, logical bool) { + testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - // Insert an initial value into intervalSkl. - initTS := ts1 - if logicalPart { - initTS = ts2 - } - origVal := makeVal(initTS, txnID1) - s.AddRange([]byte("banana"), []byte("orange"), 0, origVal) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) + s.floorTS = floorTS - // Verify the later page's maxWallTime is what we expect. - expMaxTS := ts1 - if logicalPart { - expMaxTS = ts2Ceil - } - require.Equal(t, expMaxTS.WallTime, s.frontPage().maxWallTime) - - // Rotate the page so that new writes will go to a different page. - s.rotatePages(s.frontPage()) - - // Write to overlapping and non-overlapping parts of the new page with - // the values that have the same timestamp as the maxWallTime of the - // earlier page. One value has the same txnID as the previous write in - // the earlier page and one has a different txnID. - valSameID := makeVal(expMaxTS, txnID1) - valDiffID := makeVal(expMaxTS, txnID2) - valNoID := makeValWithoutID(expMaxTS) - s.Add([]byte("apricot"), valSameID) - s.Add([]byte("banana"), valSameID) - s.Add([]byte("orange"), valDiffID) - s.Add([]byte("raspberry"), valDiffID) - - require.Equal(t, valSameID, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, valSameID, s.LookupTimestamp([]byte("banana"))) - if logicalPart { - // If the initial timestamp had a logical part then - // s.earlier.maxWallTime is inexact (see ratchetMaxTimestamp). When - // we search in the earlier page, we'll find the exact timestamp of - // the overlapping range and realize that its not the same as the - // timestamp of the range in the later page. Because of this, - // ratchetValue WON'T remove the txnID. - require.Equal(t, valDiffID, s.LookupTimestamp([]byte("orange"))) - } else { - // If the initial timestamp did not have a logical part then - // s.earlier.maxWallTime is exact. When we search in the earlier - // page, we'll find the overlapping range and realize that it is the - // same as the timestamp of the range in the later page. Because of - // this, ratchetValue WILL remove the txnID. - require.Equal(t, valNoID, s.LookupTimestamp([]byte("orange"))) - } - require.Equal(t, valDiffID, s.LookupTimestamp([]byte("raspberry"))) - require.Equal(t, floorVal, s.LookupTimestamp([]byte("tomato"))) + // Insert an initial value into intervalSkl. + initTS := ts1 + if logical { + initTS = ts2 + } + initTS = initTS.WithSynthetic(synthetic) + origVal := makeVal(initTS, txnID1) + s.AddRange([]byte("banana"), []byte("orange"), 0, origVal) + + // Verify the later page's maxTime is what we expect. + expMaxTS := ts1 + if logical { + expMaxTS = ts2Ceil + } + expMaxTS = expMaxTS.WithSynthetic(synthetic) + require.Equal(t, expMaxTS, s.frontPage().maxTime.get()) + + // Rotate the page so that new writes will go to a different page. + s.rotatePages(s.frontPage()) + + // Write to overlapping and non-overlapping parts of the new page + // with the values that have the same timestamp as the maxTime of + // the earlier page. One value has the same txnID as the previous + // write in the earlier page and one has a different txnID. + valSameID := makeVal(expMaxTS, txnID1) + valDiffID := makeVal(expMaxTS, txnID2) + valNoID := makeValWithoutID(expMaxTS) + s.Add([]byte("apricot"), valSameID) + s.Add([]byte("banana"), valSameID) + s.Add([]byte("orange"), valDiffID) + s.Add([]byte("raspberry"), valDiffID) + + require.Equal(t, valSameID, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, valSameID, s.LookupTimestamp([]byte("banana"))) + if logical { + // If the initial timestamp had a logical part then + // s.earlier.maxTime is inexact (see ratchetMaxTimestamp). When + // we search in the earlier page, we'll find the exact timestamp + // of the overlapping range and realize that its not the same as + // the timestamp of the range in the later page. Because of + // this, ratchetValue WON'T remove the txnID. + require.Equal(t, valDiffID, s.LookupTimestamp([]byte("orange"))) + } else { + // If the initial timestamp did not have a logical part then + // s.earlier.maxTime is exact. When we search in the earlier + // page, we'll find the overlapping range and realize that it is + // the same as the timestamp of the range in the later page. + // Because of this, ratchetValue WILL remove the txnID. + require.Equal(t, valNoID, s.LookupTimestamp([]byte("orange"))) + } + require.Equal(t, valDiffID, s.LookupTimestamp([]byte("raspberry"))) + require.Equal(t, floorVal, s.LookupTimestamp([]byte("tomato"))) + }) }) } @@ -901,6 +908,45 @@ func TestIntervalSklMinRetentionWindow(t *testing.T) { require.Equal(t, s.pages.Len(), s.minPages) } +// TestIntervalSklRotateWithSyntheticTimestamps tests that if a page is evicted +// and subsumed by the floor timestamp, then the floor timestamp will continue +// to carry the synthtic flag, if necessary. +func TestIntervalSklRotateWithSyntheticTimestamps(t *testing.T) { + manual := hlc.NewManualClock(200) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + + const minRet = 500 + s := newIntervalSkl(clock, minRet, makeSklMetrics()) + s.setFixedPageSize(1500) + s.floorTS = floorTS + + // Add an initial value with a synthetic timestamp. + // Rotate the page so it's alone. + origKey := []byte("banana") + origTS := clock.Now().WithSynthetic(true) + origVal := makeVal(origTS, "1") + s.Add(origKey, origVal) + s.rotatePages(s.frontPage()) + + // We should still be able to look up the initial value. + require.Equal(t, origVal, s.LookupTimestamp(origKey)) + + // Increment the clock so that the original value is not in the minimum + // retention window. Rotate the pages and the back page should be evicted. + manual.Increment(600) + s.rotatePages(s.frontPage()) + + // The initial value's page was evicted, so it should no longer exist. + // However, since it had the highest timestamp of all values added, its + // timestamp should still exist. Critically, this timestamp should still + // be marked as synthetic. + newVal := s.LookupTimestamp(origKey) + require.NotEqual(t, origVal, newVal, "the original value should be evicted") + require.Equal(t, uuid.Nil, newVal.txnID, "the original value's txn ID should be lost") + require.Equal(t, origVal.ts, newVal.ts, "the original value's timestamp should persist") + require.True(t, newVal.ts.Synthetic, "the synthetic flag should persist") +} + func TestIntervalSklConcurrency(t *testing.T) { defer leaktest.AfterTest(t)() defer util.EnableRacePreemptionPoints()() @@ -974,6 +1020,9 @@ func TestIntervalSklConcurrency(t *testing.T) { if useClock { ts = clock.Now() } + if rng.Intn(2) == 0 { + ts = ts.WithSynthetic(true) + } nowVal := cacheValue{ts: ts, txnID: txnID} s.AddRange(from, to, opt, nowVal) @@ -1075,6 +1124,9 @@ func TestIntervalSklConcurrentVsSequential(t *testing.T) { if useClock { ts = clock.Now() } + if rng.Intn(2) == 0 { + ts = ts.WithSynthetic(true) + } a.val = cacheValue{ts: ts, txnID: txnIDs[i]} // This is a lot of log output so only un-comment to debug.