Merge #69121
69121: closedts: delete old closedts mechanism  r=andreimatei a=andreimatei

Deletion of old code followed by cleanups. See individual commits.
The last commit provides a significant improvement by permitting follower reads under an invalid lease.

Release justification: Too good to wait another release. Also, the last commit is needed for bounded-staleness read availability during partial failures.
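
As a rough illustration of what "follower reads under an invalid lease" means here: a follower can decide read eligibility from its closed timestamp alone, without consulting lease validity. The following is a minimal, self-contained sketch with made-up types and names (`Timestamp`, `canServeFollowerRead`); it is not the repository's actual API.

```go
// Sketch only: follower-read eligibility decided purely from the closed
// timestamp, independent of whether the range currently has a valid lease.
package main

import "fmt"

// Timestamp is a simplified stand-in for an HLC timestamp.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

// LessEq reports whether t <= u.
func (t Timestamp) LessEq(u Timestamp) bool {
	return t.WallTime < u.WallTime ||
		(t.WallTime == u.WallTime && t.Logical <= u.Logical)
}

// canServeFollowerRead returns true if a follower replica can serve a read
// at readTS. Lease validity is deliberately not an input: only the replica's
// closed timestamp matters.
func canServeFollowerRead(readTS, closedTS Timestamp) bool {
	return readTS.LessEq(closedTS)
}

func main() {
	closed := Timestamp{WallTime: 100}
	fmt.Println(canServeFollowerRead(Timestamp{WallTime: 90}, closed))  // true
	fmt.Println(canServeFollowerRead(Timestamp{WallTime: 110}, closed)) // false
}
```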

Co-authored-by: Andrei Matei <[email protected]>
craig[bot] and andreimatei committed Aug 31, 2021
2 parents 462ac82 + dcf2273 commit c1ef81f
Showing 95 changed files with 926 additions and 7,891 deletions.
3 changes: 2 additions & 1 deletion docs/generated/redact_safe.md
@@ -4,7 +4,8 @@ File | Type
--|--
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/kvserver/closedts/ctpb/entry.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/raft.go | `SnapshotRequest_Type`
pkg/roachpb/data.go | `LeaseSequence`
5 changes: 0 additions & 5 deletions pkg/BUILD.bazel
@@ -126,13 +126,8 @@ ALL_TESTS = [
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/batcheval/result:result_test",
"//pkg/kv/kvserver/batcheval:batcheval_test",
"//pkg/kv/kvserver/closedts/container:container_test",
"//pkg/kv/kvserver/closedts/minprop:minprop_test",
"//pkg/kv/kvserver/closedts/provider:provider_test",
"//pkg/kv/kvserver/closedts/sidetransport:sidetransport_test",
"//pkg/kv/kvserver/closedts/storage:storage_test",
"//pkg/kv/kvserver/closedts/tracker:tracker_test",
"//pkg/kv/kvserver/closedts/transport:transport_test",
"//pkg/kv/kvserver/closedts:closedts_test",
"//pkg/kv/kvserver/concurrency:concurrency_test",
"//pkg/kv/kvserver/gc:gc_test",
7 changes: 7 additions & 0 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -593,6 +593,13 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
UseDatabase: "t",
Knobs: base.TestingKnobs{
KVClient: &kvcoord.ClientTestingKnobs{
// Inhibit the checking of connection health done by the
// GRPCTransport. This test wants to control what replica (which
// follower) a request is sent to and, depending on timing, the
// connection from n4 to the respective follower might not be
// heartbeated by the time the test wants to use it. Without this
// knob, that would cause the transport to reorder replicas.
DontConsiderConnHealth: true,
LatencyFunc: func(addr string) (time.Duration, bool) {
if (addr == n2Addr.Get()) || (addr == n3Addr.Get()) {
return time.Millisecond, true
7 changes: 0 additions & 7 deletions pkg/clusterversion/cockroach_versions.go
@@ -222,9 +222,6 @@ const (
// are propagated across RPC boundaries independently of their verbosity setting.
// This requires a version gate, as it violates implicit assumptions in v20.2.
TracingVerbosityIndependentSemantics
// ClosedTimestampsRaftTransport enables the Raft transport for closed
// timestamps and disables the previous per-node transport.
ClosedTimestampsRaftTransport
// PriorReadSummaries introduces support for the use of read summary objects
// to ship information about reads on a range through lease changes and
// range merges.
@@ -410,10 +407,6 @@ var versionsSingleton = keyedVersions{
Key: TracingVerbosityIndependentSemantics,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 28},
},
{
Key: ClosedTimestampsRaftTransport,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 36},
},
{
Key: PriorReadSummaries,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 44},
73 changes: 36 additions & 37 deletions pkg/clusterversion/key_string.go

(Generated file; diff not rendered.)

11 changes: 9 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -290,6 +290,11 @@ type DistSender struct {
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
dontReorderReplicas bool
// dontConsiderConnHealth, if set, makes the GRPCTransport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
dontConsiderConnHealth bool

// Currently executing range feeds.
activeRangeFeeds sync.Map // map[*rangeFeedRegistry]nil
@@ -383,6 +388,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.transportFactory = GRPCTransportFactory
}
ds.dontReorderReplicas = cfg.TestingKnobs.DontReorderReplicas
ds.dontConsiderConnHealth = cfg.TestingKnobs.DontConsiderConnHealth
ds.rpcRetryOptions = base.DefaultRetryOptions()
if cfg.RPCRetryOptions != nil {
ds.rpcRetryOptions = *cfg.RPCRetryOptions
@@ -1855,8 +1861,9 @@ func (ds *DistSender) sendToReplicas(
}

opts := SendOptions{
class: rpc.ConnectionClassForKey(desc.RSpan().Key),
metrics: &ds.metrics,
class: rpc.ConnectionClassForKey(desc.RSpan().Key),
metrics: &ds.metrics,
dontConsiderConnHealth: ds.dontConsiderConnHealth,
}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
if err != nil {
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
@@ -19,6 +19,12 @@ type ClientTestingKnobs struct {
// testing purposes.
TransportFactory TransportFactory

// DontConsiderConnHealth, if set, makes the GRPCTransport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
DontConsiderConnHealth bool

// The maximum number of times a txn will attempt to refresh its
// spans for a single transactional batch.
// 0 means use a default. -1 means disable refresh.
14 changes: 11 additions & 3 deletions pkg/kv/kvclient/kvcoord/transport.go
@@ -34,6 +34,11 @@ import (
type SendOptions struct {
class rpc.ConnectionClass
metrics *DistSenderMetrics
// dontConsiderConnHealth, if set, makes the transport not take into
// consideration the connection health when deciding the ordering for
// replicas. When not set, replicas on nodes with unhealthy connections are
// deprioritized.
dontConsiderConnHealth bool
}

// TransportFactory encapsulates all interaction with the RPC
@@ -109,6 +114,7 @@ func grpcTransportFactoryImpl(
} else {
replicas = replicas[:len(rs)]
}

// We'll map the index of the replica descriptor in its slice to its health.
var health util.FastIntMap
for i, r := range rs {
@@ -121,9 +127,11 @@
}
}

// Put known-healthy clients first, while otherwise respecting the existing
// ordering of the replicas.
splitHealthy(replicas, health)
if !opts.dontConsiderConnHealth {
// Put known-healthy clients first, while otherwise respecting the existing
// ordering of the replicas.
splitHealthy(replicas, health)
}

*transport = grpcTransport{
opts: opts,
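
For context on the reordering that the new `dontConsiderConnHealth` option skips: `splitHealthy` performs a stable partition that moves replicas with healthy connections ahead of the rest while preserving the relative order within each group. Below is a minimal sketch of that idea using simplified types rather than the real signature (which takes the replica slice plus a `util.FastIntMap` of health values).

```go
// Sketch only: a stable partition that puts replicas on healthy connections
// first, otherwise keeping the existing ordering.
package main

import "fmt"

type replica struct {
	nodeID  int
	healthy bool
}

// splitHealthy returns the replicas with all healthy entries first, each
// group keeping its original relative order.
func splitHealthy(replicas []replica) []replica {
	out := make([]replica, 0, len(replicas))
	for _, r := range replicas {
		if r.healthy {
			out = append(out, r)
		}
	}
	for _, r := range replicas {
		if !r.healthy {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	rs := []replica{{1, false}, {2, true}, {3, false}, {4, true}}
	fmt.Println(splitHealthy(rs)) // [{2 true} {4 true} {1 false} {3 false}]
}
```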
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/BUILD.bazel
@@ -114,10 +114,8 @@ go_library(
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/container",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/sidetransport",
"//pkg/kv/kvserver/closedts/storage",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/constraint",
@@ -207,7 +205,6 @@ go_test(
"batch_spanset_test.go",
"below_raft_protos_test.go",
"client_atomic_membership_change_test.go",
"client_closed_timestamp_test.go",
"client_lease_test.go",
"client_merge_test.go",
"client_metrics_test.go",
@@ -253,6 +250,7 @@ go_test(
"replica_command_test.go",
"replica_consistency_test.go",
"replica_evaluate_test.go",
"replica_follower_read_test.go",
"replica_gc_queue_test.go",
"replica_init_test.go",
"replica_learner_test.go",
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -57,8 +57,6 @@ go_library(
"//pkg/keys",
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/gc",
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
@@ -54,7 +54,7 @@ func QueryResolvedTimestamp(
// because QueryResolvedTimestamp requests are often run without acquiring
// latches (see roachpb.INCONSISTENT) and often also on follower replicas,
// so latches won't help them to synchronize with writes.
closedTS := cArgs.EvalCtx.GetClosedTimestampV2(ctx)
closedTS := cArgs.EvalCtx.GetClosedTimestamp(ctx)

// Compute the minimum timestamp of any intent in the request's key span,
// which may span the entire range, but does not need to.
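
For orientation, `QueryResolvedTimestamp` starts from the range's closed timestamp and backs the result off to just below the oldest intent in the requested span, since an intent may still commit at its own timestamp. A minimal sketch with plain int64 timestamps (not the real `hlc.Timestamp`-based implementation) follows.

```go
// Sketch only: resolved timestamp = closed timestamp, backed off below the
// oldest intent in the span.
package main

import "fmt"

func resolvedTimestamp(closedTS int64, intentTimestamps []int64) int64 {
	resolved := closedTS
	for _, intentTS := range intentTimestamps {
		if intentTS-1 < resolved {
			resolved = intentTS - 1 // back off to just below the intent
		}
	}
	return resolved
}

func main() {
	fmt.Println(resolvedTimestamp(100, []int64{120, 95})) // 94
}
```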
54 changes: 2 additions & 52 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -125,50 +124,10 @@ func Subsume(
return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
}

// We prevent followers of the RHS from being able to serve follower reads on
// timestamps that fall in the timestamp window representing the range's
// subsumed state (i.e. between the subsumption time (FreezeStart) and the
// timestamp at which the merge transaction commits or aborts), by requiring
// follower replicas to catch up to an MLAI that succeeds the range's current
// LeaseAppliedIndex (note that we're tracking lai + 1 below instead of lai).
// In case the merge successfully commits, this MLAI will never be caught up
// to since the RHS will be destroyed. In case the merge aborts, this ensures
// that the followers can only activate the newer closed timestamps once they
// catch up to the LAI associated with the merge abort. We need to do this
// because the closed timestamps that are broadcast by RHS in this subsumed
// state are not going to be reflected in the timestamp cache of the LHS range
// after the merge, which can cause a serializability violation.
//
// Note that we are essentially lying to the closed timestamp tracker here in
// order to achieve the effect of unactionable closed timestamp updates until
// the merge concludes. Tracking lai + 1 here ensures that the follower
// replicas need to catch up to at least that index before they are able to
// activate _any of the closed timestamps from this point onwards_. In other
// words, we will never publish a closed timestamp update for this range below
// this lai, regardless of whether a different proposal untracks a lower lai
// at any point in the future.
//
// NB: The above statement relies on the invariant that the LAI that follows a
// Subsume request will be applied only after the merge aborts. More
// specifically, this means that no intervening request can bump the LAI of
// range while it is subsumed. This invariant is upheld because the only Raft
// proposals allowed after a range has been subsumed are lease requests, which
// do not bump the LAI. In case there is lease transfer on this range while it
// is subsumed, we ensure that the initial MLAI update broadcast by the new
// leaseholder respects the invariant in question, in much the same way we do
// here. Take a look at `EmitMLAI()` in replica_closedts.go for more details.
//
// TODO(nvanbenschoten): remove this in v21.2 when the rest of the v1 closed
// timestamp system disappears.
_, untrack := cArgs.EvalCtx.GetTracker().Track(ctx)
lease, _ := cArgs.EvalCtx.GetLease()
lai := cArgs.EvalCtx.GetLeaseAppliedIndex()
untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1))

// Now that the range is frozen, collect some information to ship to the LHS
// leaseholder through the merge trigger.
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.LeaseAppliedIndex = cArgs.EvalCtx.GetLeaseAppliedIndex()
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()

// Collect a read summary from the RHS leaseholder to ship to the LHS
@@ -188,16 +147,7 @@
// think about.
priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp()))
reply.ReadSummary = &priorReadSum
// NOTE FOR v21.1: GetClosedTimestampV2 might return an empty timestamp if
// the Raft-based closed timestamp transport hasn't been enabled yet. That's
// OK because, if the new transport is not enabled, then ranges with leading
// closed timestamps can't exist yet, and so the closed timestamp must be
// below the FreezeStart. The FreezeStart is used by Store.MergeRange to
// bump the RHS' ts cache if LHS/RHS leases are not collocated. The case
// when the leases are collocated also works out because then the closed
// timestamp (according to the old mechanism) is the same for both ranges
// being merged.
reply.ClosedTimestamp = cArgs.EvalCtx.GetClosedTimestampV2(ctx)
reply.ClosedTimestamp = cArgs.EvalCtx.GetClosedTimestamp(ctx)

return result.Result{}, nil
}
11 changes: 3 additions & 8 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
@@ -63,7 +62,6 @@ type EvalContext interface {
GetFirstIndex() (uint64, error)
GetTerm(uint64) (uint64, error)
GetLeaseAppliedIndex() uint64
GetTracker() closedts.TrackerI

Desc() *roachpb.RangeDescriptor
ContainsKey(key roachpb.Key) bool
@@ -109,11 +107,11 @@ type EvalContext interface {
// requests on the range.
GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary

// GetClosedTimestampV2 returns the current closed timestamp on the range.
// GetClosedTimestamp returns the current closed timestamp on the range.
// It is expected that a caller will have performed some action (either
// calling RevokeLease or WatchForMerge) to freeze further progression of
// the closed timestamp before calling this method.
GetClosedTimestampV2(ctx context.Context) hlc.Timestamp
GetClosedTimestamp(ctx context.Context) hlc.Timestamp

GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
@@ -208,9 +206,6 @@ func (m *mockEvalCtxImpl) GetTerm(uint64) (uint64, error) {
func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
}
@@ -246,7 +241,7 @@ func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
func (m *mockEvalCtxImpl) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
return m.CurrentReadSummary
}
func (m *mockEvalCtxImpl) GetClosedTimestampV2(ctx context.Context) hlc.Timestamp {
func (m *mockEvalCtxImpl) GetClosedTimestamp(ctx context.Context) hlc.Timestamp {
return m.ClosedTimestamp
}
func (m *mockEvalCtxImpl) GetExternalStorage(
(Remaining changed files not rendered.)
