diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go
index 449a4a342ce1..065322abb10c 100644
--- a/pkg/kv/batch.go
+++ b/pkg/kv/batch.go
@@ -905,7 +905,7 @@ func (b *Batch) scanInterleavedIntents(s, e interface{}) {
 	b.initResult(1, 0, notRaw, nil)
 }
 
-func (b *Batch) barrier(s, e interface{}) {
+func (b *Batch) barrier(s, e interface{}, withLAI bool) {
 	begin, err := marshalKey(s)
 	if err != nil {
 		b.initResult(0, 0, notRaw, err)
@@ -921,6 +921,7 @@ func (b *Batch) barrier(s, e interface{}) {
 			Key:    begin,
 			EndKey: end,
 		},
+		WithLeaseAppliedIndex: withLAI,
 	}
 	b.appendReqs(req)
 	b.initResult(1, 0, notRaw, nil)
diff --git a/pkg/kv/db.go b/pkg/kv/db.go
index 38f423770a4a..a643ab118512 100644
--- a/pkg/kv/db.go
+++ b/pkg/kv/db.go
@@ -827,23 +827,47 @@ func (db *DB) ScanInterleavedIntents(
 // writes on the specified key range to finish.
 func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
 	b := &Batch{}
-	b.barrier(begin, end)
-	err := getOneErr(db.Run(ctx, b), b)
-	if err != nil {
+	b.barrier(begin, end, false /* withLAI */)
+	if err := getOneErr(db.Run(ctx, b), b); err != nil {
 		return hlc.Timestamp{}, err
 	}
-	responses := b.response.Responses
-	if len(responses) == 0 {
-		return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
+	if l := len(b.response.Responses); l != 1 {
+		return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
 	}
-	resp, ok := responses[0].GetInner().(*roachpb.BarrierResponse)
-	if !ok {
-		return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
-			responses[0].GetInner())
+	resp := b.response.Responses[0].GetBarrier()
+	if resp == nil {
+		return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
+			b.response.Responses[0].GetInner())
 	}
 	return resp.Timestamp, nil
 }
 
+// BarrierWithLAI is like Barrier, but also returns the lease applied index and
+// range descriptor at which the barrier was applied. In this case, the barrier
+// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
+//
+// NB: the protocol support for this was added in a patch release, and is not
+// guaranteed to be present with nodes prior to 24.1. In this case, the request
+// will return an empty result.
+func (db *DB) BarrierWithLAI(
+	ctx context.Context, begin, end interface{},
+) (uint64, roachpb.RangeDescriptor, error) {
+	b := &Batch{}
+	b.barrier(begin, end, true /* withLAI */)
+	if err := getOneErr(db.Run(ctx, b), b); err != nil {
+		return 0, roachpb.RangeDescriptor{}, err
+	}
+	if l := len(b.response.Responses); l != 1 {
+		return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
+	}
+	resp := b.response.Responses[0].GetBarrier()
+	if resp == nil {
+		return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
+			b.response.Responses[0].GetInner())
+	}
+	return resp.LeaseAppliedIndex, resp.RangeDesc, nil
+}
+
 // sendAndFill is a helper which sends the given batch and fills its results,
 // returning the appropriate error which is either from the first failing call,
 // or an "internal" error.
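For context, this is roughly how a caller is expected to use the two flavors above. The wrapper function, logging, and zero-LAI fallback below are illustrative assumptions rather than part of this change; only db.Barrier and db.BarrierWithLAI come from the diff:

package kvutil // hypothetical illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// barrierAndReport flushes out in-flight writes on [start, end) and reports the
// lease applied index at which the barrier applied. If the leaseholder predates
// WithLeaseAppliedIndex support, the response carries a zero LAI, in which case
// we fall back to a plain Barrier.
func barrierAndReport(ctx context.Context, db *kv.DB, start, end roachpb.Key) error {
	lai, desc, err := db.BarrierWithLAI(ctx, start, end)
	if err != nil {
		return err
	}
	if lai == 0 {
		// Mixed-version cluster: LAI not populated, use the plain barrier instead.
		_, err := db.Barrier(ctx, start, end)
		return err
	}
	log.Infof(ctx, "barrier applied to r%d at lease applied index %d", desc.RangeID, lai)
	return nil
}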
diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go
index 952f2998c057..4d5cee7e44f5 100644
--- a/pkg/kv/kvnemesis/applier.go
+++ b/pkg/kv/kvnemesis/applier.go
@@ -102,6 +102,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 	case *ChangeZoneOperation:
 		err := updateZoneConfigInEnv(ctx, env, o.Type)
 		o.Result = resultError(ctx, err)
+	case *BarrierOperation:
+		var err error
+		if o.WithLeaseAppliedIndex {
+			_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
+		} else {
+			_, err = db.Barrier(ctx, o.Key, o.EndKey)
+		}
+		o.Result = resultError(ctx, err)
 	case *ClosureTxnOperation:
 		// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
 		// epochs of the same transaction to avoid waiting while holding locks.
@@ -232,6 +240,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
 				o.Result.Keys[i] = deletedKey
 			}
 		}
+	case *BarrierOperation:
+		b := &kv.Batch{}
+		b.AddRawRequest(&roachpb.BarrierRequest{
+			RequestHeader: roachpb.RequestHeader{
+				Key:    o.Key,
+				EndKey: o.EndKey,
+			},
+			WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
+		})
+		err := db.Run(ctx, b)
+		o.Result = resultError(ctx, err)
 	case *BatchOperation:
 		b := &kv.Batch{}
 		applyBatchOp(ctx, b, db.Run, o, inTxn)
@@ -274,6 +293,8 @@ func applyBatchOp(
 			panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`))
 		}
 		b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */)
+	case *BarrierOperation:
+		panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
 	default:
 		panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
 	}
diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go
index 2289a37e1e72..409ee22eeab0 100644
--- a/pkg/kv/kvnemesis/generator.go
+++ b/pkg/kv/kvnemesis/generator.go
@@ -12,6 +12,7 @@ package kvnemesis
 
 import (
 	"math/rand"
+	"sort"
 	"strconv"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -104,6 +105,8 @@ type ClientOperationConfig struct {
 	DeleteExisting int
 	// DeleteRange is an operation that Deletes a key range that may contain values.
 	DeleteRange int
+	// Barrier is an operation that waits for in-flight writes to complete.
+	Barrier int
 }
 
 // BatchOperationConfig configures the relative probability of generating a
@@ -183,6 +186,7 @@ func newAllOperationsConfig() GeneratorConfig {
 		DeleteMissing:  1,
 		DeleteExisting: 1,
 		DeleteRange:    1,
+		Barrier:        1,
 	}
 	batchOpConfig := BatchOperationConfig{
 		Batch: 4,
@@ -247,6 +251,12 @@ func NewDefaultConfig() GeneratorConfig {
 	// TODO(dan): Remove when #45586 is addressed.
 	config.Ops.ClosureTxn.CommitBatchOps.GetExisting = 0
 	config.Ops.ClosureTxn.CommitBatchOps.GetMissing = 0
+	// Barrier cannot be used in batches, and we omit it in txns too because it
+	// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
+	config.Ops.Batch.Ops.Barrier = 0
+	config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
+	config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
+	config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
 	return config
 }
 
@@ -433,6 +443,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
 	addOpGen(allowed, randReverseScan, c.ReverseScan)
 	addOpGen(allowed, randReverseScanForUpdate, c.ReverseScanForUpdate)
 	addOpGen(allowed, randDelRange, c.DeleteRange)
+	addOpGen(allowed, randBarrier, c.Barrier)
 }
 
 func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
@@ -474,6 +485,21 @@ func randPutExisting(g *generator, rng *rand.Rand) Operation {
 	return put(key, value)
 }
 
+func randBarrier(g *generator, rng *rand.Rand) Operation {
+	withLAI := rng.Float64() < 0.5
+	var key, endKey string
+	if withLAI {
+		// Barriers requesting LAIs can't span multiple ranges, so we try to fit
+		// them within an existing range. This may race with a concurrent split, in
+		// which case the Barrier will fail, but that's ok -- most should still
+		// succeed. These errors are ignored by the validator.
+		key, endKey = randRangeSpan(rng, g.currentSplits)
+	} else {
+		key, endKey = randSpan(rng)
+	}
+	return barrier(key, endKey, withLAI)
+}
+
 func randScan(g *generator, rng *rand.Rand) Operation {
 	key, endKey := randSpan(rng)
 	return scan(key, endKey)
@@ -656,6 +682,33 @@ func randKey(rng *rand.Rand) string {
 	return string(key)
 }
 
+// Interprets the provided map as the split points of the key space and returns
+// the boundaries of a random range.
+func randRangeSpan(rng *rand.Rand, curOrHistSplits map[string]struct{}) (string, string) {
+	genSpan := GeneratorDataSpan()
+	keys := make([]string, 0, len(curOrHistSplits))
+	for key := range curOrHistSplits {
+		keys = append(keys, key)
+	}
+	sort.Strings(keys)
+	if len(keys) == 0 {
+		// No splits.
+		return string(genSpan.Key), string(genSpan.EndKey)
+	}
+	idx := rng.Intn(len(keys) + 1)
+	if idx == len(keys) {
+		// Last range.
+		return keys[idx-1], string(genSpan.EndKey)
+	}
+	if idx == 0 {
+		// First range. We avoid having splits at 0 so this will be a well-formed
+		// range. (If it isn't, we'll likely catch an error because we'll send an
+		// ill-formed request and kvserver will error it out).
+		return string(genSpan.Key), keys[0]
+	}
+	return keys[idx-1], keys[idx]
+}
+
 func randMapKey(rng *rand.Rand, m map[string]struct{}) string {
 	keys := make([]string, 0, len(m))
 	for key := range m {
@@ -756,3 +809,11 @@ func transferLease(key string, target roachpb.StoreID) Operation {
 func changeZone(changeType ChangeZoneType) Operation {
 	return Operation{ChangeZone: &ChangeZoneOperation{Type: changeType}}
 }
+
+func barrier(key, endKey string, withLAI bool) Operation {
+	return Operation{Barrier: &BarrierOperation{
+		Key:                   []byte(key),
+		EndKey:                []byte(endKey),
+		WithLeaseAppliedIndex: withLAI,
+	}}
+}
diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go
index de66d2899f50..a4b2b88bcf25 100644
--- a/pkg/kv/kvnemesis/generator_test.go
+++ b/pkg/kv/kvnemesis/generator_test.go
@@ -136,6 +136,8 @@ func TestRandStep(t *testing.T) {
 			}
 		case *DeleteRangeOperation:
 			client.DeleteRange++
+		case *BarrierOperation:
+			client.Barrier++
 		case *BatchOperation:
 			batch.Batch++
 			countClientOps(&batch.Ops, nil, o.Ops...)
@@ -152,7 +154,8 @@ func TestRandStep(t *testing.T) {
 			*ScanOperation,
 			*BatchOperation,
 			*DeleteOperation,
-			*DeleteRangeOperation:
+			*DeleteRangeOperation,
+			*BarrierOperation:
 			countClientOps(&counts.DB, &counts.Batch, step.Op)
 		case *ClosureTxnOperation:
 			countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go
index 6d8ad552614f..6705825c8ae7 100644
--- a/pkg/kv/kvnemesis/operations.go
+++ b/pkg/kv/kvnemesis/operations.go
@@ -33,6 +33,8 @@ func (op Operation) Result() *Result {
 		return &o.Result
 	case *DeleteRangeOperation:
 		return &o.Result
+	case *BarrierOperation:
+		return &o.Result
 	case *SplitOperation:
 		return &o.Result
 	case *MergeOperation:
@@ -112,6 +114,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
 		o.format(w, fctx)
 	case *DeleteRangeOperation:
 		o.format(w, fctx)
+	case *BarrierOperation:
+		o.format(w, fctx)
 	case *SplitOperation:
 		o.format(w, fctx)
 	case *MergeOperation:
@@ -259,6 +263,16 @@ func (op DeleteRangeOperation) format(w *strings.Builder, fctx formatCtx) {
 	}
 }
 
+func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
+	if op.WithLeaseAppliedIndex {
+		fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
+			fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey))
+	} else {
+		fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey))
+	}
+	op.Result.format(w)
+}
+
 func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
 	fmt.Fprintf(w, `%s.AdminSplit(ctx, %s)`, fctx.receiver, roachpb.Key(op.Key))
 	op.Result.format(w)
diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto
index 3f5ac2a33e6d..dff52cc31ae3 100644
--- a/pkg/kv/kvnemesis/operations.proto
+++ b/pkg/kv/kvnemesis/operations.proto
@@ -69,6 +69,13 @@ message DeleteRangeOperation {
   Result result = 3 [(gogoproto.nullable) = false];
 }
 
+message BarrierOperation {
+  bytes key = 1;
+  bytes end_key = 2;
+  bool with_lease_applied_index = 3;
+  Result result = 4 [(gogoproto.nullable) = false];
+}
+
 message SplitOperation {
   bytes key = 1;
   Result result = 2 [(gogoproto.nullable) = false];
@@ -122,6 +129,7 @@ message Operation {
   ChangeReplicasOperation change_replicas = 14;
   TransferLeaseOperation transfer_lease = 15;
   ChangeZoneOperation change_zone = 16;
+  BarrierOperation barrier = 22;
 }
 
 enum ResultType {
diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go
index 92fd862e90ea..c803671f2fea 100644
--- a/pkg/kv/kvnemesis/operations_test.go
+++ b/pkg/kv/kvnemesis/operations_test.go
@@ -60,6 +60,14 @@ func TestOperationsFormat(t *testing.T) {
 			})
 			`,
 		},
+		{
+			step:     step(barrier(`a`, `b`, false /* withLAI */)),
+			expected: `db0.Barrier(ctx, "a", "b")`,
+		},
+		{
+			step:     step(barrier(`c`, `d`, true /* withLAI */)),
+			expected: `db0.BarrierWithLAI(ctx, "c", "d")`,
+		},
 	}
 
 	for _, test := range tests {
diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 72cf8bab205c..8febe7351e61 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -432,6 +432,19 @@ func (v *validator) processOp(txnID *string, op Operation) {
 			v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan)
 			v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], deleteOps...)
 		}
+	case *BarrierOperation:
+		if op.Barrier.WithLeaseAppliedIndex &&
+			resultIsError(t.Result, &roachpb.RangeKeyMismatchError{}) {
+			// Barriers requesting LAIs can't span ranges. The generator will
+			// optimistically try to fit the barrier inside one of the current ranges,
+			// but this may race with a split, so we ignore the error in this case and
+			// try again later.
+		} else {
+			// Fail or retry on other errors, depending on type.
+			v.failIfError(op, t.Result)
+		}
+		// We don't yet actually check the barrier guarantees here, i.e. that all
+		// concurrent writes are applied by the time it completes. Maybe later.
 	case *ScanOperation:
 		v.failIfError(op, t.Result)
 		if txnID == nil {
diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel
index e9daf4480e6a..ac930485fd06 100644
--- a/pkg/kv/kvserver/batcheval/BUILD.bazel
+++ b/pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -100,6 +100,7 @@ go_test(
     size = "medium",
     srcs = [
         "cmd_add_sstable_test.go",
+        "cmd_barrier_test.go",
        "cmd_clear_range_test.go",
        "cmd_delete_range_gchint_test.go",
        "cmd_delete_range_test.go",
diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go
index b5396de511b8..9020f75eabeb 100644
--- a/pkg/kv/kvserver/batcheval/cmd_barrier.go
+++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go
@@ -44,13 +44,19 @@ func declareKeysBarrier(
 	latchSpans.AddNonMVCC(spanset.SpanReadWrite, req.Header().Span())
 }
 
-// Barrier evaluation is a no-op, as all the latch waiting happens in
-// the latch manager.
+// Barrier evaluation is a no-op, but it still goes through Raft because of
+// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch
+// manager, and the WithLeaseAppliedIndex info is populated during application.
 func Barrier(
 	_ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response roachpb.Response,
 ) (result.Result, error) {
+	args := cArgs.Args.(*roachpb.BarrierRequest)
 	resp := response.(*roachpb.BarrierResponse)
 	resp.Timestamp = cArgs.EvalCtx.Clock().Now()
 
-	return result.Result{}, nil
+	return result.Result{
+		Local: result.LocalResult{
+			PopulateBarrierResponse: args.WithLeaseAppliedIndex,
+		},
+	}, nil
 }
diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go
new file mode 100644
index 000000000000..17c346cea513
--- /dev/null
+++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go
@@ -0,0 +1,304 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package batcheval_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBarrierEval tests basic Barrier evaluation.
+func TestBarrierEval(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	start := roachpb.Key("a")
+	end := roachpb.Key("b")
+
+	clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Now()), 0 /* maxOffset */)
+	ts := clock.Now()
+	evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext()
+
+	testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) {
+		resp := roachpb.BarrierResponse{}
+		res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{
+			EvalCtx: evalCtx,
+			Args: &roachpb.BarrierRequest{
+				RequestHeader:         roachpb.RequestHeader{Key: start, EndKey: end},
+				WithLeaseAppliedIndex: withLAI,
+			},
+		}, &resp)
+		require.NoError(t, err)
+
+		require.Equal(t, result.Result{
+			Local: result.LocalResult{
+				PopulateBarrierResponse: withLAI,
+			},
+		}, res)
+
+		// Ignore the logical timestamp component, which is incremented per reading.
+		resp.Timestamp.Logical = 0
+
+		require.Equal(t, roachpb.BarrierResponse{
+			Timestamp: ts,
+		}, resp)
+	})
+}
+
+// TestBarrier is an integration test for Barrier. It tests that it processes
+// the request and response properly, within a single range and across multiple
+// ranges.
+func TestBarrier(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	// Set up a test server.
+	srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
+	defer srv.Stopper().Stop(ctx)
+
+	store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
+	require.NoError(t, err)
+	sender := kvDB.NonTransactionalSender()
+
+	// We'll use /a to /z as our keyspace, and split off a range at /x.
+	prefix := keys.ScratchRangeMin.Clone()
+	_, _, err = srv.SplitRange(append(prefix, []byte("/x")...))
+	require.NoError(t, err)
+
+	// Send Barrier request with/without LeaseAppliedIndex, and within a single
+	// range or across multiple ranges.
+	testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) {
+		testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) {
+			start := append(prefix, []byte("/a")...)
+			end := append(prefix, []byte("/b")...)
+			if crossRange {
+				end = append(prefix, []byte("/z")...)
+			}
+			repl := store.LookupReplica(roachpb.RKey(start))
+
+			tsBefore := srv.Clock().Now()
+			laiBefore := repl.GetLeaseAppliedIndex()
+			req := roachpb.BarrierRequest{
+				RequestHeader:         roachpb.RequestHeader{Key: start, EndKey: end},
+				WithLeaseAppliedIndex: withLAI,
+			}
+			respI, pErr := kv.SendWrapped(ctx, sender, &req)
+
+			// WithLeaseAppliedIndex should return RangeKeyMismatchError when across
+			// multiple ranges.
+			if withLAI && crossRange {
+				require.Error(t, pErr.GoError())
+				require.IsType(t, &roachpb.RangeKeyMismatchError{}, pErr.GoError())
+				return
+			}
+
+			require.NoError(t, pErr.GoError())
+			resp, ok := respI.(*roachpb.BarrierResponse)
+			require.True(t, ok)
+
+			// The timestamp must be after the request was sent.
+			require.True(t, tsBefore.LessEq(resp.Timestamp))
+
+			// If WithLeaseAppliedIndex is set, it also returns the LAI and range
+			// descriptor.
+			if withLAI {
+				require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore)
+				require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex)
+				require.Equal(t, *repl.Desc(), resp.RangeDesc)
+			} else {
+				require.Zero(t, resp.LeaseAppliedIndex)
+				require.Zero(t, resp.RangeDesc)
+			}
+		})
+	})
+}
+
+// TestBarrierLatches tests Barrier latch interactions. Specifically, that it
+// waits for in-flight requests to complete, but that it does not block later
+// requests.
+func TestBarrierLatches(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Too slow, times out.
+	skip.UnderRace(t)
+	skip.UnderDeadlock(t)
+
+	// Use a timeout, to prevent blocking indefinitely if something goes wrong.
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	// We'll do reads and writes to two separate keys, with a range split in
+	// between.
+	readKey := roachpb.Key("/read")
+	splitKey := roachpb.Key("/split")
+	writeKey := roachpb.Key("/write")
+
+	// Set up a request evaluation filter which will block Gets to /read and Puts
+	// to /write. These will signal that they're blocked via blockedC, and unblock
+	// when unblockC is closed.
+	//
+	// Unfortunately, we can't use a magic context to specify which requests to
+	// block, since this does not work with external process tenants which may be
+	// randomly enabled. We therefore have to match the actual keys.
+	blockedC := make(chan struct{}, 10)
+	unblockC := make(chan struct{})
+
+	evalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
+		var shouldBlock bool
+		key := args.Req.Header().Key
+		if args.Req.Method() == roachpb.Get && key.Equal(readKey) {
+			shouldBlock = true
+		}
+		if args.Req.Method() == roachpb.Put && key.Equal(writeKey) {
+			shouldBlock = true
+		}
+		if shouldBlock {
+			// Notify callers that we're blocking.
+			select {
+			case blockedC <- struct{}{}:
+				t.Logf("blocked %s", args.Req)
+			case <-ctx.Done():
+				return roachpb.NewError(ctx.Err())
+			}
+			// Wait to unblock.
+			select {
+			case <-unblockC:
+				t.Logf("unblocked %s", args.Req)
+			case <-ctx.Done():
+				return roachpb.NewError(ctx.Err())
+			}
+		}
+		return nil
+	}
+
+	// Set up a test server.
+	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+					TestingEvalFilter: evalFilter,
+				},
+			},
+		},
+	})
+	defer srv.Stopper().Stop(ctx)
+
+	db := srv.DB()
+
+	// Set up helpers to run barriers, both sync and async.
+	barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) {
+		if withLAI {
+			_, _, err = db.BarrierWithLAI(ctx, start, end)
+		} else {
+			_, err = db.Barrier(ctx, start, end)
+		}
+		return
+	}
+
+	barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error {
+		errC := make(chan error, 1)
+		go func() {
+			errC <- barrier(ctx, start, end, withLAI)
+		}()
+		return errC
+	}
+
+	// Split off a range at /split, to test cross-range barriers.
+	_, _, err := srv.SplitRange(splitKey)
+	require.NoError(t, err)
+
+	// Spawn read and write requests, and wait for them to block.
+	go func() {
+		_ = db.Put(ctx, writeKey, "value")
+	}()
+	go func() {
+		_, _ = db.Get(ctx, readKey)
+	}()
+
+	for i := 0; i < 2; i++ {
+		select {
+		case <-blockedC:
+		case <-ctx.Done():
+			require.NoError(t, ctx.Err())
+		}
+	}
+
+	// Barriers should not conflict outside of these keys.
+	require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */))
+	require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */))
+	require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges
+	require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */))
+
+	// Barriers should not conflict with read requests.
+	//
+	// NB: they do in fact conflict in 22.2, but not in 23.1 and later, likely
+	// because read requests drop latches before evaluation in later versions.
+	// That's likely fine.
+	//require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */))
+
+	// Barriers should conflict with write requests. We send off two barriers: one
+	// WithLAI in a single range, and another across ranges. Neither of these
+	// should return in a second.
+	withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */)
+	withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */)
+	select {
+	case err := <-withLAIC:
+		t.Fatalf("WithLAI=true barrier returned prematurely: %v", err)
+	case err := <-withoutLAIC:
+		t.Fatalf("WithLAI=false barrier returned prematurely: %v", err)
+	case <-time.After(time.Second):
+	}
+
+	// While the barriers are blocked, later overlapping requests should be able
+	// to proceed and evaluate below them.
+	require.NoError(t, db.Put(ctx, splitKey, "value"))
+	_, err = db.Get(ctx, splitKey)
+	require.NoError(t, err)
+
+	// Unblock the requests. This should now unblock the barriers as well.
+	close(unblockC)
+
+	select {
+	case err := <-withLAIC:
+		require.NoError(t, err)
+	case <-time.After(3 * time.Second):
+		t.Fatal("WithLAI=true barrier did not return")
+	}
+
+	select {
+	case err := <-withoutLAIC:
+		require.NoError(t, err)
+	case <-time.After(3 * time.Second):
+		t.Fatal("WithLAI=false barrier did not return")
+	}
+}
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 77f9ae4c79cc..b77ce276debd 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -52,6 +52,9 @@ type LocalResult struct {
 	// commit fails, or we may accidentally make uncommitted values
 	// live.
 	EndTxns []EndTxnIntents
+	// PopulateBarrierResponse will populate a BarrierResponse with the lease
+	// applied index and range descriptor when applied.
+	PopulateBarrierResponse bool
 
 	// When set (in which case we better be the first range), call
 	// GossipFirstRange if the Replica holds the lease.
@@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool {
 		lResult.ResolvedLocks == nil &&
 		lResult.UpdatedTxns == nil &&
 		lResult.EndTxns == nil &&
+		!lResult.PopulateBarrierResponse &&
 		!lResult.GossipFirstRange &&
 		!lResult.MaybeGossipSystemConfig &&
 		!lResult.MaybeGossipSystemConfigIfHaveFailure &&
@@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string {
 	return fmt.Sprintf("LocalResult (reply: %v, "+
 		"#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+
 		"#updated txns: %d #end txns: %d, "+
-		"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
+		"PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
 		"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
 		"MaybeGossipNodeLiveness:%s ",
 		lResult.Reply, len(lResult.EncounteredIntents),
 		len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
 		len(lResult.UpdatedTxns), len(lResult.EndTxns),
-		lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
+		lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
 		lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
 		lResult.MaybeGossipNodeLiveness)
 }
@@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents {
 	return r
 }
 
+// DetachPopulateBarrierResponse returns (and removes) the
+// PopulateBarrierResponse value from the local result.
+func (lResult *LocalResult) DetachPopulateBarrierResponse() bool {
+	if lResult == nil {
+		return false
+	}
+	r := lResult.PopulateBarrierResponse
+	lResult.PopulateBarrierResponse = false
+	return r
+}
+
 // Result is the result of evaluating a KV request. That is, the
 // proposer (which holds the lease, at least in the case in which the command
 // will complete successfully) has evaluated the request and is holding on to:
@@ -381,6 +396,14 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	}
 	q.Local.EndTxns = nil
 
+	if !p.Local.PopulateBarrierResponse {
+		p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse
+	} else {
+		// PopulateBarrierResponse is only valid for a single Barrier response.
+		return errors.AssertionFailedf("multiple PopulateBarrierResponse results")
+	}
+	q.Local.PopulateBarrierResponse = false
+
 	if p.Local.MaybeGossipNodeLiveness == nil {
 		p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness
 	} else if q.Local.MaybeGossipNodeLiveness != nil {
diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go
index e80191f468cc..0dc86eb7f43e 100644
--- a/pkg/kv/kvserver/replica_application_result.go
+++ b/pkg/kv/kvserver/replica_application_result.go
@@ -171,6 +171,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
 		}
 		cmd.response.EncounteredIntents = cmd.proposal.Local.DetachEncounteredIntents()
 		cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
+
+		// Populate BarrierResponse if requested.
+		if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() {
+			if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil {
+				resp.LeaseAppliedIndex = cmd.leaseIndex
+				resp.RangeDesc = *r.Desc()
+			} else {
+				log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner())
+			}
+		}
+
 	if pErr == nil {
 		cmd.localResult = cmd.proposal.Local
 	} else if cmd.localResult != nil {
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index acc70a38bc18..f7aa8e5260e6 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -902,6 +902,30 @@ func waitForReplicasInit(
 	})
 }
 
+// WaitForLeaseAppliedIndex waits for the replica to reach the given lease
+// applied index, or until the context is cancelled or the replica is destroyed.
+// Note that the lease applied index may regress across restarts, since we don't
+// sync state machine application to disk.
+//
+// TODO(erikgrinaker): it would be nice if we could be notified about LAI
+// updates instead, but polling will do for now.
+func (r *Replica) WaitForLeaseAppliedIndex(ctx context.Context, lai uint64) (uint64, error) {
+	retryOpts := retry.Options{
+		InitialBackoff: 10 * time.Millisecond,
+		Multiplier:     2,
+		MaxBackoff:     time.Second,
+	}
+	for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); {
+		if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai {
+			return currentLAI, nil
+		}
+		if _, err := r.IsDestroyed(); err != nil {
+			return 0, err
+		}
+	}
+	return 0, ctx.Err()
+}
+
 // ChangeReplicas atomically changes the replicas that are members of a range.
 // The change is performed in a distributed transaction and takes effect when
 // that transaction is committed. This transaction confirms that the supplied
diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go
index 7b6e9e772059..5166eca2de3e 100644
--- a/pkg/kv/kvserver/replica_command_test.go
+++ b/pkg/kv/kvserver/replica_command_test.go
@@ -13,7 +13,9 @@ package kvserver
 import (
 	"context"
 	"encoding/binary"
+	math "math"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -22,6 +24,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -599,3 +604,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) {
 		})
 	}
 }
+
+func TestWaitForLeaseAppliedIndex(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const maxLAI = math.MaxUint64
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	tc := testContext{}
+	tc.Start(ctx, t, stopper)
+	db := tc.store.DB()
+
+	// Submit a write and read it back to bump the initial LAI.
+	write := func(key, value string) {
+		require.NoError(t, db.Put(ctx, key, value))
+		_, err := db.Get(ctx, key)
+		require.NoError(t, err)
+	}
+	write("foo", "bar")
+
+	// Should return immediately when already at or past the LAI.
+	currentLAI := tc.repl.GetLeaseAppliedIndex()
+	require.NotZero(t, currentLAI)
+	resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI)
+	require.NoError(t, err)
+	require.GreaterOrEqual(t, resultLAI, currentLAI)
+
+	// Should wait for a future LAI to be reached.
+	const numWrites = 10
+	waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites
+	laiC := make(chan uint64, 1)
+	go func() {
+		lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI)
+		assert.NoError(t, err) // can't use require in goroutine
+		laiC <- lai
+	}()
+
+	select {
+	case lai := <-laiC:
+		t.Fatalf("unexpected early LAI %d", lai)
+	case <-time.After(time.Second):
+	}
+
+	for i := 0; i < numWrites; i++ {
+		write("foo", "bar")
+	}
+
+	select {
+	case lai := <-laiC:
+		require.GreaterOrEqual(t, lai, waitLAI)
+		require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai)
+	case <-time.After(5 * time.Second):
+		t.Fatalf("timed out waiting for LAI %d", waitLAI)
+	}
+
+	// Should error on context cancellation.
+	cancelCtx, cancel := context.WithCancel(ctx)
+	cancel()
+	_, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI)
+	require.Error(t, err)
+	require.Equal(t, cancelCtx.Err(), err)
+
+	// Should error on destroyed replicas.
+	stopper.Stop(ctx)
+
+	destroyErr := errors.New("destroyed")
+	tc.repl.mu.Lock()
+	tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved)
+	tc.repl.mu.Unlock()
+
+	_, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI)
+	require.Error(t, err)
+	require.Equal(t, destroyErr, err)
+}
diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go
index f3154aca6842..a34089bed9fc 100644
--- a/pkg/kv/kvserver/stores_server.go
+++ b/pkg/kv/kvserver/stores_server.go
@@ -77,6 +77,8 @@ func (is Server) CollectChecksum(
 //
 // It is the caller's responsibility to cancel or set a timeout on the context.
 // If the context is never canceled, WaitForApplication will retry forever.
+//
+// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex().
 func (is Server) WaitForApplication(
 	ctx context.Context, req *WaitForApplicationRequest,
 ) (*WaitForApplicationResponse, error) {
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 6b03306d8d5d..52a7ebff93be 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -519,6 +519,12 @@ func (r *BarrierResponse) combine(c combinable) error {
 			return err
 		}
 		r.Timestamp.Forward(otherR.Timestamp)
+		if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 {
+			return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex")
+		}
+		if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 {
+			return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc")
+		}
 	}
 	return nil
 }
@@ -1534,8 +1540,14 @@ func (*QueryResolvedTimestampRequest) flags() flag {
 	return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
 }
 func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
-func (*BarrierRequest) flags() flag                { return isWrite | isRange }
-func (*IsSpanEmptyRequest) flags() flag            { return isRead | isRange }
+func (r *BarrierRequest) flags() flag {
+	flags := isWrite | isRange | isAlone
+	if r.WithLeaseAppliedIndex {
+		flags |= isUnsplittable // the LAI is only valid for a single range
+	}
+	return flags
+}
+func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }
 
 // IsParallelCommit returns whether the EndTxn request is attempting to perform
 // a parallel commit. See txn_interceptor_committer.go for a discussion about
diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto
index b8262a10c681..7a78eb3a36df 100644
--- a/pkg/roachpb/api.proto
+++ b/pkg/roachpb/api.proto
@@ -2119,11 +2119,28 @@ message ScanInterleavedIntentsResponse {
   repeated Intent intents = 3 [(gogoproto.nullable) = false];
 }
 
-// BarrierRequest is the request for a Barrier operation. This goes through Raft
-// and has the purpose of waiting until all conflicting in-flight operations on
-// this range have completed, without blocking any new operations.
+// BarrierRequest is the request for a Barrier operation. This guarantees that
+// all past and ongoing operations to a key span have completed and applied on
+// the leaseholder. It does this by waiting for all conflicting latches and then
+// submitting a noop write through Raft, waiting for it to apply. Later
+// operations are not affected -- in particular, it does not actually take out a
+// latch, so writers don't have to wait for it to complete and can write below
+// the barrier.
 message BarrierRequest {
   RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+
+  // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier
+  // command in the response, allowing the caller to wait for the barrier to
+  // apply on an arbitrary replica. It also returns the range descriptor, so the
+  // caller can detect any unexpected range changes.
+  //
+  // When enabled, the barrier request can no longer span multiple ranges, and
+  // will instead return RangeKeyMismatchError. The caller must be prepared to
+  // handle this.
+  //
+  // NB: This field was added in a patch release. Nodes prior to 24.1 are not
+  // guaranteed to support it, returning a zero LeaseAppliedIndex instead.
+  bool with_lease_applied_index = 2;
 }
 
 // BarrierResponse is the response for a Barrier operation.
@@ -2133,6 +2150,14 @@ message BarrierResponse {
   // Timestamp at which this Barrier was evaluated. Can be used to guarantee
   // future operations happen on the same or newer leaseholders.
   util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];
+
+  // LeaseAppliedIndex at which this Barrier was applied. Only returned when
+  // requested via WithLeaseAppliedIndex.
+  uint64 lease_applied_index = 3;
+
+  // RangeDesc at the time the barrier was applied. Only returned when requested
+  // via WithLeaseAppliedIndex.
+  RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
 }
 
 // A RequestUnion contains exactly one of the requests.
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 1733e877cf01..d96ea6ca62b3 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -216,6 +216,12 @@ func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool {
 	return ba.IsSingleRequest() && ba.Requests[0].GetInner().Method() == m
 }
 
+// IsSingleBarrierRequest returns true iff the batch contains a single request,
+// and that request is a Barrier.
+func (ba *BatchRequest) IsSingleBarrierRequest() bool {
+	return ba.isSingleRequestWithMethod(Barrier)
+}
+
 // IsSingleRequestLeaseRequest returns true iff the batch contains a single
 // request, and that request is a RequestLease. Note that TransferLease requests
 // return false.
@@ -334,7 +340,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool {
 // a no-op. The Barrier request requires consensus even though its evaluation
 // is a no-op.
 func (ba *BatchRequest) RequiresConsensus() bool {
-	return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe)
+	return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest()
 }
 
 // IsCompleteTransaction determines whether a batch contains every write in a
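Taken together, the pieces above let a caller barrier a key span and then wait for the resulting command to apply on a specific replica, using the returned LeaseAppliedIndex and RangeDescriptor. A rough sketch of that flow, assuming the caller lives in the kvserver package and has a *Store at hand (the wrapper function itself is not part of this diff):

// waitForBarrierApplication issues a BarrierWithLAI over [start, end) and then
// waits until the local replica of the affected range has applied the barrier.
// The span must not cross range boundaries, or BarrierWithLAI will return a
// RangeKeyMismatchError.
func waitForBarrierApplication(
	ctx context.Context, db *kv.DB, store *Store, start, end roachpb.Key,
) error {
	lai, desc, err := db.BarrierWithLAI(ctx, start, end)
	if err != nil {
		return err
	}
	if lai == 0 {
		// Empty result: the leaseholder may predate WithLeaseAppliedIndex support.
		return errors.Errorf("barrier response without lease applied index")
	}
	repl, err := store.GetReplica(desc.RangeID)
	if err != nil {
		return err // the local store may not have a replica of this range
	}
	_, err = repl.WaitForLeaseAppliedIndex(ctx, lai)
	return err
}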