From c87aa083319ca5ed2fc15a990b49efc6c79b7c2c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:26:13 +0000 Subject: [PATCH 1/4] kvpb: mark `BarrierRequest` as `isAlone` Otherwise, `BatchRequest.RequiresConsensus()` may return `false` and not submit the barrier through Raft. Similarly, `shouldWaitOnLatchesWithoutAcquiring` will return `false` so it will contend with later writes. Barriers are not used in recent releases, so this does not have any mixed-version concerns. Epic: none Release note: None --- pkg/roachpb/api.go | 2 +- pkg/roachpb/batch.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 6b03306d8d5d..c029deb881d1 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1534,7 +1534,7 @@ func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange } -func (*BarrierRequest) flags() flag { return isWrite | isRange } +func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 1733e877cf01..d96ea6ca62b3 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -216,6 +216,12 @@ func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool { return ba.IsSingleRequest() && ba.Requests[0].GetInner().Method() == m } +// IsSingleBarrierRequest returns true iff the batch contains a single request, +// and that request is a Barrier. +func (ba *BatchRequest) IsSingleBarrierRequest() bool { + return ba.isSingleRequestWithMethod(Barrier) +} + // IsSingleRequestLeaseRequest returns true iff the batch contains a single // request, and that request is a RequestLease. Note that TransferLease requests // return false. @@ -334,7 +340,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool { // a no-op. The Barrier request requires consensus even though its evaluation // is a no-op. func (ba *BatchRequest) RequiresConsensus() bool { - return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe) + return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest() } // IsCompleteTransaction determines whether a batch contains every write in a From 284f6567b5e66b807910b7268118f49d8845c79d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 Jan 2024 16:29:43 +0000 Subject: [PATCH 2/4] batcheval: add `BarrierRequest.WithLeaseAppliedIndex` This can be used to detect whether a replica has applied the barrier command yet. 
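For illustration, a caller might use this to wait for all replicas of a range
to catch up to a barrier, along these lines (a sketch only: error handling is
elided, and the replica-side wait corresponds to functionality added later in
this series):

    // Submit a barrier and record the LAI it was assigned.
    lai, desc, err := db.BarrierWithLAI(ctx, start, end)
    if err != nil {
        return err
    }
    // Wait for each replica in the returned descriptor to reach the
    // barrier's LAI, e.g. by polling its lease applied index.
    for _, repl := range desc.Replicas().Descriptors() {
        waitForLAI(ctx, repl, lai) // hypothetical helper
    }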
Epic: none Release note: None --- pkg/kv/batch.go | 3 +- pkg/kv/db.go | 44 ++- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_barrier.go | 12 +- pkg/kv/kvserver/batcheval/cmd_barrier_test.go | 304 ++++++++++++++++++ pkg/kv/kvserver/batcheval/result/result.go | 27 +- pkg/kv/kvserver/replica_application_result.go | 11 + pkg/roachpb/api.go | 16 +- pkg/roachpb/api.proto | 31 +- 9 files changed, 428 insertions(+), 21 deletions(-) create mode 100644 pkg/kv/kvserver/batcheval/cmd_barrier_test.go diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 449a4a342ce1..065322abb10c 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -905,7 +905,7 @@ func (b *Batch) scanInterleavedIntents(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) barrier(s, e interface{}) { +func (b *Batch) barrier(s, e interface{}, withLAI bool) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -921,6 +921,7 @@ func (b *Batch) barrier(s, e interface{}) { Key: begin, EndKey: end, }, + WithLeaseAppliedIndex: withLAI, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 38f423770a4a..a643ab118512 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -827,23 +827,47 @@ func (db *DB) ScanInterleavedIntents( // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { b := &Batch{} - b.barrier(begin, end) - err := getOneErr(db.Run(ctx, b), b) - if err != nil { + b.barrier(begin, end, false /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { return hlc.Timestamp{}, err } - responses := b.response.Responses - if len(responses) == 0 { - return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier") + if l := len(b.response.Responses); l != 1 { + return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l) } - resp, ok := responses[0].GetInner().(*roachpb.BarrierResponse) - if !ok { - return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier", - responses[0].GetInner()) + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) } return resp.Timestamp, nil } +// BarrierWithLAI is like Barrier, but also returns the lease applied index and +// range descriptor at which the barrier was applied. In this case, the barrier +// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned. +// +// NB: the protocol support for this was added in a patch release, and is not +// guaranteed to be present with nodes prior to 24.1. In this case, the request +// will return an empty result. 
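+//
+// A zero LeaseAppliedIndex should therefore be treated as "unsupported". For
+// illustration (hypothetical caller, error handling elided):
+//
+//	lai, _, err := db.BarrierWithLAI(ctx, start, end)
+//	if err == nil && lai == 0 {
+//		// The remote node predates LAI support; fall back to Barrier.
+//	}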
+func (db *DB) BarrierWithLAI( + ctx context.Context, begin, end interface{}, +) (uint64, roachpb.RangeDescriptor, error) { + b := &Batch{} + b.barrier(begin, end, true /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { + return 0, roachpb.RangeDescriptor{}, err + } + if l := len(b.response.Responses); l != 1 { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l) + } + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) + } + return resp.LeaseAppliedIndex, resp.RangeDesc, nil +} + // sendAndFill is a helper which sends the given batch and fills its results, // returning the appropriate error which is either from the first failing call, // or an "internal" error. diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index e9daf4480e6a..ac930485fd06 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -100,6 +100,7 @@ go_test( size = "medium", srcs = [ "cmd_add_sstable_test.go", + "cmd_barrier_test.go", "cmd_clear_range_test.go", "cmd_delete_range_gchint_test.go", "cmd_delete_range_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index b5396de511b8..9020f75eabeb 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -44,13 +44,19 @@ func declareKeysBarrier( latchSpans.AddNonMVCC(spanset.SpanReadWrite, req.Header().Span()) } -// Barrier evaluation is a no-op, as all the latch waiting happens in -// the latch manager. +// Barrier evaluation is a no-op, but it still goes through Raft because of +// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch +// manager, and the WithLeaseAppliedIndex info is populated during application. func Barrier( _ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response roachpb.Response, ) (result.Result, error) { + args := cArgs.Args.(*roachpb.BarrierRequest) resp := response.(*roachpb.BarrierResponse) resp.Timestamp = cArgs.EvalCtx.Clock().Now() - return result.Result{}, nil + return result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: args.WithLeaseAppliedIndex, + }, + }, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go new file mode 100644 index 000000000000..17c346cea513 --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go @@ -0,0 +1,304 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package batcheval_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestBarrierEval tests basic Barrier evaluation. +func TestBarrierEval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + start := roachpb.Key("a") + end := roachpb.Key("b") + + clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Now()), 0 /* maxOffset */) + ts := clock.Now() + evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext() + + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + resp := roachpb.BarrierResponse{} + res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Args: &roachpb.BarrierRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + }, + }, &resp) + require.NoError(t, err) + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: withLAI, + }, + }, res) + + // Ignore the logical timestamp component, which is incremented per reading. + resp.Timestamp.Logical = 0 + + require.Equal(t, roachpb.BarrierResponse{ + Timestamp: ts, + }, resp) + }) +} + +// TestBarrier is an integration test for Barrier. It tests that it processes +// the request and response properly, within a single range and across multiple +// ranges. +func TestBarrier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set up a test server. + srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + sender := kvDB.NonTransactionalSender() + + // We'll use /a to /z as our keyspace, and split off a range at /x. + prefix := keys.ScratchRangeMin.Clone() + _, _, err = srv.SplitRange(append(prefix, []byte("/x")...)) + require.NoError(t, err) + + // Send Barrier request with/without LeaseAppliedIndex, and within a single + // range or across multiple ranges. + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) { + start := append(prefix, []byte("/a")...) + end := append(prefix, []byte("/b")...) + if crossRange { + end = append(prefix, []byte("/z")...) 
+ } + repl := store.LookupReplica(roachpb.RKey(start)) + + tsBefore := srv.Clock().Now() + laiBefore := repl.GetLeaseAppliedIndex() + req := roachpb.BarrierRequest{ + RequestHeader: roachpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + } + respI, pErr := kv.SendWrapped(ctx, sender, &req) + + // WithLeaseAppliedIndex should return RangeKeyMismatchError when across + // multiple ranges. + if withLAI && crossRange { + require.Error(t, pErr.GoError()) + require.IsType(t, &roachpb.RangeKeyMismatchError{}, pErr.GoError()) + return + } + + require.NoError(t, pErr.GoError()) + resp, ok := respI.(*roachpb.BarrierResponse) + require.True(t, ok) + + // The timestamp must be after the request was sent. + require.True(t, tsBefore.LessEq(resp.Timestamp)) + + // If WithLeaseAppliedIndex is set, it also returns the LAI and range + // descriptor. + if withLAI { + require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore) + require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex) + require.Equal(t, *repl.Desc(), resp.RangeDesc) + } else { + require.Zero(t, resp.LeaseAppliedIndex) + require.Zero(t, resp.RangeDesc) + } + }) + }) +} + +// TestBarrierLatches tests Barrier latch interactions. Specifically, that it +// waits for in-flight requests to complete, but that it does not block later +// requests. +func TestBarrierLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Too slow, times out. + skip.UnderRace(t) + skip.UnderDeadlock(t) + + // Use a timeout, to prevent blocking indefinitely if something goes wrong. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // We'll do reads and writes to two separate keys, with a range split in + // between. + readKey := roachpb.Key("/read") + splitKey := roachpb.Key("/split") + writeKey := roachpb.Key("/write") + + // Set up a request evaluation filter which will block Gets to /read and Puts + // to /write. These will signal that they're blocked via blockedC, and unblock + // when unblockC is closed. + // + // Unfortunately, we can't use a magic context to specify which requests to + // block, since this does not work with external process tenants which may be + // randomly enabled. We therefore have to match the actual keys. + blockedC := make(chan struct{}, 10) + unblockC := make(chan struct{}) + + evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { + var shouldBlock bool + key := args.Req.Header().Key + if args.Req.Method() == roachpb.Get && key.Equal(readKey) { + shouldBlock = true + } + if args.Req.Method() == roachpb.Put && key.Equal(writeKey) { + shouldBlock = true + } + if shouldBlock { + // Notify callers that we're blocking. + select { + case blockedC <- struct{}{}: + t.Logf("blocked %s", args.Req) + case <-ctx.Done(): + return roachpb.NewError(ctx.Err()) + } + // Wait to unblock. + select { + case <-unblockC: + t.Logf("unblocked %s", args.Req) + case <-ctx.Done(): + return roachpb.NewError(ctx.Err()) + } + } + return nil + } + + // Set up a test server. + srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + + db := srv.DB() + + // Set up helpers to run barriers, both sync and async. 
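+	// barrier runs a barrier synchronously and returns its error, while
+	// barrierAsync runs the same call in a goroutine and delivers the eventual
+	// error on a channel, letting the test assert that a barrier is blocked.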
+ barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) { + if withLAI { + _, _, err = db.BarrierWithLAI(ctx, start, end) + } else { + _, err = db.Barrier(ctx, start, end) + } + return + } + + barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error { + errC := make(chan error, 1) + go func() { + errC <- barrier(ctx, start, end, withLAI) + }() + return errC + } + + // Split off a range at /split, to test cross-range barriers. + _, _, err := srv.SplitRange(splitKey) + require.NoError(t, err) + + // Spawn read and write requests, and wait for them to block. + go func() { + _ = db.Put(ctx, writeKey, "value") + }() + go func() { + _, _ = db.Get(ctx, readKey) + }() + + for i := 0; i < 2; i++ { + select { + case <-blockedC: + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + } + + // Barriers should not conflict outside of these keys. + require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */)) + require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */)) + require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges + require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */)) + + // Barriers should not conflict with read requests. + // + // NB: they do in fact conflict in 22.2, but not in 23.1 and later, likely + // because read requests drop latches before evaluation in later versions. + // That's likely fine. + //require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */)) + + // Barriers should conflict with write requests. We send off two barriers: one + // WithLAI in a single range, and another across ranges. Neither of these + // should return in a second. + withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */) + withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */) + select { + case err := <-withLAIC: + t.Fatalf("WithLAI=true barrier returned prematurely: %v", err) + case err := <-withoutLAIC: + t.Fatalf("WithLAI=false barrier returned prematurely: %v", err) + case <-time.After(time.Second): + } + + // While the barriers are blocked, later overlapping requests should be able + // to proceed and evaluate below them. + require.NoError(t, db.Put(ctx, splitKey, "value")) + _, err = db.Get(ctx, splitKey) + require.NoError(t, err) + + // Unblock the requests. This should now unblock the barriers as well. + close(unblockC) + + select { + case err := <-withLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=true barrier did not return") + } + + select { + case err := <-withoutLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=false barrier did not return") + } +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 77f9ae4c79cc..b77ce276debd 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -52,6 +52,9 @@ type LocalResult struct { // commit fails, or we may accidentally make uncommitted values // live. EndTxns []EndTxnIntents + // PopulateBarrierResponse will populate a BarrierResponse with the lease + // applied index and range descriptor when applied. + PopulateBarrierResponse bool // When set (in which case we better be the first range), call // GossipFirstRange if the Replica holds the lease. 
@@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool { lResult.ResolvedLocks == nil && lResult.UpdatedTxns == nil && lResult.EndTxns == nil && + !lResult.PopulateBarrierResponse && !lResult.GossipFirstRange && !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && @@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string { return fmt.Sprintf("LocalResult (reply: %v, "+ "#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+ "#updated txns: %d #end txns: %d, "+ - "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ + "PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), - lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, + lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, lResult.MaybeGossipNodeLiveness) } @@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents { return r } +// DetachPopulateBarrierResponse returns (and removes) the +// PopulateBarrierResponse value from the local result. +func (lResult *LocalResult) DetachPopulateBarrierResponse() bool { + if lResult == nil { + return false + } + r := lResult.PopulateBarrierResponse + lResult.PopulateBarrierResponse = false + return r +} + // Result is the result of evaluating a KV request. That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -381,6 +396,14 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.EndTxns = nil + if !p.Local.PopulateBarrierResponse { + p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse + } else { + // PopulateBarrierResponse is only valid for a single Barrier response. + return errors.AssertionFailedf("multiple PopulateBarrierResponse results") + } + q.Local.PopulateBarrierResponse = false + if p.Local.MaybeGossipNodeLiveness == nil { p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness } else if q.Local.MaybeGossipNodeLiveness != nil { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index e80191f468cc..0dc86eb7f43e 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -171,6 +171,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { } cmd.response.EncounteredIntents = cmd.proposal.Local.DetachEncounteredIntents() cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + + // Populate BarrierResponse if requested. 
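+	// Barrier is marked isAlone, so when PopulateBarrierResponse is set the
+	// batch contains exactly one request and its response is at index 0.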
+ if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() { + if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil { + resp.LeaseAppliedIndex = cmd.leaseIndex + resp.RangeDesc = *r.Desc() + } else { + log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner()) + } + } + if pErr == nil { cmd.localResult = cmd.proposal.Local } else if cmd.localResult != nil { diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index c029deb881d1..52a7ebff93be 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -519,6 +519,12 @@ func (r *BarrierResponse) combine(c combinable) error { return err } r.Timestamp.Forward(otherR.Timestamp) + if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex") + } + if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc") + } } return nil } @@ -1534,8 +1540,14 @@ func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange } -func (*BarrierRequest) flags() flag { return isWrite | isRange | isAlone } -func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } +func (r *BarrierRequest) flags() flag { + flags := isWrite | isRange | isAlone + if r.WithLeaseAppliedIndex { + flags |= isUnsplittable // the LAI is only valid for a single range + } + return flags +} +func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform // a parallel commit. See txn_interceptor_committer.go for a discussion about diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index b8262a10c681..7a78eb3a36df 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2119,11 +2119,28 @@ message ScanInterleavedIntentsResponse { repeated Intent intents = 3 [(gogoproto.nullable) = false]; } -// BarrierRequest is the request for a Barrier operation. This goes through Raft -// and has the purpose of waiting until all conflicting in-flight operations on -// this range have completed, without blocking any new operations. +// BarrierRequest is the request for a Barrier operation. This guarantees that +// all past and ongoing operations to a key span have completed and applied on +// the leaseholder. It does this by waiting for all conflicting latches and then +// submitting a noop write through Raft, waiting for it to apply. Later +// operations are not affected -- in particular, it does not actually take out a +// latch, so writers don't have to wait for it to complete and can write below +// the barrier. message BarrierRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier + // command in the response, allowing the caller to wait for the barrier to + // apply on an arbitrary replica. It also returns the range descriptor, so the + // caller can detect any unexpected range changes. + // + // When enabled, the barrier request can no longer span multiple ranges, and + // will instead return RangeKeyMismatchError. The caller must be prepared to + // handle this. + // + // NB: This field was added in a patch release. 
Nodes prior to 24.1 are not
+  // guaranteed to support it, returning a zero LeaseAppliedIndex instead.
+  bool with_lease_applied_index = 2;
 }
 
 // BarrierResponse is the response for a Barrier operation.
@@ -2133,6 +2150,14 @@ message BarrierResponse {
   // Timestamp at which this Barrier was evaluated. Can be used to guarantee
   // future operations happen on the same or newer leaseholders.
   util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];
+
+  // LeaseAppliedIndex at which this Barrier was applied. Only returned when
+  // requested via WithLeaseAppliedIndex.
+  uint64 lease_applied_index = 3;
+
+  // RangeDesc at the time the barrier was applied. Only returned when requested
+  // via WithLeaseAppliedIndex.
+  RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
 }
 
 // A RequestUnion contains exactly one of the requests.

From 538c3239b77926d499fd3a9cc58bb378759ceb69 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 19 Jan 2024 10:28:30 +0000
Subject: [PATCH 3/4] kvnemesis: add support for `Barrier` operations

This only executes random `Barrier` requests, but does not verify that the
barrier guarantees are actually satisfied (i.e. that all past and concurrent
writes are applied before it returns). At least we get some execution coverage,
and verify that it does not have negative interactions with other operations.

Epic: none

Release note: None
---
 pkg/kv/kvnemesis/applier.go         | 21 ++++++++++
 pkg/kv/kvnemesis/generator.go       | 61 +++++++++++++++++++++++++++++
 pkg/kv/kvnemesis/generator_test.go  |  5 ++-
 pkg/kv/kvnemesis/operations.go      | 14 +++++++
 pkg/kv/kvnemesis/operations.proto   |  8 ++++
 pkg/kv/kvnemesis/operations_test.go |  8 ++++
 pkg/kv/kvnemesis/validator.go       | 13 ++++++
 7 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go
index 952f2998c057..4d5cee7e44f5 100644
--- a/pkg/kv/kvnemesis/applier.go
+++ b/pkg/kv/kvnemesis/applier.go
@@ -102,6 +102,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 	case *ChangeZoneOperation:
 		err := updateZoneConfigInEnv(ctx, env, o.Type)
 		o.Result = resultError(ctx, err)
+	case *BarrierOperation:
+		var err error
+		if o.WithLeaseAppliedIndex {
+			_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
+		} else {
+			_, err = db.Barrier(ctx, o.Key, o.EndKey)
+		}
+		o.Result = resultError(ctx, err)
 	case *ClosureTxnOperation:
 		// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
 		// epochs of the same transaction to avoid waiting while holding locks.
@@ -232,6 +240,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { o.Result.Keys[i] = deletedKey } } + case *BarrierOperation: + b := &kv.Batch{} + b.AddRawRequest(&roachpb.BarrierRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: o.Key, + EndKey: o.EndKey, + }, + WithLeaseAppliedIndex: o.WithLeaseAppliedIndex, + }) + err := db.Run(ctx, b) + o.Result = resultError(ctx, err) case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o, inTxn) @@ -274,6 +293,8 @@ func applyBatchOp( panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`)) } b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */) + case *BarrierOperation: + panic(errors.AssertionFailedf(`Barrier cannot be used in batches`)) default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO)) } diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 2289a37e1e72..409ee22eeab0 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -12,6 +12,7 @@ package kvnemesis import ( "math/rand" + "sort" "strconv" "github.com/cockroachdb/cockroach/pkg/keys" @@ -104,6 +105,8 @@ type ClientOperationConfig struct { DeleteExisting int // DeleteRange is an operation that Deletes a key range that may contain values. DeleteRange int + // Barrier is an operation that waits for in-flight writes to complete. + Barrier int } // BatchOperationConfig configures the relative probability of generating a @@ -183,6 +186,7 @@ func newAllOperationsConfig() GeneratorConfig { DeleteMissing: 1, DeleteExisting: 1, DeleteRange: 1, + Barrier: 1, } batchOpConfig := BatchOperationConfig{ Batch: 4, @@ -247,6 +251,12 @@ func NewDefaultConfig() GeneratorConfig { // TODO(dan): Remove when #45586 is addressed. config.Ops.ClosureTxn.CommitBatchOps.GetExisting = 0 config.Ops.ClosureTxn.CommitBatchOps.GetMissing = 0 + // Barrier cannot be used in batches, and we omit it in txns too because it + // can result in spurious RangeKeyMismatchErrors that fail the txn operation. + config.Ops.Batch.Ops.Barrier = 0 + config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0 + config.Ops.ClosureTxn.TxnClientOps.Barrier = 0 + config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0 return config } @@ -433,6 +443,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig addOpGen(allowed, randReverseScan, c.ReverseScan) addOpGen(allowed, randReverseScanForUpdate, c.ReverseScanForUpdate) addOpGen(allowed, randDelRange, c.DeleteRange) + addOpGen(allowed, randBarrier, c.Barrier) } func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) { @@ -474,6 +485,21 @@ func randPutExisting(g *generator, rng *rand.Rand) Operation { return put(key, value) } +func randBarrier(g *generator, rng *rand.Rand) Operation { + withLAI := rng.Float64() < 0.5 + var key, endKey string + if withLAI { + // Barriers requesting LAIs can't span multiple ranges, so we try to fit + // them within an existing range. This may race with a concurrent split, in + // which case the Barrier will fail, but that's ok -- most should still + // succeed. These errors are ignored by the validator. 
+ key, endKey = randRangeSpan(rng, g.currentSplits) + } else { + key, endKey = randSpan(rng) + } + return barrier(key, endKey, withLAI) +} + func randScan(g *generator, rng *rand.Rand) Operation { key, endKey := randSpan(rng) return scan(key, endKey) @@ -656,6 +682,33 @@ func randKey(rng *rand.Rand) string { return string(key) } +// Interprets the provided map as the split points of the key space and returns +// the boundaries of a random range. +func randRangeSpan(rng *rand.Rand, curOrHistSplits map[string]struct{}) (string, string) { + genSpan := GeneratorDataSpan() + keys := make([]string, 0, len(curOrHistSplits)) + for key := range curOrHistSplits { + keys = append(keys, key) + } + sort.Strings(keys) + if len(keys) == 0 { + // No splits. + return string(genSpan.Key), string(genSpan.EndKey) + } + idx := rng.Intn(len(keys) + 1) + if idx == len(keys) { + // Last range. + return keys[idx-1], string(genSpan.EndKey) + } + if idx == 0 { + // First range. We avoid having splits at 0 so this will be a well-formed + // range. (If it isn't, we'll likely catch an error because we'll send an + // ill-formed request and kvserver will error it out). + return string(genSpan.Key), keys[0] + } + return keys[idx-1], keys[idx] +} + func randMapKey(rng *rand.Rand, m map[string]struct{}) string { keys := make([]string, 0, len(m)) for key := range m { @@ -756,3 +809,11 @@ func transferLease(key string, target roachpb.StoreID) Operation { func changeZone(changeType ChangeZoneType) Operation { return Operation{ChangeZone: &ChangeZoneOperation{Type: changeType}} } + +func barrier(key, endKey string, withLAI bool) Operation { + return Operation{Barrier: &BarrierOperation{ + Key: []byte(key), + EndKey: []byte(endKey), + WithLeaseAppliedIndex: withLAI, + }} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index de66d2899f50..a4b2b88bcf25 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -136,6 +136,8 @@ func TestRandStep(t *testing.T) { } case *DeleteRangeOperation: client.DeleteRange++ + case *BarrierOperation: + client.Barrier++ case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) @@ -152,7 +154,8 @@ func TestRandStep(t *testing.T) { *ScanOperation, *BatchOperation, *DeleteOperation, - *DeleteRangeOperation: + *DeleteRangeOperation, + *BarrierOperation: countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) 
diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 6d8ad552614f..6705825c8ae7 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -33,6 +33,8 @@ func (op Operation) Result() *Result { return &o.Result case *DeleteRangeOperation: return &o.Result + case *BarrierOperation: + return &o.Result case *SplitOperation: return &o.Result case *MergeOperation: @@ -112,6 +114,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { o.format(w, fctx) case *DeleteRangeOperation: o.format(w, fctx) + case *BarrierOperation: + o.format(w, fctx) case *SplitOperation: o.format(w, fctx) case *MergeOperation: @@ -259,6 +263,16 @@ func (op DeleteRangeOperation) format(w *strings.Builder, fctx formatCtx) { } } +func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) { + if op.WithLeaseAppliedIndex { + fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, + fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey)) + } else { + fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey)) + } + op.Result.format(w) +} + func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) { fmt.Fprintf(w, `%s.AdminSplit(ctx, %s)`, fctx.receiver, roachpb.Key(op.Key)) op.Result.format(w) diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index 3f5ac2a33e6d..dff52cc31ae3 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -69,6 +69,13 @@ message DeleteRangeOperation { Result result = 3 [(gogoproto.nullable) = false]; } +message BarrierOperation { + bytes key = 1; + bytes end_key = 2; + bool with_lease_applied_index = 3; + Result result = 4 [(gogoproto.nullable) = false]; +} + message SplitOperation { bytes key = 1; Result result = 2 [(gogoproto.nullable) = false]; @@ -122,6 +129,7 @@ message Operation { ChangeReplicasOperation change_replicas = 14; TransferLeaseOperation transfer_lease = 15; ChangeZoneOperation change_zone = 16; + BarrierOperation barrier = 22; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 92fd862e90ea..c803671f2fea 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -60,6 +60,14 @@ func TestOperationsFormat(t *testing.T) { }) `, }, + { + step: step(barrier(`a`, `b`, false /* withLAI */)), + expected: `db0.Barrier(ctx, "a", "b")`, + }, + { + step: step(barrier(`c`, `d`, true /* withLAI */)), + expected: `db0.BarrierWithLAI(ctx, "c", "d")`, + }, } for _, test := range tests { diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 72cf8bab205c..8febe7351e61 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -432,6 +432,19 @@ func (v *validator) processOp(txnID *string, op Operation) { v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan) v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], deleteOps...) } + case *BarrierOperation: + if op.Barrier.WithLeaseAppliedIndex && + resultIsError(t.Result, &roachpb.RangeKeyMismatchError{}) { + // Barriers requesting LAIs can't span ranges. The generator will + // optimistically try to fit the barrier inside one of the current ranges, + // but this may race with a split, so we ignore the error in this case and + // try again later. + } else { + // Fail or retry on other errors, depending on type. 
+ v.failIfError(op, t.Result) + } + // We don't yet actually check the barrier guarantees here, i.e. that all + // concurrent writes are applied by the time it completes. Maybe later. case *ScanOperation: v.failIfError(op, t.Result) if txnID == nil { From b8c9af129d8127bfb4e980bc0d176b8cc1d3a277 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 17 Jan 2024 13:31:38 +0000 Subject: [PATCH 4/4] kvserver: add `Replica.WaitForLeaseAppliedIndex()` This allows a caller to wait for a replica to reach a certain lease applied index. Similar functionality elsewhere is not migrated yet, out of caution. Epic: none Release note: None --- pkg/kv/kvserver/replica_command.go | 24 ++++++++ pkg/kv/kvserver/replica_command_test.go | 82 +++++++++++++++++++++++++ pkg/kv/kvserver/stores_server.go | 2 + 3 files changed, 108 insertions(+) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index acc70a38bc18..f7aa8e5260e6 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -902,6 +902,30 @@ func waitForReplicasInit( }) } +// WaitForLeaseAppliedIndex waits for the replica to reach the given lease +// applied index, or until the context is cancelled or the replica is destroyed. +// Note that the lease applied index may regress across restarts, since we don't +// sync state machine application to disk. +// +// TODO(erikgrinaker): it would be nice if we could be notified about LAI +// updates instead, but polling will do for now. +func (r *Replica) WaitForLeaseAppliedIndex(ctx context.Context, lai uint64) (uint64, error) { + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + Multiplier: 2, + MaxBackoff: time.Second, + } + for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { + if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai { + return currentLAI, nil + } + if _, err := r.IsDestroyed(); err != nil { + return 0, err + } + } + return 0, ctx.Err() +} + // ChangeReplicas atomically changes the replicas that are members of a range. // The change is performed in a distributed transaction and takes effect when // that transaction is committed. This transaction confirms that the supplied diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index 7b6e9e772059..5166eca2de3e 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -13,7 +13,9 @@ package kvserver import ( "context" "encoding/binary" + math "math" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -22,6 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -599,3 +604,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) { }) } } + +func TestWaitForLeaseAppliedIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const maxLAI = math.MaxUint64 + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + tc := testContext{} + tc.Start(ctx, t, stopper) + db := tc.store.DB() + + // Submit a write and read it back to bump the initial LAI. 
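+	// (The read-back is defensive, making sure the write has been applied
+	// before the test samples the LAI.)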
+ write := func(key, value string) { + require.NoError(t, db.Put(ctx, key, value)) + _, err := db.Get(ctx, key) + require.NoError(t, err) + } + write("foo", "bar") + + // Should return immediately when already at or past the LAI. + currentLAI := tc.repl.GetLeaseAppliedIndex() + require.NotZero(t, currentLAI) + resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI) + require.NoError(t, err) + require.GreaterOrEqual(t, resultLAI, currentLAI) + + // Should wait for a future LAI to be reached. + const numWrites = 10 + waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites + laiC := make(chan uint64, 1) + go func() { + lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI) + assert.NoError(t, err) // can't use require in goroutine + laiC <- lai + }() + + select { + case lai := <-laiC: + t.Fatalf("unexpected early LAI %d", lai) + case <-time.After(time.Second): + } + + for i := 0; i < numWrites; i++ { + write("foo", "bar") + } + + select { + case lai := <-laiC: + require.GreaterOrEqual(t, lai, waitLAI) + require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai) + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for LAI %d", waitLAI) + } + + // Should error on context cancellation. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + _, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI) + require.Error(t, err) + require.Equal(t, cancelCtx.Err(), err) + + // Should error on destroyed replicas. + stopper.Stop(ctx) + + destroyErr := errors.New("destroyed") + tc.repl.mu.Lock() + tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved) + tc.repl.mu.Unlock() + + _, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI) + require.Error(t, err) + require.Equal(t, destroyErr, err) +} diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index f3154aca6842..a34089bed9fc 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -77,6 +77,8 @@ func (is Server) CollectChecksum( // // It is the caller's responsibility to cancel or set a timeout on the context. // If the context is never canceled, WaitForApplication will retry forever. +// +// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex(). func (is Server) WaitForApplication( ctx context.Context, req *WaitForApplicationRequest, ) (*WaitForApplicationResponse, error) {