Skip to content

Commit

Permalink
Merge #59225 #59502
Browse files Browse the repository at this point in the history
59225: kvserver/tstracker: implement a better closedts tracker r=andreimatei a=andreimatei

This patch implements a replacement for the existing minprop.Tracker.
The new tracker is not yet hooked up.

Compared to the old tracker, this new one is better in multiple ways:
- it's supposed to be used at the level of a single range, not at the
  level of a node
- it supports lock-free concurrent inserts
- it's scope is smaller, making it cleaner and more optimal at the job
  of tracking requests. The old tracker combined tracking evaluating
  requests with various closed timestamps considerations: it was in
  charge of maintaining the closed timestamp, closing new timestamps,
  bumping requests to the closed timestamp, and the tracking that it did
  was relative to the closed timestamp. This new guy only deals with
  tracking (it doesn't know anything about closed timestamps), and so it
  will represent the tracked requests more accurately.

The implementation is intended to work with the proposal buffer's
locking model; locking is external to the tracker, but the caller is
expected to hold a "read" lock while inserting requests for tracking,
and a write lock while removing tracked requests. This matches how the
proposal buffer handles its own insertions and flushing.

The tracking is done using a two-rolling-buckets scheme inspired from
the existing one, but rationalized more.

Release note: None

59502: kvserver: fix a reproposal check r=andreimatei a=andreimatei

In some situations, we repropose a command with a new LAI. When doing
so, we need to make sure that the closed timestamp has not advanced past
the command's write timestamp since the original proposal. Except we
were checking the command's read timestamp, not write timestamp.
Also changes a Less to a LessEq when comparing with the closed
timestamp, which I think is how that comparison should be.

Release note (bug fix): Fixed a very rare chance of inconsistent
follower reads.

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Feb 9, 2021
3 parents ef1ea06 + 4205ece + 645882d commit a8532bc
Show file tree
Hide file tree
Showing 8 changed files with 1,111 additions and 3 deletions.
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "tracker",
srcs = [
"heap_tracker.go",
"lockfree_tracker.go",
"tracker.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
)

go_test(
name = "tracker_test",
srcs = ["tracker_test.go"],
embed = [":tracker"],
deps = [
"//pkg/testutils/skip",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
126 changes: 126 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/heap_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tracker

import (
"container/heap"
"context"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// heapTracker is a reference implementation of Tracker. Its LowerBound()
// response is precise: the lowest timestamps of the tracked set. The production
// implementation is the more performant lockfreeTracker.
//
// heapTracker maintains the currently tracked set of timestamps in a heap. Each
// element maintains its heap index, so random deletes are supported. All
// methods do internal locking, so all methods can be called concurrently.
type heapTracker struct {
mu struct {
syncutil.Mutex
rs tsHeap
}
}

var _ Tracker = &heapTracker{}

func newHeapTracker() Tracker {
return &heapTracker{}
}

type item struct {
ts hlc.Timestamp
// This item's index in the heap.
index int
}

type tsHeap []*item

var _ heap.Interface = &tsHeap{}

// Less is part of heap.Interface.
func (h tsHeap) Less(i, j int) bool {
return h[i].ts.Less(h[j].ts)
}

// Swap is part of heap.Interface.
func (h tsHeap) Swap(i, j int) {
tmp := h[i]
h[i] = h[j]
h[j] = tmp
h[i].index = i
h[j].index = j
}

// Push is part of heap.Interface.
func (h *tsHeap) Push(x interface{}) {
n := len(*h)
item := x.(*item)
item.index = n
*h = append(*h, item)
}

// Pop is part of heap.Interface.
func (h *tsHeap) Pop() interface{} {
it := (*h)[len(*h)-1]
// Poison the removed element, for safety.
it.index = -1
*h = (*h)[0 : len(*h)-1]
return it
}

// Len is part of heap.Interface.
func (h *tsHeap) Len() int {
return len(*h)
}

// heapToken implements RemovalToken.
type heapToken struct {
*item
}

// RemovalTokenMarker implements RemovalToken.
func (heapToken) RemovalTokenMarker() {}

var _ RemovalToken = heapToken{}

// Track is part of the Tracker interface.
func (h *heapTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken {
h.mu.Lock()
defer h.mu.Unlock()
i := &item{ts: ts}
heap.Push(&h.mu.rs, i)
return heapToken{i}
}

// Untrack is part of the Tracker interface.
func (h *heapTracker) Untrack(ctx context.Context, tok RemovalToken) {
idx := tok.(heapToken).index
if idx == -1 {
log.Fatalf(ctx, "attempting to untrack already-untracked item")
}
h.mu.Lock()
defer h.mu.Unlock()
heap.Remove(&h.mu.rs, idx)
}

// LowerBound is part of the Tracker interface.
func (h *heapTracker) LowerBound(ctx context.Context) hlc.Timestamp {
h.mu.Lock()
defer h.mu.Unlock()
if h.mu.rs.Len() == 0 {
return hlc.Timestamp{}
}
return h.mu.rs[0].ts
}
Loading

0 comments on commit a8532bc

Please sign in to comment.