diff --git a/pkg/kv/kvserver/tscache/cache.go b/pkg/kv/kvserver/tscache/cache.go index f2872f614a6c..b9c38878e834 100644 --- a/pkg/kv/kvserver/tscache/cache.go +++ b/pkg/kv/kvserver/tscache/cache.go @@ -114,20 +114,26 @@ func (v cacheValue) String() string { // // This ratcheting policy is shared across all Cache implementations, even if // they do not use this function directly. -func ratchetValue(old, new cacheValue) (cacheValue, bool) { +func ratchetValue(old, new cacheValue) (res cacheValue, updated bool) { if old.ts.Less(new.ts) { - // Ratchet to new value. + // New value newer. Ratchet to new value. return new, true } else if new.ts.Less(old.ts) { - // Nothing to update. + // Old value newer. Nothing to update. return old, false - } else if new.txnID != old.txnID { + } + + // Equal times. + if new.ts.Synthetic != old.ts.Synthetic { + // old.ts == new.ts but the values have different synthetic flags. + // Remove the synthetic flag from the resulting value. + new.ts.Synthetic = false + } + if new.txnID != old.txnID { // old.ts == new.ts but the values have different txnIDs. Remove the - // transaction ID from the value so that it is no longer owned by any - // transaction. + // transaction ID from the resulting value so that it is no longer owned + // by any transaction. new.txnID = noTxnID - return new, old.txnID != noTxnID } - // old == new. 
- return old, false + return new, new != old } diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index 88d5217c4bbb..f51f0b174bd3 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -19,7 +19,6 @@ import ( "fmt" "sync/atomic" "time" - "unsafe" "github.com/andy-kimball/arenaskl" "github.com/cockroachdb/cockroach/pkg/util" @@ -90,8 +89,8 @@ const ( ) const ( - encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0))) - encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{})) + encodedTsSize = 8 + 4 + 1 // walltime + logical + flags + encodedTxnIDSize = uuid.Size encodedValSize = encodedTsSize + encodedTxnIDSize // initialSklPageSize is the initial size of each page in the sklImpl's @@ -202,7 +201,7 @@ func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics) minPages: defaultMinSklPages, metrics: metrics, } - s.pushNewPage(0 /* maxWallTime */, nil /* arena */) + s.pushNewPage(0 /* maxTime */, nil /* arena */) s.metrics.Pages.Update(1) return &s } @@ -301,9 +300,10 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue s.rotMutex.RLock() defer s.rotMutex.RUnlock() - // If floor ts is >= requested timestamp, then no need to perform a search - // or add any records. - if val.ts.LessEq(s.floorTS) { + // If floor ts is greater than the requested timestamp, then no need to + // perform a search or add any records. We don't return early when the + // timestamps are equal, because their flags may differ. + if val.ts.Less(s.floorTS) { return nil } @@ -385,7 +385,7 @@ func (s *intervalSkl) frontPage() *sklPage { // pushNewPage prepends a new empty page to the front of the pages list. It // accepts an optional arena argument to facilitate re-use. 
-func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) { +func (s *intervalSkl) pushNewPage(maxTime ratchetingTime, arena *arenaskl.Arena) { size := s.nextPageSize() if arena != nil && arena.Cap() == size { // Re-use the provided arena, if possible. @@ -395,7 +395,7 @@ func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) { arena = arenaskl.NewArena(size) } p := newSklPage(arena) - p.maxWallTime = maxWallTime + p.maxTime = maxTime s.pages.PushFront(p) } @@ -455,7 +455,7 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) { var oldArena *arenaskl.Arena for s.pages.Len() >= s.minPages { bp := back.Value.(*sklPage) - bpMaxTS := hlc.Timestamp{WallTime: bp.maxWallTime} + bpMaxTS := bp.getMaxTimestamp() if minTSToRetain.LessEq(bpMaxTS) { // The back page's maximum timestamp is within the time // window we've promised to retain, so we can't evict it. @@ -473,12 +473,12 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) { } // Push a new empty page on the front of the pages list. We give this page - // the maxWallTime of the old front page. This assures that the maxWallTime - // for a page is always equal to or greater than that for all earlier pages. - // In other words, it assures that the maxWallTime for a page is not only - // the maximum timestamp for all values it contains, but also for all values - // any earlier pages contain. - s.pushNewPage(fp.maxWallTime, oldArena) + // the maxTime of the old front page. This assures that the maxTime for a + // page is always equal to or greater than that for all earlier pages. In + // other words, it assures that the maxTime for a page is not only the + // maximum timestamp for all values it contains, but also for all values any + // earlier pages contain. + s.pushNewPage(fp.maxTime, oldArena) // Update metrics. 
s.metrics.Pages.Update(int64(s.pages.Len())) @@ -526,7 +526,7 @@ func (s *intervalSkl) LookupTimestampRange(from, to []byte, opt rangeOptions) ca // different txnID than our current cacheValue result (val), then we // need to remove the txnID from our result, per the ratcheting policy // for cacheValues. This is tested in TestIntervalSklMaxPageTS. - maxTS := hlc.Timestamp{WallTime: atomic.LoadInt64(&p.maxWallTime)} + maxTS := p.getMaxTimestamp() if maxTS.Less(val.ts) { break } @@ -554,9 +554,9 @@ func (s *intervalSkl) FloorTS() hlc.Timestamp { // filled up, it returns arenaskl.ErrArenaFull. At that point, a new fixed page // must be allocated and used instead. type sklPage struct { - list *arenaskl.Skiplist - maxWallTime int64 // accessed atomically - isFull int32 // accessed atomically + list *arenaskl.Skiplist + maxTime ratchetingTime // accessed atomically + isFull int32 // accessed atomically } func newSklPage(arena *arenaskl.Arena) *sklPage { @@ -802,6 +802,54 @@ func (p *sklPage) ensureFloorValue(it *arenaskl.Iterator, to []byte, val cacheVa } func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) { + new := makeRatchetingTime(ts) + for { + old := ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))) + if new <= old { + break + } + + if atomic.CompareAndSwapInt64((*int64)(&p.maxTime), int64(old), int64(new)) { + break + } + } +} + +func (p *sklPage) getMaxTimestamp() hlc.Timestamp { + return ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))).get() +} + +// ratchetingTime is a compressed representation of an hlc.Timestamp, reduced +// down to 64 bits to support atomic access. +// +// ratchetingTime implements compression such that any loss of information when +// passing through the type results in the resulting Timestamp being ratcheted +// to a larger value. 
This provides the guarantee that the following relation +// holds, regardless of the value of x: +// +// x.LessEq(makeRatchetingTime(x).get()) +// +// It also provides the guarantee that if the synthetic flag is set on the +// initial timestamp, then this flag is set on the resulting Timestamp. So the +// following relation is guaranteed to hold, regardless of the value of x: +// +// x.IsFlagSet(SYNTHETIC) == makeRatchetingTime(x).get().IsFlagSet(SYNTHETIC) +// +// Compressed ratchetingTime values compare such that taking the maximum of any +// two ratchetingTime values and converting that back to a Timestamp is always +// equal to or larger than the equivalent call through the Timestamp.Forward +// method. So the following relation is guaranteed to hold, regardless of the +// value of x or y: +// +// z := max(makeRatchetingTime(x), makeRatchetingTime(y)).get() +// x.Forward(y).LessEq(z) +// +// Bit layout (LSB to MSB): +// bits 0: inverted synthetic flag +// bits 1 - 63: upper 63 bits of wall time +type ratchetingTime int64 + +func makeRatchetingTime(ts hlc.Timestamp) ratchetingTime { // Cheat and just use the max wall time portion of the timestamp, since it's // fine for the max timestamp to be a bit too large. This is the case // because it's always safe to increase the timestamp in a range. It's also @@ -815,23 +863,38 @@ func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) { // We could use an atomic.Value to store a "MaxValue" cacheValue for a given // page, but this would be more expensive and it's not clear that it would // be worth it. - new := ts.WallTime + rt := ratchetingTime(ts.WallTime) if ts.Logical > 0 { - new++ + rt++ } - // TODO(nvanbenschoten): propagate the timestamp synthetic bit through the - // page's max time. - for { - old := atomic.LoadInt64(&p.maxWallTime) - if new <= old { - break - } + // Similarly, cheat and use the last bit in the wall time to indicate + // whether the timestamp is synthetic or not. 
Do so by first rounding up the + // last bit of the wall time so that it is empty. This is safe for the same + // reason that rounding up the logical portion of the timestamp in the wall + // time is safe (see above). + // + // We use the last bit to indicate that the flag is NOT set. This ensures + // that if two timestamps have the same ordering but different values for + // the synthetic flag, the timestamp without the synthetic flag has a larger + // ratchetingTime value. This follows how Timestamp.Forward treats the flag. + if rt&1 == 1 { + rt++ + } + if !ts.Synthetic { + rt |= 1 + } - if atomic.CompareAndSwapInt64(&p.maxWallTime, old, new) { - break - } + return rt +} + +func (rt ratchetingTime) get() hlc.Timestamp { + var ts hlc.Timestamp + ts.WallTime = int64(rt &^ 1) + if rt&1 == 0 { + ts.Synthetic = true } + return ts } // ratchetPolicy defines the behavior a ratcheting attempt should take when @@ -899,12 +962,9 @@ func (p *sklPage) ratchetValueSet( // must handle with care. // Ratchet the max timestamp. - keyTs, gapTs := keyVal.ts, gapVal.ts - if gapTs.Less(keyTs) { - p.ratchetMaxTimestamp(keyTs) - } else { - p.ratchetMaxTimestamp(gapTs) - } + maxTs := keyVal.ts + maxTs.Forward(gapVal.ts) + p.ratchetMaxTimestamp(maxTs) // Remove the hasKey and hasGap flags from the meta. These will be // replaced below. @@ -1151,9 +1211,9 @@ func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint1 } func decodeValue(b []byte) (ret []byte, val cacheValue) { - // TODO(nvanbenschoten): decode the timestamp synthetic bit. 
val.ts.WallTime = int64(binary.BigEndian.Uint64(b)) val.ts.Logical = int32(binary.BigEndian.Uint32(b[8:])) + val.ts.Synthetic = b[12] != 0 var err error if val.txnID, err = uuid.FromBytes(b[encodedTsSize:encodedValSize]); err != nil { panic(err) @@ -1163,11 +1223,15 @@ func decodeValue(b []byte) (ret []byte, val cacheValue) { } func encodeValue(b []byte, val cacheValue) []byte { - // TODO(nvanbenschoten): encode the timestamp synthetic bit. l := len(b) b = b[:l+encodedValSize] binary.BigEndian.PutUint64(b[l:], uint64(val.ts.WallTime)) binary.BigEndian.PutUint32(b[l+8:], uint32(val.ts.Logical)) + syn := byte(0) + if val.ts.Synthetic { + syn = 1 + } + b[l+12] = syn if _, err := val.txnID.MarshalTo(b[l+encodedTsSize:]); err != nil { panic(err) } diff --git a/pkg/kv/kvserver/tscache/interval_skl_test.go b/pkg/kv/kvserver/tscache/interval_skl_test.go index 603a4a01706b..1134d1a74ad4 100644 --- a/pkg/kv/kvserver/tscache/interval_skl_test.go +++ b/pkg/kv/kvserver/tscache/interval_skl_test.go @@ -69,7 +69,7 @@ func (s *intervalSkl) setFixedPageSize(pageSize uint32) { s.pageSize = pageSize s.pageSizeFixed = true s.pages.Init() // clear - s.pushNewPage(0 /* maxWallTime */, nil /* arena */) + s.pushNewPage(0 /* maxTime */, nil /* arena */) } // setMinPages sets the minimum number of pages intervalSkl will evict down to. 
@@ -80,27 +80,29 @@ func (s *intervalSkl) setMinPages(minPages int) { } func TestIntervalSklAdd(t *testing.T) { - ts1 := makeTS(200, 0) - ts2 := makeTS(200, 201) - ts3Ceil := makeTS(201, 0) + testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { + ts1 := makeTS(200, 0).WithSynthetic(synthetic) + ts2 := makeTS(200, 201).WithSynthetic(synthetic) + ts2Ceil := makeTS(202, 0).WithSynthetic(synthetic) - val1 := makeVal(ts1, "1") - val2 := makeVal(ts2, "2") + val1 := makeVal(ts1, "1") + val2 := makeVal(ts2, "2") - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - - s.Add([]byte("apricot"), val1) - require.Equal(t, ts1.WallTime, s.frontPage().maxWallTime) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) - require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - s.Add([]byte("banana"), val2) - require.Equal(t, ts3Ceil.WallTime, s.frontPage().maxWallTime) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) - require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, val2, s.LookupTimestamp([]byte("banana"))) - require.Equal(t, emptyVal, s.LookupTimestamp([]byte("cherry"))) + s.Add([]byte("apricot"), val1) + require.Equal(t, ts1, s.frontPage().maxTime.get()) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) + require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) + + s.Add([]byte("banana"), val2) + require.Equal(t, ts2Ceil, s.frontPage().maxTime.get()) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) + require.Equal(t, val1, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, val2, s.LookupTimestamp([]byte("banana"))) + require.Equal(t, emptyVal, s.LookupTimestamp([]byte("cherry"))) + }) } func TestIntervalSklSingleRange(t 
*testing.T) { @@ -450,7 +452,7 @@ func TestIntervalSklSingleKeyRanges(t *testing.T) { // Don't allow inverted ranges. require.Panics(t, func() { s.AddRange([]byte("kiwi"), []byte("apple"), 0, val1) }) - require.Equal(t, int64(0), s.frontPage().maxWallTime) + require.Equal(t, ratchetingTime(0), s.frontPage().maxTime) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("apple"))) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("banana"))) require.Equal(t, emptyVal, s.LookupTimestamp([]byte("kiwi"))) @@ -734,71 +736,76 @@ func TestIntervalSklLookupRangeSingleKeyRanges(t *testing.T) { }) } -// TestIntervalSklLookupEqualsEarlierMaxWallTime tests that we properly handle +// TestIntervalSklLookupEqualsEarlierMaxTime tests that we properly handle // the lookup when the timestamp for a range found in the later page is equal to -// the maxWallTime of the earlier page. -func TestIntervalSklLookupEqualsEarlierMaxWallTime(t *testing.T) { +// the maxTime of the earlier page. +func TestIntervalSklLookupEqualsEarlierMaxTime(t *testing.T) { ts1 := makeTS(200, 0) // without Logical part ts2 := makeTS(200, 1) // with Logical part - ts2Ceil := makeTS(201, 0) + ts2Ceil := makeTS(202, 0) txnID1 := "1" txnID2 := "2" - testutils.RunTrueAndFalse(t, "tsWithLogicalPart", func(t *testing.T, logicalPart bool) { - s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) - s.floorTS = floorTS + testutils.RunTrueAndFalse(t, "logical", func(t *testing.T, logical bool) { + testutils.RunTrueAndFalse(t, "synthetic", func(t *testing.T, synthetic bool) { - // Insert an initial value into intervalSkl. - initTS := ts1 - if logicalPart { - initTS = ts2 - } - origVal := makeVal(initTS, txnID1) - s.AddRange([]byte("banana"), []byte("orange"), 0, origVal) + s := newIntervalSkl(nil /* clock */, 0 /* minRet */, makeSklMetrics()) + s.floorTS = floorTS - // Verify the later page's maxWallTime is what we expect. 
- expMaxTS := ts1 - if logicalPart { - expMaxTS = ts2Ceil - } - require.Equal(t, expMaxTS.WallTime, s.frontPage().maxWallTime) - - // Rotate the page so that new writes will go to a different page. - s.rotatePages(s.frontPage()) - - // Write to overlapping and non-overlapping parts of the new page with - // the values that have the same timestamp as the maxWallTime of the - // earlier page. One value has the same txnID as the previous write in - // the earlier page and one has a different txnID. - valSameID := makeVal(expMaxTS, txnID1) - valDiffID := makeVal(expMaxTS, txnID2) - valNoID := makeValWithoutID(expMaxTS) - s.Add([]byte("apricot"), valSameID) - s.Add([]byte("banana"), valSameID) - s.Add([]byte("orange"), valDiffID) - s.Add([]byte("raspberry"), valDiffID) - - require.Equal(t, valSameID, s.LookupTimestamp([]byte("apricot"))) - require.Equal(t, valSameID, s.LookupTimestamp([]byte("banana"))) - if logicalPart { - // If the initial timestamp had a logical part then - // s.earlier.maxWallTime is inexact (see ratchetMaxTimestamp). When - // we search in the earlier page, we'll find the exact timestamp of - // the overlapping range and realize that its not the same as the - // timestamp of the range in the later page. Because of this, - // ratchetValue WON'T remove the txnID. - require.Equal(t, valDiffID, s.LookupTimestamp([]byte("orange"))) - } else { - // If the initial timestamp did not have a logical part then - // s.earlier.maxWallTime is exact. When we search in the earlier - // page, we'll find the overlapping range and realize that it is the - // same as the timestamp of the range in the later page. Because of - // this, ratchetValue WILL remove the txnID. - require.Equal(t, valNoID, s.LookupTimestamp([]byte("orange"))) - } - require.Equal(t, valDiffID, s.LookupTimestamp([]byte("raspberry"))) - require.Equal(t, floorVal, s.LookupTimestamp([]byte("tomato"))) + // Insert an initial value into intervalSkl. 
+ initTS := ts1 + if logical { + initTS = ts2 + } + initTS = initTS.WithSynthetic(synthetic) + origVal := makeVal(initTS, txnID1) + s.AddRange([]byte("banana"), []byte("orange"), 0, origVal) + + // Verify the later page's maxTime is what we expect. + expMaxTS := ts1 + if logical { + expMaxTS = ts2Ceil + } + expMaxTS = expMaxTS.WithSynthetic(synthetic) + require.Equal(t, expMaxTS, s.frontPage().maxTime.get()) + + // Rotate the page so that new writes will go to a different page. + s.rotatePages(s.frontPage()) + + // Write to overlapping and non-overlapping parts of the new page + // with the values that have the same timestamp as the maxTime of + // the earlier page. One value has the same txnID as the previous + // write in the earlier page and one has a different txnID. + valSameID := makeVal(expMaxTS, txnID1) + valDiffID := makeVal(expMaxTS, txnID2) + valNoID := makeValWithoutID(expMaxTS) + s.Add([]byte("apricot"), valSameID) + s.Add([]byte("banana"), valSameID) + s.Add([]byte("orange"), valDiffID) + s.Add([]byte("raspberry"), valDiffID) + + require.Equal(t, valSameID, s.LookupTimestamp([]byte("apricot"))) + require.Equal(t, valSameID, s.LookupTimestamp([]byte("banana"))) + if logical { + // If the initial timestamp had a logical part then + // s.earlier.maxTime is inexact (see ratchetMaxTimestamp). When + // we search in the earlier page, we'll find the exact timestamp + // of the overlapping range and realize that it's not the same as + // the timestamp of the range in the later page. Because of + // this, ratchetValue WON'T remove the txnID. + require.Equal(t, valDiffID, s.LookupTimestamp([]byte("orange"))) + } else { + // If the initial timestamp did not have a logical part then + // s.earlier.maxTime is exact. When we search in the earlier + // page, we'll find the overlapping range and realize that it is + // the same as the timestamp of the range in the later page. + // Because of this, ratchetValue WILL remove the txnID.
+ require.Equal(t, valNoID, s.LookupTimestamp([]byte("orange"))) + } + require.Equal(t, valDiffID, s.LookupTimestamp([]byte("raspberry"))) + require.Equal(t, floorVal, s.LookupTimestamp([]byte("tomato"))) + }) }) } @@ -901,6 +908,45 @@ func TestIntervalSklMinRetentionWindow(t *testing.T) { require.Equal(t, s.pages.Len(), s.minPages) } +// TestIntervalSklRotateWithSyntheticTimestamps tests that if a page is evicted +// and subsumed by the floor timestamp, then the floor timestamp will continue +// to carry the synthetic flag, if necessary. +func TestIntervalSklRotateWithSyntheticTimestamps(t *testing.T) { + manual := hlc.NewManualClock(200) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + + const minRet = 500 + s := newIntervalSkl(clock, minRet, makeSklMetrics()) + s.setFixedPageSize(1500) + s.floorTS = floorTS + + // Add an initial value with a synthetic timestamp. + // Rotate the page so it's alone. + origKey := []byte("banana") + origTS := clock.Now().WithSynthetic(true) + origVal := makeVal(origTS, "1") + s.Add(origKey, origVal) + s.rotatePages(s.frontPage()) + + // We should still be able to look up the initial value. + require.Equal(t, origVal, s.LookupTimestamp(origKey)) + + // Increment the clock so that the original value is not in the minimum + // retention window. Rotate the pages and the back page should be evicted. + manual.Increment(600) + s.rotatePages(s.frontPage()) + + // The initial value's page was evicted, so it should no longer exist. + // However, since it had the highest timestamp of all values added, its + // timestamp should still exist. Critically, this timestamp should still + // be marked as synthetic.
+ newVal := s.LookupTimestamp(origKey) + require.NotEqual(t, origVal, newVal, "the original value should be evicted") + require.Equal(t, uuid.Nil, newVal.txnID, "the original value's txn ID should be lost") + require.Equal(t, origVal.ts, newVal.ts, "the original value's timestamp should persist") + require.True(t, newVal.ts.Synthetic, "the synthetic flag should persist") +} + func TestIntervalSklConcurrency(t *testing.T) { defer leaktest.AfterTest(t)() defer util.EnableRacePreemptionPoints()() @@ -974,6 +1020,9 @@ func TestIntervalSklConcurrency(t *testing.T) { if useClock { ts = clock.Now() } + if rng.Intn(2) == 0 { + ts = ts.WithSynthetic(true) + } nowVal := cacheValue{ts: ts, txnID: txnID} s.AddRange(from, to, opt, nowVal) @@ -1075,6 +1124,9 @@ func TestIntervalSklConcurrentVsSequential(t *testing.T) { if useClock { ts = clock.Now() } + if rng.Intn(2) == 0 { + ts = ts.WithSynthetic(true) + } a.val = cacheValue{ts: ts, txnID: txnIDs[i]} // This is a lot of log output so only un-comment to debug.