From 645882d6d531c3dc6030d3962eb5a8a919c5f168 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 27 Jan 2021 19:04:36 -0500 Subject: [PATCH 1/2] kvserver: fix a reproposal check In some situations, we repropose a command with a new LAI. When doing so, we need to make sure that the closed timestamp has not advanced past the command's write timestamp since the original proposal. Except we were checking the command's read timestamp, not write timestamp. Also changes a Less to a LessEq when comparing with the closed timestamp, which I think is how that comparison should be. Release note (bug fix): Fixed a very rare chance of inconsistent follower reads. --- pkg/kv/kvserver/replica_application_result.go | 5 +++-- pkg/roachpb/api.go | 2 +- pkg/roachpb/batch.go | 21 +++++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 4d9907c39e4b..34faae32038b 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -205,8 +205,9 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) defer untrack(ctx, 0, 0, 0) // covers all error paths below - // NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp. - if p.Request.Timestamp.Less(minTS) { + // The ConsultsTimestampCache condition matches the similar logic for caring + // about the closed timestamp cache in applyTimestampCache(). + if p.Request.ConsultsTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) { // The tracker wants us to forward the request timestamp, but we can't // do that without re-evaluating, so give up. The error returned here // will go to back to DistSender, so send something it can digest. diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index e00c03a885bb..71a1e9aca0ca 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -166,7 +166,7 @@ func IsRange(args Request) bool { return (args.flags() & isRange) != 0 } -// ConsultsTimestampCache returns whether the command must consult +// ConsultsTimestampCache returns whether the request must consult // the timestamp cache to determine whether a mutation is safe at // a proposed timestamp or needs to move to a higher timestamp to // avoid re-writing history. diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 08b78cdb61da..4b563b265bd7 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -74,6 +74,19 @@ func (ba BatchRequest) EarliestActiveTimestamp() hlc.Timestamp { return ts } +// WriteTimestamp returns the timestamps at which this request is writing. For +// non-transactional requests, this is the same as the read timestamp. For +// transactional requests, the write timestamp can be higher until commit time. +// +// This should only be called after SetActiveTimestamp(). +func (ba *BatchRequest) WriteTimestamp() hlc.Timestamp { + ts := ba.Timestamp + if ba.Txn != nil { + ts.Forward(ba.Txn.WriteTimestamp) + } + return ts +} + // UpdateTxn updates the batch transaction from the supplied one in // a copy-on-write fashion, i.e. without mutating an existing // Transaction struct. @@ -156,6 +169,14 @@ func (ba *BatchRequest) IsUnsplittable() bool { return ba.hasFlag(isUnsplittable) } +// ConsultsTimestampCache returns whether the request must consult +// the timestamp cache to determine whether a mutation is safe at +// a proposed timestamp or needs to move to a higher timestamp to +// avoid re-writing history. +func (ba *BatchRequest) ConsultsTimestampCache() bool { + return ba.hasFlag(consultsTSCache) +} + // IsSingleRequest returns true iff the BatchRequest contains a single request. func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1 From 4205ecec324908108476c817ccba12cc90247d95 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 20 Jan 2021 18:11:39 -0500 Subject: [PATCH 2/2] kvserver/tstracker: implement a better closedts tracker This patch implements a replacement for the existing minprop.Tracker. The new tracker is not yet hooked up. Compared to the old tracker, this new one is better in multiple ways: - it's supposed to be used at the level of a single range, not at the level of a node - it supports lock-free concurrent inserts - it's scope is smaller, making it cleaner and more optimal at the job of tracking requests. The old tracker combined tracking evaluating requests with various closed timestamps considerations: it was in charge of maintaining the closed timestamp, closing new timestamps, bumping requests to the closed timestamp, and the tracking that it did was relative to the closed timestamp. This new guy only deals with tracking (it doesn't know anything about closed timestamps), and so it will represent the tracked requests more accurately. The implementation is intended to work with the proposal buffer's locking model; locking is external to the tracker, but the caller is expected to hold a "read" lock while inserting requests for tracking, and a write lock while removing tracked requests. This matches how the proposal buffer handles its own insertions and flushing. The tracking is done using a two-rolling-buckets scheme inspired from the existing one, but rationalized more. Release note: None --- pkg/kv/kvserver/closedts/tracker/BUILD.bazel | 34 ++ .../kvserver/closedts/tracker/heap_tracker.go | 126 ++++ .../closedts/tracker/lockfree_tracker.go | 275 +++++++++ pkg/kv/kvserver/closedts/tracker/tracker.go | 95 +++ .../kvserver/closedts/tracker/tracker_test.go | 556 ++++++++++++++++++ 5 files changed, 1086 insertions(+) create mode 100644 pkg/kv/kvserver/closedts/tracker/BUILD.bazel create mode 100644 pkg/kv/kvserver/closedts/tracker/heap_tracker.go create mode 100644 pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go create mode 100644 pkg/kv/kvserver/closedts/tracker/tracker.go create mode 100644 pkg/kv/kvserver/closedts/tracker/tracker_test.go diff --git a/pkg/kv/kvserver/closedts/tracker/BUILD.bazel b/pkg/kv/kvserver/closedts/tracker/BUILD.bazel new file mode 100644 index 000000000000..33ec2b6e1d3f --- /dev/null +++ b/pkg/kv/kvserver/closedts/tracker/BUILD.bazel @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "tracker", + srcs = [ + "heap_tracker.go", + "lockfree_tracker.go", + "tracker.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "tracker_test", + srcs = ["tracker_test.go"], + embed = [":tracker"], + deps = [ + "//pkg/testutils/skip", + "//pkg/util/ctxgroup", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvserver/closedts/tracker/heap_tracker.go b/pkg/kv/kvserver/closedts/tracker/heap_tracker.go new file mode 100644 index 000000000000..620bd05462a6 --- /dev/null +++ b/pkg/kv/kvserver/closedts/tracker/heap_tracker.go @@ -0,0 +1,126 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "container/heap" + "context" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// heapTracker is a reference implementation of Tracker. Its LowerBound() +// response is precise: the lowest timestamps of the tracked set. The production +// implementation is the more performant lockfreeTracker. +// +// heapTracker maintains the currently tracked set of timestamps in a heap. Each +// element maintains its heap index, so random deletes are supported. All +// methods do internal locking, so all methods can be called concurrently. +type heapTracker struct { + mu struct { + syncutil.Mutex + rs tsHeap + } +} + +var _ Tracker = &heapTracker{} + +func newHeapTracker() Tracker { + return &heapTracker{} +} + +type item struct { + ts hlc.Timestamp + // This item's index in the heap. + index int +} + +type tsHeap []*item + +var _ heap.Interface = &tsHeap{} + +// Less is part of heap.Interface. +func (h tsHeap) Less(i, j int) bool { + return h[i].ts.Less(h[j].ts) +} + +// Swap is part of heap.Interface. +func (h tsHeap) Swap(i, j int) { + tmp := h[i] + h[i] = h[j] + h[j] = tmp + h[i].index = i + h[j].index = j +} + +// Push is part of heap.Interface. +func (h *tsHeap) Push(x interface{}) { + n := len(*h) + item := x.(*item) + item.index = n + *h = append(*h, item) +} + +// Pop is part of heap.Interface. +func (h *tsHeap) Pop() interface{} { + it := (*h)[len(*h)-1] + // Poison the removed element, for safety. + it.index = -1 + *h = (*h)[0 : len(*h)-1] + return it +} + +// Len is part of heap.Interface. +func (h *tsHeap) Len() int { + return len(*h) +} + +// heapToken implements RemovalToken. +type heapToken struct { + *item +} + +// RemovalTokenMarker implements RemovalToken. +func (heapToken) RemovalTokenMarker() {} + +var _ RemovalToken = heapToken{} + +// Track is part of the Tracker interface. +func (h *heapTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken { + h.mu.Lock() + defer h.mu.Unlock() + i := &item{ts: ts} + heap.Push(&h.mu.rs, i) + return heapToken{i} +} + +// Untrack is part of the Tracker interface. +func (h *heapTracker) Untrack(ctx context.Context, tok RemovalToken) { + idx := tok.(heapToken).index + if idx == -1 { + log.Fatalf(ctx, "attempting to untrack already-untracked item") + } + h.mu.Lock() + defer h.mu.Unlock() + heap.Remove(&h.mu.rs, idx) +} + +// LowerBound is part of the Tracker interface. +func (h *heapTracker) LowerBound(ctx context.Context) hlc.Timestamp { + h.mu.Lock() + defer h.mu.Unlock() + if h.mu.rs.Len() == 0 { + return hlc.Timestamp{} + } + return h.mu.rs[0].ts +} diff --git a/pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go b/pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go new file mode 100644 index 000000000000..b9cd8bced181 --- /dev/null +++ b/pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go @@ -0,0 +1,275 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// lockfreeTracker is a performant implementation of Tracker, at the expense of +// precision. A precise implementation would hold all tracked timestamps in a +// min-heap, but that costs memory and can't be implemented in a lock-free +// manner (at least not by this author). This Tracker generally doesn't know +// exactly what the lowest tracked timestamp is; it just knows a lower-bound on +// it. Concretely, the tracker doesn't maintain information on all tracked +// timestamps; it only maintains a summary in the form of two buckets, each with +// one timestamp and a reference count. Every timestamp in a bucket counts as if +// it was equal to the bucket's timestamp - even though, in reality, they can be +// higher than the bucket's timestamp. +// +// Some combinations of operations are thread-safe, others need the caller to +// ensure mutual exclusion. In particular, insertions (Track()) are "lock free", +// but deletions (Untrack()) are not. Deletions need exclusive access, so the +// caller needs to use a lock; the intention is for that lock to be held in +// "read" mode for insertions and in write mode for deletions. This data +// structure is meant to be used in conjunction with a propBuf, which uses this +// locking model. +// +// Note that this implementation is only reasonable under some assumptions about +// the use: namely that the lifetimes of all the timestamps in the set are +// fairly similar, and that the timestamps tend to increase over time. This +// matches the expectations of requests (identified by their write timestamp), +// with the lifetime being their evaluation duration. +type lockfreeTracker struct { + // tokens returned by Track() contain the pointer-identity of a bucket, so we + // can't swap the buckets in this array in order to maintain b1 at the front. + // Instead, we swap the b1 and b2 pointers to reorder them. + buckets [2]bucket + b1, b2 *bucket +} + +// NewLockfreeTracker creates a tracker. +func NewLockfreeTracker() Tracker { + t := lockfreeTracker{} + t.b1 = &t.buckets[0] + t.b2 = &t.buckets[1] + return &t +} + +// String cannot be called concurrently with Untrack. +func (t *lockfreeTracker) String() string { + return fmt.Sprintf("b1: %s; b2: %s", t.b1, t.b2) +} + +// Track is part of the Tracker interface. +func (t *lockfreeTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken { + // The tracking scheme is based on maintaining (at most) two buckets of + // timestamps, and continuously draining them and creating new buckets. Timestamps + // come in (through this Track()) and enter a bucket. Later, they leave the + // bucket through Untrack(). Each bucket has a bucket timestamp, which is the + // lowest timestamp that ever entered it. A + // bucket's timestamp can be lowered throughout its life, but never increased. A bucket doesn't + // keep track of which timestamps are in it (it only maintains a count), so the + // bucket is unaware of when the the lowest timestamp (i.e. the timestamp that + // set the bucket's timestamp) leaves. + // + // When a bucket is emptied, it gets reset. Future Track() calls can + // re-initialize it with a new timestamp (generally expected to be higher than + // the timestamp it had before the reset). + // + // At any point, LowerBound() returns the first bucket's timestamp. That's a + // lower bound on all the timestamps currently tracked, since b1's timestamp + // is always lower than b2's. + // + // The diagram below tries to give intuition about how the two buckets work. + // It shows two buckets with timestamps 10 and 20, and three timestamps + // entering the set. It explains which bucket each timestamp joins. + // + // ^ time grows upwards | | + // | | | + // | | | + // | ts 25 joins b2 -> | | | | + // | | | | | + // | | | +----+ + // | | | b2 ts: 20 + // | ts 15 joins b2, -> | | + // | extending it downwards +----+ + // | b1 ts: 10 + // | ts 5 joins b1, -> + // | extending it downwards + // + // Our goal is to maximize the Tracker's lower bound (i.e. its conservative + // approximation about the lowest tracked timestamp), which is b1's timestamp + // (see below). + // + // - 25 is above both buckets (meaning above the buckets' timestamp), so it + // joins b2. It would be technically correct for it to join b1 too, but it'd + // be a bad idea: if b1 would be slow enough to be on the critical path for b1 + // draining (which it likely is, if all the timestamp stay in the set for a + // similar amount of time) then it'd be preventing bumping the lower bound + // from 10 to 20 (which, in practice, would translate in the respective range not + // closing the [10, 20) range of timestamps). + // - 15 is below b2, but above b1. It's not quite as clear cut which + // bucket is the best one to join; if its lifetime is short and + // so it is *not* on the critical path for b1 draining, then it'd be better for + // it to join b1. Once b1 drains, we'll be able to bump the tracker's lower + // bound to 20. On the other hand, if it joins b2, then b2's timestamp comes + // down to 15 and, once b1 drains and 15 is removed from the tracked set, the + // tracker's lower bound would only become 15 (which is worse than 20). But, + // on the third hand, if 15 stays tracked for a while and is on b1's critical + // path, then putting it in b2 would at least allow us to bump the lower bound + // to 15, which is better than nothing. We take this argument, and put it in + // b2. + // - 5 is below both buckets. The only sensible thing to do is putting it + // in b1; otherwise we'd have to extend b2 downwards, inverting b1 and b2. + // + // + // IMPLEMENTATION INVARIANTS: + // + // 1) After a bucket is initialized, its timestamp only gets lower until the + // bucket is reset (i.e. it never increases). This serves to keep the relative + // relation of buckets fixed. + // 2) (a corollary) If both buckets are initialized, b1.timestamp < b2.timestamp. + // 3) If only one bucket is initialized, it is b1. Note that both buckets + // might be uninitialized. + // 4) Initialized buckets are not empty. + + b1, b2 := t.b1, t.b2 + + // The Tracker internally works with int64's, for atomic CAS purposes. So we + // round down the hlc.Timestamp to just its WallTime. + wts := ts.WallTime + + // Make sure that there's at least one bucket. + t1, initialized := b1.timestamp() + + // Join b1 if wts is below it. + // + // It's possible that multiple requests coming at the same time pass the `wts + // <= t1` check and enter b1, even through b2 is uninitialized. This is not + // ideal; it'd be better if only the lowest request would end up in b1 and the + // others would end up in b2 (or, more generally, if some "low" requests join + // b1 and the rest (the "high" ones) go on to create and join b2). But that's + // harder to implement. + if !initialized || wts <= t1 { + return b1.extendAndJoin(ctx, wts, ts.Synthetic) + } + + // We know that b1 < wts. We can technically join either bucket, but we always + // prefer b2 in order to let b1 drain as soon as possible (at which point + // we'll be able to create a new bucket). + return b2.extendAndJoin(ctx, wts, ts.Synthetic) +} + +// Untrack is part of the Tracker interface. +func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) { + b := tok.(lockfreeToken).b + // Note that atomic ops are not required here, as we hold the exclusive lock. + b.refcnt-- + if b.refcnt == 0 { + // Reset the bucket, so that future Track() calls can create a new one. + b.ts = 0 + b.synthetic = 0 + // If we reset b1, swap the pointers, so that, if b2 is currently + // initialized, it becomes b1. If a single bucket is initialized, we want it + // to be b1. + if b == t.b1 { + t.b1 = t.b2 + t.b2 = b + } + } +} + +// LowerBound is part of the Tracker interface. +func (t *lockfreeTracker) LowerBound(ctx context.Context) hlc.Timestamp { + // Note that, if b1 is uninitialized, so is b2. If both are initialized, + // b1 < b2. So, we only need to look at b1. + ts, initialized := t.b1.timestamp() + if !initialized { + return hlc.Timestamp{} + } + return hlc.Timestamp{ + WallTime: ts, + Logical: 0, + Synthetic: t.b1.isSynthetic(), + } +} + +// bucket represent a Tracker bucket: a data structure that coalesces a number +// of timestamps, keeping track only of their count and minimum. +// +// A bucket can be initialized or uninitialized. It's initialized when the ts is +// set. +type bucket struct { + ts int64 // atomic, nanos + refcnt int32 // atomic + synthetic int32 // atomic +} + +func (b *bucket) String() string { + ts := atomic.LoadInt64(&b.ts) + if ts == 0 { + return "uninit" + } + refcnt := atomic.LoadInt32(&b.refcnt) + return fmt.Sprintf("%d requests, lower bound: %s", refcnt, timeutil.Unix(0, ts)) +} + +// timestamp returns the bucket's timestamp. The bool retval is true if the +// bucket is initialized. If false, the timestamp is 0. +func (b *bucket) timestamp() (int64, bool) { + ts := atomic.LoadInt64(&b.ts) + return ts, ts != 0 +} + +// isSynthetic returns true if the bucket's timestamp (i.e. the bucket's lower +// bound) should be considered a synthetic timestamp. +func (b *bucket) isSynthetic() bool { + return atomic.LoadInt32(&b.synthetic) != 0 +} + +// extendAndJoin extends the bucket downwards (if necessary) so that its +// timestamp is <= ts, and then adds a timestamp to the bucket. It returns a +// token to be used for removing the timestamp from the bucket. +// +// If the bucket it not initialized, it will be initialized to ts. +func (b *bucket) extendAndJoin(ctx context.Context, ts int64, synthetic bool) lockfreeToken { + // Loop until either we set the bucket's timestamp, or someone else sets it to + // an even lower value. + var t int64 + for { + t = atomic.LoadInt64(&b.ts) + if t != 0 && t <= ts { + break + } + if atomic.CompareAndSwapInt64(&b.ts, t, ts) { + break + } + } + // If we created the bucket, then we dictate if its lower bound will be + // considered a synthetic timestamp or not. It's possible that we're now + // inserting a synthetic timestamp into the bucket but, over time, a higher + // non-synthetic timestamp joins. Or, that a lower non-synthetic timestamp + // joins. In either case, the bucket will remain "synthetic" although it'd be + // correct to make it non-synthetic. We don't make an effort to keep the + // synthetic bit up to date within a bucket. + if t == 0 && synthetic { + atomic.StoreInt32(&b.synthetic, 1) + } + atomic.AddInt32(&b.refcnt, 1) + return lockfreeToken{b: b} +} + +// lockfreeToken implements RemovalToken. +type lockfreeToken struct { + // The bucket that this timestamp is part of. + b *bucket +} + +var _ RemovalToken = lockfreeToken{} + +// RemovalTokenMarker implements RemovalToken. +func (l lockfreeToken) RemovalTokenMarker() {} diff --git a/pkg/kv/kvserver/closedts/tracker/tracker.go b/pkg/kv/kvserver/closedts/tracker/tracker.go new file mode 100644 index 000000000000..4fc41d429bff --- /dev/null +++ b/pkg/kv/kvserver/closedts/tracker/tracker.go @@ -0,0 +1,95 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// Tracker tracks the lower bound of a set of timestamps (called the tracked +// set). Timestamps can be added and removed from the tracked set. A +// conservative estimation of the set's lower bound can be queried; the result +// will be lower or equal to all the tracked timestamps; it might not be equal +// to the lowest timestamp currently in the set (i.e. it might not be precise). +// +// For context, the Tracker is used to track the write timestamps of requests +// currently evaluating on a range. The lower bound is used to figure out what +// timestamps can be closed: we can only close timestamps below that of any +// request that's currently evaluating. The Tracker itself does not know +// anything about closed timestamps. +// +// Track can be called concurrently. Other methods cannot be called concurrently +// (with themselves or with any other method, including Track). +// +// The usage pattern is: +// +// Start of request evaluation: +// +// externalLock.RLock() +// tok := Tracker.Track(request.writeTimestamp) +// externalLock.RUnlock() +// +// Proposal buffer flush: +// +// externalLock.Lock() +// for each command being proposed: +// Tracker.Untrack(tok) +// newClosedTimestamp := min(now() - kv.closed_timestamp.target_duration, Tracker.LowerBound() - 1) +// externalLock.Unlock() +// +// The production implementation of the interface is the lockfreeTracker, which +// trades accuracy for performance. There's also a more pedestrian HeapTracker +// reference implementation. +type Tracker interface { + + // Track inserts a timestamps into the tracked set. The returned token must + // later be passed to Untrack() to remove the timestamps from the set. + // + // While `ts` is tracked, LowerBound() will return values less or equal to + // `ts`. + // + // Track can be called concurrently with other Track calls. + // + // Implementations should pay attention so that the returned interface doesn't + // cause an allocation (when the implementation's concrete token type is + // wrapped in the RemovalToken interface). If their token implementation is + // smaller or equal to a pointer size, the allocation will be avoided. + Track(_ context.Context, ts hlc.Timestamp) RemovalToken + + // Untrack removes a timestamp from the tracked set - the timestamp that created + // `tok`. This might advance the result of future LowerBound() calls. + // + // Untrack cannot be called concurrently with other operations. + Untrack(_ context.Context, tok RemovalToken) + + // LowerBound returns a conservative estimate of the lower bound of the + // tracked set of timestamps. If the tracked set is currently empty, an empty + // timestamp is returned. + // + // The returned timestamp might be smaller than the lowest timestamp ever + // inserted into the set. Implementations are allowed to round timestamps + // down. + // + // Synthetic timestamps: The Tracker doesn't necessarily track synthetic / + // physical timestamps precisely; the only guarantee implementations need to + // make is that, if no synthethic timestamp is inserted into the tracked set + // for a while, eventually the LowerBound value will not be synthetic. + LowerBound(context.Context) hlc.Timestamp +} + +// RemovalToken represents the result of Track: a token to be later used with +// Untrack() for removing the respective timestamp from the tracked set. +type RemovalToken interface { + // RemovalTokenMarker is a dummy marker method. + RemovalTokenMarker() +} diff --git a/pkg/kv/kvserver/closedts/tracker/tracker_test.go b/pkg/kv/kvserver/closedts/tracker/tracker_test.go new file mode 100644 index 000000000000..26795b069e4c --- /dev/null +++ b/pkg/kv/kvserver/closedts/tracker/tracker_test.go @@ -0,0 +1,556 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tracker + +import ( + "container/heap" + "context" + "fmt" + "math" + "math/rand" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestLockfreeTracker(t *testing.T) { + defer leaktest.AfterTest(t) + ctx := context.Background() + tr := NewLockfreeTracker() + testTracker(ctx, t, tr) +} + +func TestHeapTracker(t *testing.T) { + defer leaktest.AfterTest(t) + ctx := context.Background() + tr := newHeapTracker() + testTracker(ctx, t, tr) +} + +func testTracker(ctx context.Context, t *testing.T, tr Tracker) { + ts := func(nanos int64) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: nanos, + } + } + + // No requests are evaluating, so LowerBound() returns zero val. + require.True(t, tr.LowerBound(ctx).IsEmpty()) + tok10 := tr.Track(ctx, ts(10)) + require.Equal(t, int64(10), tr.LowerBound(ctx).WallTime) + + tok20 := tr.Track(ctx, ts(20)) + require.Equal(t, int64(10), tr.LowerBound(ctx).WallTime) + tr.Untrack(ctx, tok10) + require.Equal(t, int64(20), tr.LowerBound(ctx).WallTime) + + tok30 := tr.Track(ctx, ts(30)) + tok25 := tr.Track(ctx, ts(25)) + require.Equal(t, int64(20), tr.LowerBound(ctx).WallTime) + tr.Untrack(ctx, tok20) + require.Equal(t, int64(25), tr.LowerBound(ctx).WallTime) + tr.Untrack(ctx, tok25) + // Here we hackily have different logic for the different trackers. The + // lockfree one is not accurate, so it returns a lower LowerBound than the + // heap one. + if _, ok := tr.(*lockfreeTracker); ok { + require.Equal(t, int64(25), tr.LowerBound(ctx).WallTime) + } else { + require.Equal(t, int64(30), tr.LowerBound(ctx).WallTime) + } + tr.Untrack(ctx, tok30) + require.True(t, tr.LowerBound(ctx).IsEmpty()) + + // Check that synthetic timestamps are tracked as such. + synthTS := hlc.Timestamp{ + WallTime: 10, + Synthetic: true, + } + tok := tr.Track(ctx, synthTS) + require.Equal(t, synthTS, tr.LowerBound(ctx)) + // Check that after the Tracker is emptied, lowerbounds are not synthetic any + // more. + tr.Untrack(ctx, tok) + tr.Track(ctx, ts(10)) + require.Equal(t, ts(10), tr.LowerBound(ctx)) +} + +// Test the tracker by throwing random requests at it. We verify that, at all +// times, Tracker.LowerBound()'s error is small (i.e. the lower bound is not +// much lower than the lowest timestamp at which a request is currently +// evaluating). +func TestLockfreeTrackerRandomStress(t *testing.T) { + defer leaktest.AfterTest(t) + ctx := context.Background() + tr := NewLockfreeTracker().(*lockfreeTracker) + + // The test takes a few seconds (configured below). It's also hard on the CPU. + skip.UnderShort(t) + + // How long to stress for. + const testDuration = 2 * time.Second + // How many producers? + const numProducers = 10 + // How many outstanding requests can each producer have? + const maxConcurrentRequestsPerProducer = 50 + // Maximum evaluation duration for a request. Each request will evaluate for a + // random duration with this upper bound. + const maxReqDurationMillis = 5 + // Maximum time in the past that a request can evaluate at. Each request will + // evaluate at a timestamp in the past, with this lower bound. + const maxReqTrailingMillis = 10 + + // We'll generate requests at random timestamps on multiple producer + // goroutines. The test will keep track of what requests are currently + // evaluating, so it can check the tracker's responses. + // At the same time, there a consumer goroutine (taking an exclusive lock), + // and a checker goroutine. + stopT := time.After(testDuration) + stop := make(chan struct{}) + // Adapt the timer channel to a channel of struct{}. + go func() { + <-stopT + close(stop) + }() + + // This mutex is protecting the Tracker. Producers will lock it in read mode + // and consumers in write mode. This matches how the ProposalBuffer + // synchronizes access to the Tracker. + var mu syncutil.RWMutex + var rs requestsCollection + + g := ctxgroup.WithContext(ctx) + + for i := 0; i < numProducers; i++ { + p := makeRequestProducer( + stop, mu.RLocker(), + maxReqDurationMillis, maxReqTrailingMillis, maxConcurrentRequestsPerProducer, + tr, &rs) + g.GoCtx(func(ctx context.Context) error { + p.run(ctx) + return nil + }) + } + + c := makeRequestConsumer(stop, &mu, tr, &rs) + g.GoCtx(func(ctx context.Context) error { + c.run(ctx) + return nil + }) + + checker := makeTrackerChecker(stop, &mu, tr, &rs) + g.GoCtx(checker.run) + + <-stop + require.NoError(t, g.Wait()) + + for _, req := range rs.mu.rs { + tr.Untrack(ctx, req.tok) + } + require.Zero(t, tr.LowerBound(ctx)) + + maxOvershotMillis := checker.maxOvershotNanos / 1000000 + // Maximum tolerated error between what the tracker said the lower bound of + // currently evaluating request was, and what the reality was. This is on the + // order of the maximum time it takes a request to evaluate. Note that the + // requestConsumer measure how long "evaluation" actually took; we don't just + // rely on maxReqDurationMillis. This is in order to make the test resilient + // to the consumer goroutine being starved for a while. The error is highest + // when the Tracker's second bucket has gotten to be really deep (covering + // requests over a long time window). If a consumption step happens then, it + // will clear out the first bucket and most, but possibly not all, of the + // second bucket. If something is left in the second bucket, the lower bound + // error will be large - on the order of a request evaluation time. + // TODO(andrei): I think the 3x below is too conservative; the maximum error + // should be on the order of 1x maxEvaluationTime. And yet 2x fails after + // hours of stressrace on GCE worker (and it failed once on CI stress too). + // Figure out why. + maxToleratedErrorMillis := 3 * c.maxEvaluationTime.Milliseconds() + log.Infof(ctx, "maximum lower bound error: %dms. maximum request evaluation time: %s", + maxOvershotMillis, c.maxEvaluationTime) + require.Lessf(t, maxOvershotMillis, maxToleratedErrorMillis, + "maximum tracker lowerbound error was %dms, above maximum tolerated %dms", + maxOvershotMillis, maxToleratedErrorMillis) +} + +// requests is a collection of requests, ordered by finish time (not by +// evaluation time). +// +// Most, but not all, methods are thread-safe. +type requestsCollection struct { + mu struct { + syncutil.Mutex + rs requestsHeap + } +} + +type requestsHeap []request + +var _ heap.Interface = &requestsHeap{} + +func (rs *requestsCollection) Insert(r request) { + rs.mu.Lock() + defer rs.mu.Unlock() + heap.Push(&rs.mu.rs, r) +} + +func (rs *requestsCollection) Len() int { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.mu.rs.Len() +} + +// all returns the inner requests. This cannot be called concurrently with any +// other methods; the caller must coordinate exclusive access to the requests. +func (rs *requestsCollection) all() []request { + return rs.mu.rs +} + +// PopMin removes the first request (i.e. the request with the lowest finish +// time) from the collection. +func (rs *requestsCollection) PopMin() request { + rs.mu.Lock() + defer rs.mu.Unlock() + return heap.Pop(&rs.mu.rs).(request) +} + +// PeekFirstFinish returns the timestamp when the first request scheduled to +// finish will finish. It is illegal to call this without ensuring that there's +// at least one request in the collection. +func (rs *requestsCollection) PeekFirstFinish() time.Time { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.mu.rs[0].finish +} + +// Len is part of heap.Interface. +func (rs requestsHeap) Len() int { + return len(rs) +} + +// Less is part of heap.Interface. +func (rs requestsHeap) Less(i, j int) bool { + return rs[i].finish.Before(rs[j].finish) +} + +// Swap is part of heap.Interface. +func (rs requestsHeap) Swap(i, j int) { + r := rs[i] + rs[i] = rs[j] + rs[j] = r +} + +// Push is part of heap.Interface. +func (rs *requestsHeap) Push(x interface{}) { + r := x.(request) + *rs = append(*rs, r) +} + +// Pop is part of heap.Interface. +func (rs *requestsHeap) Pop() interface{} { + r := (*rs)[len(*rs)-1] + *rs = (*rs)[0 : len(*rs)-1] + return r +} + +// requestProducer is an actor that constantly starts tracking requests +// until signaled to stop. It doesn't untrack any of the requests. +// +// Requests are tracked in both the Tracker and in a requestsCollection. +type requestProducer struct { + reqMaxDurationMillis int + reqMaxTrailingMillis int + + stop <-chan struct{} + // semaphore enforcing a maximum number of concurrent requests. + sem chan struct{} + mu struct { + sync.Locker + t *lockfreeTracker + } + requests *requestsCollection +} + +type request struct { + // The time when the request was created. + start time.Time + // The time at which the request is scheduled to finish evaluation. + finish time.Time + // The time at which the request is writing. + wtsNanos int64 + + // The semaphore to release when the request is completed. + sem chan struct{} + // The tok used to untrack the request. + tok RemovalToken +} + +// release signals the producer that produced this request. +func (r request) release() { + <-r.sem +} + +func makeRequestProducer( + stop <-chan struct{}, + mu sync.Locker, + maxReqDurationMillis int, + maxReqTrailingMillis int, + maxConcurrentRequest int, + t *lockfreeTracker, + rs *requestsCollection, +) requestProducer { + p := requestProducer{ + reqMaxDurationMillis: maxReqDurationMillis, + reqMaxTrailingMillis: maxReqTrailingMillis, + stop: stop, + sem: make(chan struct{}, maxConcurrentRequest), + requests: rs, + } + p.mu.Locker = mu + p.mu.t = t + return p +} + +func (p *requestProducer) wait() bool { + select { + case <-p.stop: + return false + case p.sem <- struct{}{}: + return true + } +} + +func (p *requestProducer) run(ctx context.Context) { + for { + if !p.wait() { + return + } + p.mu.Lock() + + reqDurationMillis := 1 + rand.Intn(p.reqMaxDurationMillis) + reqEndTime := timeutil.Now().Add(time.Duration(reqDurationMillis) * time.Millisecond) + wtsTrailMillis := rand.Intn(p.reqMaxTrailingMillis) + wts := hlc.Timestamp{ + WallTime: timeutil.Now().UnixNano() - (int64(wtsTrailMillis) * 1000000), + } + + tok := p.mu.t.Track(ctx, wts) + req := request{ + finish: reqEndTime, + start: timeutil.Now(), + wtsNanos: wts.WallTime, + sem: p.sem, + tok: tok, + } + p.requests.Insert(req) + + p.mu.Unlock() + } +} + +type requestConsumer struct { + stop <-chan struct{} + mu struct { + sync.Locker + t *lockfreeTracker + requests *requestsCollection + } + // The maximum time a request took from when it was created to when it was + // consumed. + maxEvaluationTime time.Duration +} + +func makeRequestConsumer( + stop <-chan struct{}, mu sync.Locker, t *lockfreeTracker, rs *requestsCollection, +) requestConsumer { + c := requestConsumer{ + stop: stop, + } + c.mu.Locker = mu + c.mu.t = t + c.mu.requests = rs + return c +} + +func (c *requestConsumer) run(ctx context.Context) { + for { + select { + case <-c.stop: + return + case <-time.After(100 * time.Microsecond): + } + c.mu.Lock() + var consumed int + for c.mu.requests.Len() > 0 && c.mu.requests.PeekFirstFinish().Before(timeutil.Now()) { + req := c.mu.requests.PopMin() + c.mu.t.Untrack(ctx, req.tok) + req.release() + consumed++ + evalTime := timeutil.Now().Sub(req.start) + if c.maxEvaluationTime < evalTime { + c.maxEvaluationTime = evalTime + } + } + c.mu.Unlock() + } +} + +type trackerChecker struct { + stop <-chan struct{} + mu struct { + sync.Locker + t *lockfreeTracker + requests *requestsCollection + } + maxOvershotNanos int64 +} + +func makeTrackerChecker( + stop <-chan struct{}, mu sync.Locker, t *lockfreeTracker, rs *requestsCollection, +) trackerChecker { + checker := trackerChecker{ + stop: stop, + } + checker.mu.Locker = mu + checker.mu.t = t + checker.mu.requests = rs + return checker +} + +func (c *trackerChecker) run(ctx context.Context) error { + for { + select { + case <-c.stop: + return nil + case <-time.After(10 * time.Millisecond): + } + c.mu.Lock() + lbNanos := c.mu.t.LowerBound(ctx).WallTime + minEvalTS := int64(math.MaxInt64) + for _, req := range c.mu.requests.all() { + if req.wtsNanos < lbNanos { + c.mu.Unlock() + return fmt.Errorf("bad lower bound %d > req: %d", lbNanos, req.wtsNanos) + } + if req.wtsNanos < minEvalTS { + minEvalTS = req.wtsNanos + } + } + if c.mu.requests.Len() == 0 { + minEvalTS = 0 + } + c.mu.Unlock() + + overshotNanos := minEvalTS - lbNanos + if c.maxOvershotNanos < overshotNanos { + c.maxOvershotNanos = overshotNanos + } + log.VInfof(ctx, 1, "lower bound error: %dms", overshotNanos/1000000) + } +} + +// Results on go 1.15.5 on a Macbook Pro 2.3 GHz 8-Core Intel Core i9 (16 threads): +// +// BenchmarkTracker 38833928 30.9 ns/op +// BenchmarkTracker-2 14426193 71.2 ns/op +// BenchmarkTracker-4 17354930 61.5 ns/op +// BenchmarkTracker-8 24115866 49.7 ns/op +// BenchmarkTracker-16 24667039 45.5 ns/op +// +// The drop in throughput from 1 CPU to 2 CPUs mimics what +// happens for a simple RWMutex.RLock/RUnlock pair. +// TODO(andrei): investigate distributed RWMutexes like +// https://github.com/jonhoo/drwmutex. +func BenchmarkLockfreeTracker(b *testing.B) { + ctx := context.Background() + benchmarkTracker(ctx, b, NewLockfreeTracker()) +} + +// Results on go 1.15.5 on a Macbook Pro 2.3 GHz 8-Core Intel Core i9 (16 threads): +// +// BenchmarkHeapTracker 4646006 355 ns/op +// BenchmarkHeapTracker-2 4602415 235 ns/op +// BenchmarkHeapTracker-4 5170777 229 ns/op +// BenchmarkHeapTracker-8 4938172 244 ns/op +// BenchmarkHeapTracker-16 4223288 268 ns/op +func BenchmarkHeapTracker(b *testing.B) { + ctx := context.Background() + benchmarkTracker(ctx, b, newHeapTracker()) +} + +// benchmarkTracker benchmarks a Tracker. +func benchmarkTracker(ctx context.Context, b *testing.B, t Tracker) { + // This matches what RunParallel does. + numGoRoutines := runtime.GOMAXPROCS(0) + toks := make([][]RemovalToken, numGoRoutines) + for i := range toks { + toks[i] = make([]RemovalToken, 0, 1000000) + } + // Regardless of whether the Tracker being benchmarked + // is fully thread-safe or not, we need locking to synchronize + // access to toks above. Insertions can be done under a read lock, + // consumption is done under a write lock. + var mu syncutil.RWMutex + + // Run a consumer goroutine that periodically consumes everything. + stop := make(chan struct{}) + go func() { + for { + select { + case <-stop: + return + case <-time.After(100 * time.Microsecond): + } + + // Consume all the requests. + mu.Lock() + var n int + for i := range toks { + n += len(toks[i]) + for _, tok := range toks[i] { + t.Untrack(ctx, tok) + // Throw in a call to LowerBound per request. This matches the propBuf + // use. + t.LowerBound(ctx) + } + toks[i] = toks[i][:0] + } + mu.Unlock() + log.VInfof(ctx, 1, "cleared %d reqs", n) + } + }() + + var goroutineID int32 // atomic + b.RunParallel(func(b *testing.PB) { + myid := atomic.AddInt32(&goroutineID, 1) + myid-- // go to 0-based index + i := hlc.Timestamp{} + for b.Next() { + i.WallTime++ + mu.RLock() + tok := t.Track(ctx, i) + toks[myid] = append(toks[myid], tok) + mu.RUnlock() + } + }) + close(stop) +}