Skip to content

Commit

Permalink
Merge #57811
Browse files Browse the repository at this point in the history
57811: kv/tscache: propagate synthetic timestamps through timestamp cache r=nvanbenschoten a=nvanbenschoten

Fixes #57683.

Before this change, a read with a synthetic timestamp would not pass the synthetic flag through the timestamp cache to conflicting writes. This could cause issues where a write that was bumped by one of these future timestamps would not notice that its write timestamp was now synthetic.

This change fixes this by propagating the synthetic flag through the timestamp cache. We have to play a few tricks to make this efficient around `sklPage.ratchetMaxTimestamp`, which attempts to keeping a running maximum of the timestamps in a given skiplist using atomics. The added complexity here is encapsulated in a new `ratchetingTime` type.

The testing around here is actually pretty solid. `TestIntervalSklConcurrency` and `TestIntervalSklConcurrentVsSequential` both perform long series of randomized operations and assert against ratchet inversions. This change adds some randomization around synthetic timestamps to the tests and introduces synthetic timestamps to the ratcheting policy (which wasn't otherwise necessary) to get solid randomized test coverage.

This made encoding and decoding timestamp cache values slightly slower, but on the order of nanoseconds.

```
name                                                             old time/op    new time/op    delta
IntervalSklDecodeValue/logical=false,flags=false,txnID=false-16    11.7ns ± 2%    12.1ns ± 2%  +3.52%  (p=0.000 n=9+10)
IntervalSklDecodeValue/logical=false,flags=false,txnID=true-16     11.7ns ± 1%    12.0ns ± 1%  +2.66%  (p=0.000 n=10+9)
IntervalSklDecodeValue/logical=false,flags=true,txnID=false-16     11.8ns ± 1%    12.1ns ± 2%  +3.27%  (p=0.000 n=10+9)
IntervalSklDecodeValue/logical=false,flags=true,txnID=true-16      11.7ns ± 0%    12.0ns ± 1%  +2.82%  (p=0.000 n=6+10)
IntervalSklDecodeValue/logical=true,flags=false,txnID=false-16     11.7ns ± 2%    12.1ns ± 1%  +3.08%  (p=0.000 n=10+10)
IntervalSklDecodeValue/logical=true,flags=false,txnID=true-16      11.7ns ± 1%    12.1ns ± 2%  +3.59%  (p=0.000 n=10+10)
IntervalSklDecodeValue/logical=true,flags=true,txnID=false-16      11.7ns ± 1%    12.2ns ± 4%  +4.11%  (p=0.000 n=10+10)
IntervalSklDecodeValue/logical=true,flags=true,txnID=true-16       11.7ns ± 2%    12.1ns ± 1%  +3.42%  (p=0.000 n=10+10)
IntervalSklEncodeValue/logical=false,flags=false,txnID=false-16    6.25ns ± 1%    6.52ns ± 1%  +4.38%  (p=0.000 n=9+10)
IntervalSklEncodeValue/logical=false,flags=false,txnID=true-16     6.25ns ± 1%    6.56ns ± 1%  +5.00%  (p=0.000 n=9+9)
IntervalSklEncodeValue/logical=false,flags=true,txnID=false-16     6.23ns ± 2%    6.55ns ± 2%  +5.07%  (p=0.000 n=10+10)
IntervalSklEncodeValue/logical=false,flags=true,txnID=true-16      6.23ns ± 2%    6.56ns ± 4%  +5.28%  (p=0.000 n=10+10)
IntervalSklEncodeValue/logical=true,flags=false,txnID=false-16     6.26ns ± 3%    6.55ns ± 1%  +4.66%  (p=0.000 n=10+10)
IntervalSklEncodeValue/logical=true,flags=false,txnID=true-16      6.24ns ± 1%    6.52ns ± 1%  +4.53%  (p=0.000 n=9+10)
IntervalSklEncodeValue/logical=true,flags=true,txnID=false-16      6.29ns ± 3%    6.53ns ± 2%  +3.71%  (p=0.000 n=9+10)
IntervalSklEncodeValue/logical=true,flags=true,txnID=true-16       6.23ns ± 2%    6.56ns ± 0%  +5.26%  (p=0.000 n=10+8)

name                                                             old alloc/op   new alloc/op   delta
IntervalSklDecodeValue/logical=false,flags=false,txnID=false-16     0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=false,txnID=true-16      0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=true,txnID=false-16      0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=true,txnID=true-16       0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=false,txnID=false-16      0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=false,txnID=true-16       0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=true,txnID=false-16       0.00B          0.00B         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=true,txnID=true-16        0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=false,txnID=false-16     0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=false,txnID=true-16      0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=true,txnID=false-16      0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=true,txnID=true-16       0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=false,txnID=false-16      0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=false,txnID=true-16       0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=true,txnID=false-16       0.00B          0.00B         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=true,txnID=true-16        0.00B          0.00B         ~     (all equal)

name                                                             old allocs/op  new allocs/op  delta
IntervalSklDecodeValue/logical=false,flags=false,txnID=false-16      0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=false,txnID=true-16       0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=true,txnID=false-16       0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=false,flags=true,txnID=true-16        0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=false,txnID=false-16       0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=false,txnID=true-16        0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=true,txnID=false-16        0.00           0.00         ~     (all equal)
IntervalSklDecodeValue/logical=true,flags=true,txnID=true-16         0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=false,txnID=false-16      0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=false,txnID=true-16       0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=true,txnID=false-16       0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=false,flags=true,txnID=true-16        0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=false,txnID=false-16       0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=false,txnID=true-16        0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=true,txnID=false-16        0.00           0.00         ~     (all equal)
IntervalSklEncodeValue/logical=true,flags=true,txnID=true-16         0.00           0.00         ~     (all equal)
```

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jan 6, 2021
2 parents f62055c + 1db343c commit 15765c0
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 170 deletions.
24 changes: 15 additions & 9 deletions pkg/kv/kvserver/tscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,26 @@ func (v cacheValue) String() string {
//
// This ratcheting policy is shared across all Cache implementations, even if
// they do not use this function directly.
func ratchetValue(old, new cacheValue) (cacheValue, bool) {
func ratchetValue(old, new cacheValue) (res cacheValue, updated bool) {
if old.ts.Less(new.ts) {
// Ratchet to new value.
// New value newer. Ratchet to new value.
return new, true
} else if new.ts.Less(old.ts) {
// Nothing to update.
// Old value newer. Nothing to update.
return old, false
} else if new.txnID != old.txnID {
}

// Equal times.
if new.ts.Synthetic != old.ts.Synthetic {
// old.ts == new.ts but the values have different synthetic flags.
// Remove the synthetic flag from the resulting value.
new.ts.Synthetic = false
}
if new.txnID != old.txnID {
// old.ts == new.ts but the values have different txnIDs. Remove the
// transaction ID from the value so that it is no longer owned by any
// transaction.
// transaction ID from the resulting value so that it is no longer owned
// by any transaction.
new.txnID = noTxnID
return new, old.txnID != noTxnID
}
// old == new.
return old, false
return new, new != old
}
162 changes: 106 additions & 56 deletions pkg/kv/kvserver/tscache/interval_skl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"bytes"
"container/list"
"context"
"encoding/binary"
"fmt"
"sync/atomic"
"time"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -90,9 +88,7 @@ const (
)

const (
encodedTsSize = int(unsafe.Sizeof(int64(0)) + unsafe.Sizeof(int32(0)))
encodedTxnIDSize = int(unsafe.Sizeof(uuid.UUID{}))
encodedValSize = encodedTsSize + encodedTxnIDSize
encodedValSize = int(unsafe.Sizeof(cacheValue{}))

// initialSklPageSize is the initial size of each page in the sklImpl's
// intervalSkl. The pages start small to limit the memory footprint of
Expand Down Expand Up @@ -202,7 +198,7 @@ func newIntervalSkl(clock *hlc.Clock, minRet time.Duration, metrics sklMetrics)
minPages: defaultMinSklPages,
metrics: metrics,
}
s.pushNewPage(0 /* maxWallTime */, nil /* arena */)
s.pushNewPage(0 /* maxTime */, nil /* arena */)
s.metrics.Pages.Update(1)
return &s
}
Expand Down Expand Up @@ -301,9 +297,10 @@ func (s *intervalSkl) addRange(from, to []byte, opt rangeOptions, val cacheValue
s.rotMutex.RLock()
defer s.rotMutex.RUnlock()

// If floor ts is >= requested timestamp, then no need to perform a search
// or add any records.
if val.ts.LessEq(s.floorTS) {
// If floor ts is greater than the requested timestamp, then no need to
// perform a search or add any records. We don't return early when the
// timestamps are equal, because their flags may differ.
if val.ts.Less(s.floorTS) {
return nil
}

Expand Down Expand Up @@ -385,7 +382,7 @@ func (s *intervalSkl) frontPage() *sklPage {

// pushNewPage prepends a new empty page to the front of the pages list. It
// accepts an optional arena argument to facilitate re-use.
func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) {
func (s *intervalSkl) pushNewPage(maxTime ratchetingTime, arena *arenaskl.Arena) {
size := s.nextPageSize()
if arena != nil && arena.Cap() == size {
// Re-use the provided arena, if possible.
Expand All @@ -395,7 +392,7 @@ func (s *intervalSkl) pushNewPage(maxWallTime int64, arena *arenaskl.Arena) {
arena = arenaskl.NewArena(size)
}
p := newSklPage(arena)
p.maxWallTime = maxWallTime
p.maxTime = maxTime
s.pages.PushFront(p)
}

Expand Down Expand Up @@ -455,7 +452,7 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) {
var oldArena *arenaskl.Arena
for s.pages.Len() >= s.minPages {
bp := back.Value.(*sklPage)
bpMaxTS := hlc.Timestamp{WallTime: bp.maxWallTime}
bpMaxTS := bp.getMaxTimestamp()
if minTSToRetain.LessEq(bpMaxTS) {
// The back page's maximum timestamp is within the time
// window we've promised to retain, so we can't evict it.
Expand All @@ -473,12 +470,12 @@ func (s *intervalSkl) rotatePages(filledPage *sklPage) {
}

// Push a new empty page on the front of the pages list. We give this page
// the maxWallTime of the old front page. This assures that the maxWallTime
// for a page is always equal to or greater than that for all earlier pages.
// In other words, it assures that the maxWallTime for a page is not only
// the maximum timestamp for all values it contains, but also for all values
// any earlier pages contain.
s.pushNewPage(fp.maxWallTime, oldArena)
// the maxTime of the old front page. This assures that the maxTime for a
// page is always equal to or greater than that for all earlier pages. In
// other words, it assures that the maxTime for a page is not only the
// maximum timestamp for all values it contains, but also for all values any
// earlier pages contain.
s.pushNewPage(fp.maxTime, oldArena)

// Update metrics.
s.metrics.Pages.Update(int64(s.pages.Len()))
Expand Down Expand Up @@ -526,7 +523,7 @@ func (s *intervalSkl) LookupTimestampRange(from, to []byte, opt rangeOptions) ca
// different txnID than our current cacheValue result (val), then we
// need to remove the txnID from our result, per the ratcheting policy
// for cacheValues. This is tested in TestIntervalSklMaxPageTS.
maxTS := hlc.Timestamp{WallTime: atomic.LoadInt64(&p.maxWallTime)}
maxTS := p.getMaxTimestamp()
if maxTS.Less(val.ts) {
break
}
Expand Down Expand Up @@ -554,9 +551,9 @@ func (s *intervalSkl) FloorTS() hlc.Timestamp {
// filled up, it returns arenaskl.ErrArenaFull. At that point, a new fixed page
// must be allocated and used instead.
type sklPage struct {
list *arenaskl.Skiplist
maxWallTime int64 // accessed atomically
isFull int32 // accessed atomically
list *arenaskl.Skiplist
maxTime ratchetingTime // accessed atomically
isFull int32 // accessed atomically
}

func newSklPage(arena *arenaskl.Arena) *sklPage {
Expand Down Expand Up @@ -802,6 +799,54 @@ func (p *sklPage) ensureFloorValue(it *arenaskl.Iterator, to []byte, val cacheVa
}

func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) {
new := makeRatchetingTime(ts)
for {
old := ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime)))
if new <= old {
break
}

if atomic.CompareAndSwapInt64((*int64)(&p.maxTime), int64(old), int64(new)) {
break
}
}
}

func (p *sklPage) getMaxTimestamp() hlc.Timestamp {
return ratchetingTime(atomic.LoadInt64((*int64)(&p.maxTime))).get()
}

// ratchetingTime is a compressed representation of an hlc.Timestamp, reduced
// down to 64 bits to support atomic access.
//
// ratchetingTime implements compression such that any loss of information when
// passing through the type results in the resulting Timestamp being ratcheted
// to a larger value. This provides the guarantee that the following relation
// holds, regardless of the value of x:
//
// x.LessEq(makeRatchetingTime(x).get())
//
// It also provides the guarantee that if the synthetic flag is set on the
// initial timestamp, then this flag is set on the resulting Timestamp. So the
// following relation is guaranteed to hold, regardless of the value of x:
//
// x.IsFlagSet(SYNTHETIC) == makeRatchetingTime(x).get().IsFlagSet(SYNTHETIC)
//
// Compressed ratchetingTime values compare such that taking the maximum of any
// two ratchetingTime values and converting that back to a Timestamp is always
// equal to or larger than the equivalent call through the Timestamp.Forward
// method. So the following relation is guaranteed to hold, regardless of the
// value of x or y:
//
// z := max(makeRatchetingTime(x), makeRatchetingTime(y)).get()
// x.Forward(y).LessEq(z)
//
// Bit layout (LSB to MSB):
// bits 0: inverted synthetic flag
// bits 1 - 63: upper 63 bits of wall time
type ratchetingTime int64

func makeRatchetingTime(ts hlc.Timestamp) ratchetingTime {
// Cheat and just use the max wall time portion of the timestamp, since it's
// fine for the max timestamp to be a bit too large. This is the case
// because it's always safe to increase the timestamp in a range. It's also
Expand All @@ -815,23 +860,38 @@ func (p *sklPage) ratchetMaxTimestamp(ts hlc.Timestamp) {
// We could use an atomic.Value to store a "MaxValue" cacheValue for a given
// page, but this would be more expensive and it's not clear that it would
// be worth it.
new := ts.WallTime
rt := ratchetingTime(ts.WallTime)
if ts.Logical > 0 {
new++
rt++
}

// TODO(nvanbenschoten): propagate the timestamp synthetic bit through the
// page's max time.
for {
old := atomic.LoadInt64(&p.maxWallTime)
if new <= old {
break
}
// Similarly, cheat and use the last bit in the wall time to indicate
// whether the timestamp is synthetic or not. Do so by first rounding up the
// last bit of the wall time so that it is empty. This is safe for the same
// reason that rounding up the logical portion of the timestamp in the wall
// time is safe (see above).
//
// We use the last bit to indicate that the flag is NOT set. This ensures
// that if two timestamps have the same ordering but different values for
// the synthetic flag, the timestamp without the synthetic flag has a larger
// ratchetingTime value. This follows how Timestamp.Forward treats the flag.
if rt&1 == 1 {
rt++
}
if !ts.Synthetic {
rt |= 1
}

if atomic.CompareAndSwapInt64(&p.maxWallTime, old, new) {
break
}
return rt
}

func (rt ratchetingTime) get() hlc.Timestamp {
var ts hlc.Timestamp
ts.WallTime = int64(rt &^ 1)
if rt&1 == 0 {
ts.Synthetic = true
}
return ts
}

// ratchetPolicy defines the behavior a ratcheting attempt should take when
Expand Down Expand Up @@ -899,12 +959,9 @@ func (p *sklPage) ratchetValueSet(
// must handle with care.

// Ratchet the max timestamp.
keyTs, gapTs := keyVal.ts, gapVal.ts
if gapTs.Less(keyTs) {
p.ratchetMaxTimestamp(keyTs)
} else {
p.ratchetMaxTimestamp(gapTs)
}
maxTs := keyVal.ts
maxTs.Forward(gapVal.ts)
p.ratchetMaxTimestamp(maxTs)

// Remove the hasKey and hasGap flags from the meta. These will be
// replaced below.
Expand Down Expand Up @@ -1151,26 +1208,19 @@ func encodeValueSet(b []byte, keyVal, gapVal cacheValue) (ret []byte, meta uint1
}

func decodeValue(b []byte) (ret []byte, val cacheValue) {
// TODO(nvanbenschoten): decode the timestamp synthetic bit.
val.ts.WallTime = int64(binary.BigEndian.Uint64(b))
val.ts.Logical = int32(binary.BigEndian.Uint32(b[8:]))
var err error
if val.txnID, err = uuid.FromBytes(b[encodedTsSize:encodedValSize]); err != nil {
panic(err)
}
// Copy and interpret the byte slice as a cacheValue.
valPtr := (*[encodedValSize]byte)(unsafe.Pointer(&val))
copy(valPtr[:], b)
ret = b[encodedValSize:]
return
return ret, val
}

func encodeValue(b []byte, val cacheValue) []byte {
// TODO(nvanbenschoten): encode the timestamp synthetic bit.
l := len(b)
b = b[:l+encodedValSize]
binary.BigEndian.PutUint64(b[l:], uint64(val.ts.WallTime))
binary.BigEndian.PutUint32(b[l+8:], uint32(val.ts.Logical))
if _, err := val.txnID.MarshalTo(b[l+encodedTsSize:]); err != nil {
panic(err)
}
// Interpret the cacheValue as a byte slice and copy.
prev := len(b)
b = b[:prev+encodedValSize]
valPtr := (*[encodedValSize]byte)(unsafe.Pointer(&val))
copy(b[prev:], valPtr[:])
return b
}

Expand Down
Loading

0 comments on commit 15765c0

Please sign in to comment.