diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 92080093f422..48d891f54f0a 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) { b.initResult(1, 0, notRaw, nil) } -func (b *Batch) barrier(s, e interface{}) { +func (b *Batch) barrier(s, e interface{}, withLAI bool) { begin, err := marshalKey(s) if err != nil { b.initResult(0, 0, notRaw, err) @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) { Key: begin, EndKey: end, }, + WithLeaseAppliedIndex: withLAI, } b.appendReqs(req) b.initResult(1, 0, notRaw, nil) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index d9e2c74f2679..798c5444d2fb 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp( // writes on the specified key range to finish. func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) { b := &Batch{} - b.barrier(begin, end) - err := getOneErr(db.Run(ctx, b), b) - if err != nil { + b.barrier(begin, end, false /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { return hlc.Timestamp{}, err } - responses := b.response.Responses - if len(responses) == 0 { - return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier") + if l := len(b.response.Responses); l != 1 { + return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l) } - resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse) - if !ok { - return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier", - responses[0].GetInner()) + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) } return resp.Timestamp, nil } +// BarrierWithLAI is like Barrier, but also returns the lease applied index and +// range descriptor at which the barrier was applied. In this case, the barrier +// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned. +// +// NB: the protocol support for this was added in a patch release, and is not +// guaranteed to be present with nodes prior to 24.1. In this case, the request +// will return an empty result. +func (db *DB) BarrierWithLAI( + ctx context.Context, begin, end interface{}, +) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) { + b := &Batch{} + b.barrier(begin, end, true /* withLAI */) + if err := getOneErr(db.Run(ctx, b), b); err != nil { + return 0, roachpb.RangeDescriptor{}, err + } + if l := len(b.response.Responses); l != 1 { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l) + } + resp := b.response.Responses[0].GetBarrier() + if resp == nil { + return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier", + b.response.Responses[0].GetInner()) + } + return resp.LeaseAppliedIndex, resp.RangeDesc, nil +} + // sendAndFill is a helper which sends the given batch and fills its results, // returning the appropriate error which is either from the first failing call, // or an "internal" error. 
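The new DB.BarrierWithLAI API above can fail with a RangeKeyMismatchError when the span crosses a range boundary, and it returns an empty result against pre-24.1 nodes. The following sketch is not part of the patch; it shows one way a caller might handle both cases. The package, helper name, fallback strategy, and the use of errors.HasType to detect the range mismatch are assumptions for illustration, not the author's API.

package kvutil // hypothetical helper package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// barrierWithFallback issues BarrierWithLAI over [start, end) and falls back to
// a plain Barrier if the span straddles a range boundary (e.g. due to a
// concurrent split). The fallback still waits out in-flight writes, but
// provides no LAI or range descriptor to wait on.
func barrierWithFallback(
	ctx context.Context, db *kv.DB, start, end roachpb.Key,
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
	lai, desc, err := db.BarrierWithLAI(ctx, start, end)
	if errors.HasType(err, (*kvpb.RangeKeyMismatchError)(nil)) {
		_, err := db.Barrier(ctx, start, end)
		return 0, roachpb.RangeDescriptor{}, err
	}
	return lai, desc, err
}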
diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 6e6902d67f82..0500e5a9f9f3 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { case *ChangeZoneOperation: err := updateZoneConfigInEnv(ctx, env, o.Type) o.Result = resultInit(ctx, err) + case *BarrierOperation: + var err error + if o.WithLeaseAppliedIndex { + _, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey) + } else { + _, err = db.Barrier(ctx, o.Key, o.EndKey) + } + o.Result = resultInit(ctx, err) case *ClosureTxnOperation: // Use a backoff loop to avoid thrashing on txn aborts. Don't wait between // epochs of the same transaction to avoid waiting while holding locks. @@ -435,6 +443,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { return } o.Result.OptionalTimestamp = ts + case *BarrierOperation: + _, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { + b.AddRawRequest(&kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: o.Key, + EndKey: o.EndKey, + }, + WithLeaseAppliedIndex: o.WithLeaseAppliedIndex, + }) + }) + o.Result = resultInit(ctx, err) case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) @@ -510,6 +529,8 @@ func applyBatchOp( setLastReqSeq(b, subO.Seq) case *AddSSTableOperation: panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`)) + case *BarrierOperation: + panic(errors.AssertionFailedf(`Barrier cannot be used in batches`)) default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO)) } diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 4d21e139f57d..d6c6c159a0ca 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -261,6 +261,8 @@ type ClientOperationConfig struct { DeleteRangeUsingTombstone int // AddSSTable is an operations that ingests an SSTable with random KV pairs. AddSSTable int + // Barrier is an operation that waits for in-flight writes to complete. + Barrier int } // BatchOperationConfig configures the relative probability of generating a @@ -373,6 +375,7 @@ func newAllOperationsConfig() GeneratorConfig { DeleteRange: 1, DeleteRangeUsingTombstone: 1, AddSSTable: 1, + Barrier: 1, } batchOpConfig := BatchOperationConfig{ Batch: 4, @@ -491,6 +494,12 @@ func NewDefaultConfig() GeneratorConfig { config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0 config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0 + // Barrier cannot be used in batches, and we omit it in txns too because it + // can result in spurious RangeKeyMismatchErrors that fail the txn operation. 
+ config.Ops.Batch.Ops.Barrier = 0 + config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0 + config.Ops.ClosureTxn.TxnClientOps.Barrier = 0 + config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0 return config } @@ -766,6 +775,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig addOpGen(allowed, randDelRange, c.DeleteRange) addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone) addOpGen(allowed, randAddSSTable, c.AddSSTable) + addOpGen(allowed, randBarrier, c.Barrier) } func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) { @@ -1056,6 +1066,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites) } +func randBarrier(g *generator, rng *rand.Rand) Operation { + withLAI := rng.Float64() < 0.5 + var key, endKey string + if withLAI { + // Barriers requesting LAIs can't span multiple ranges, so we try to fit + // them within an existing range. This may race with a concurrent split, in + // which case the Barrier will fail, but that's ok -- most should still + // succeed. These errors are ignored by the validator. + key, endKey = randRangeSpan(rng, g.currentSplits) + } else { + key, endKey = randSpan(rng) + } + return barrier(key, endKey, withLAI) +} + func randScan(g *generator, rng *rand.Rand) Operation { key, endKey := randSpan(rng) return scan(key, endKey) @@ -1735,3 +1760,11 @@ func addSSTable( AsWrites: asWrites, }} } + +func barrier(key, endKey string, withLAI bool) Operation { + return Operation{Barrier: &BarrierOperation{ + Key: []byte(key), + EndKey: []byte(endKey), + WithLeaseAppliedIndex: withLAI, + }} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index 2dfefae87c08..4d14a133fdd1 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -250,6 +250,8 @@ func TestRandStep(t *testing.T) { client.DeleteRangeUsingTombstone++ case *AddSSTableOperation: client.AddSSTable++ + case *BarrierOperation: + client.Barrier++ case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) @@ -270,7 +272,8 @@ func TestRandStep(t *testing.T) { *DeleteOperation, *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, - *AddSSTableOperation: + *AddSSTableOperation, + *BarrierOperation: countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) 
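For reference, the raw-request path used in applyClientOp above can also be exercised directly against a *kv.DB. This is a minimal sketch under that assumption (the package and function name are made up; only APIs shown elsewhere in this patch are used):

package kvutil // hypothetical, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// runRawBarrier issues a BarrierRequest through Batch.AddRawRequest, mirroring
// what the kvnemesis applier does above, and returns the raw response.
func runRawBarrier(
	ctx context.Context, db *kv.DB, key, endKey roachpb.Key, withLAI bool,
) (*kvpb.BarrierResponse, error) {
	b := &kv.Batch{}
	b.AddRawRequest(&kvpb.BarrierRequest{
		RequestHeader:         kvpb.RequestHeader{Key: key, EndKey: endKey},
		WithLeaseAppliedIndex: withLAI,
	})
	if err := db.Run(ctx, b); err != nil {
		return nil, err
	}
	// Only one request was added, so there is exactly one response.
	return b.RawResponse().Responses[0].GetBarrier(), nil
}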
diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 036a59cbc86a..024c1df85049 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -41,6 +41,8 @@ func (op Operation) Result() *Result { return &o.Result case *AddSSTableOperation: return &o.Result + case *BarrierOperation: + return &o.Result case *SplitOperation: return &o.Result case *MergeOperation: @@ -131,6 +133,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { o.format(w, fctx) case *AddSSTableOperation: o.format(w, fctx) + case *BarrierOperation: + o.format(w, fctx) case *SplitOperation: o.format(w, fctx) case *MergeOperation: @@ -339,6 +343,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) { } } +func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) { + if op.WithLeaseAppliedIndex { + fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`, + fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } else { + fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey)) + } + op.Result.format(w) +} + func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) { fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key)) op.Result.format(w) diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index bb33a83627cf..a27f8ce6f799 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -95,6 +95,13 @@ message AddSSTableOperation { Result result = 6 [(gogoproto.nullable) = false]; } +message BarrierOperation { + bytes key = 1; + bytes end_key = 2; + bool with_lease_applied_index = 3; + Result result = 4 [(gogoproto.nullable) = false]; +} + message SplitOperation { bytes key = 1; Result result = 2 [(gogoproto.nullable) = false]; @@ -150,6 +157,7 @@ message Operation { TransferLeaseOperation transfer_lease = 16; ChangeZoneOperation change_zone = 17; AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"]; + BarrierOperation barrier = 22; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 5c24e2034477..95059b95b618 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -87,6 +87,8 @@ func TestOperationsFormat(t *testing.T) { { step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)), }, + {step: step(barrier(k1, k2, false /* withLAI */))}, + {step: step(barrier(k3, k4, true /* withLAI */))}, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 new file mode 100644 index 000000000000..b848afebfc9b --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 @@ -0,0 +1,3 @@ +echo +---- +···db0.Barrier(ctx, tk(1), tk(2)) diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 new file mode 100644 index 000000000000..92c0790f55de --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/6 @@ -0,0 +1,3 @@ +echo +---- +···db0.BarrierWithLAI(ctx, tk(3), tk(4)) diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index ab101a207e21..03c766ec4f96 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -693,6 +693,20 @@ func (v *validator) processOp(op Operation) { if v.buffering == bufferingSingle { 
v.checkAtomic(`addSSTable`, t.Result) } + case *BarrierOperation: + execTimestampStrictlyOptional = true + if op.Barrier.WithLeaseAppliedIndex && + resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) { + // Barriers requesting LAIs can't span ranges. The generator will + // optimistically try to fit the barrier inside one of the current ranges, + // but this may race with a split, so we ignore the error in this case and + // try again later. + } else { + // Fail or retry on other errors, depending on type. + v.checkNonAmbError(op, t.Result, exceptUnhandledRetry) + } + // We don't yet actually check the barrier guarantees here, i.e. that all + // concurrent writes are applied by the time it completes. Maybe later. case *ScanOperation: if _, isErr := v.checkError(op, t.Result); isErr { break diff --git a/pkg/kv/kvpb/api.go b/pkg/kv/kvpb/api.go index 85f91a2be0f5..972923a59eca 100644 --- a/pkg/kv/kvpb/api.go +++ b/pkg/kv/kvpb/api.go @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque return err } r.Timestamp.Forward(otherR.Timestamp) + if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex") + } + if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 { + return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc") + } } return nil } @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead } func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot } -func (*BarrierRequest) flags() flag { return isWrite | isRange } +func (r *BarrierRequest) flags() flag { + flags := isWrite | isRange | isAlone + if r.WithLeaseAppliedIndex { + flags |= isUnsplittable // the LAI is only valid for a single range + } + return flags +} func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 89f3ed2667ac..09a0339eb5e5 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2423,11 +2423,28 @@ message QueryResolvedTimestampResponse { (gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"]; } -// BarrierRequest is the request for a Barrier operation. This goes through Raft -// and has the purpose of waiting until all conflicting in-flight operations on -// this range have completed, without blocking any new operations. +// BarrierRequest is the request for a Barrier operation. This guarantees that +// all past and ongoing writes to a key span have completed and applied on the +// leaseholder. It does this by waiting for all conflicting write latches and +// then submitting a noop write through Raft, waiting for it to apply. Later +// writes are not affected -- in particular, it does not actually take out a +// latch, so writers don't have to wait for it to complete and can write below +// the barrier. message BarrierRequest { RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + + // WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier + // command in the response, allowing the caller to wait for the barrier to + // apply on an arbitrary replica. It also returns the range descriptor, so the + // caller can detect any unexpected range changes. 
+ // + // When enabled, the barrier request can no longer span multiple ranges, and + // will instead return RangeKeyMismatchError. The caller must be prepared to + // handle this. + // + // NB: This field was added in a patch release. Nodes prior to 24.1 are not + // guaranteed to support it, returning a zero LeaseAppliedIndex instead. + bool with_lease_applied_index = 2; } // BarrierResponse is the response for a Barrier operation. @@ -2437,6 +2454,14 @@ message BarrierResponse { // Timestamp at which this Barrier was evaluated. Can be used to guarantee // future operations happen on the same or newer leaseholders. util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; + + // LeaseAppliedIndex at which this Barrier was applied. Only returned when + // requested via WithLeaseAppliedIndex. + uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"]; + + // RangeDesc at the time the barrier was applied. Only returned when requested + // via WithLeaseAppliedIndex. + RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false]; } // A RequestUnion contains exactly one of the requests. diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go index c54da8265ab2..7d810fc65234 100644 --- a/pkg/kv/kvpb/batch.go +++ b/pkg/kv/kvpb/batch.go @@ -215,6 +215,12 @@ func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1 } +// IsSingleBarrierRequest returns true iff the batch contains a single request, +// and that request is a Barrier. +func (ba *BatchRequest) IsSingleBarrierRequest() bool { + return ba.isSingleRequestWithMethod(Barrier) +} + // IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single // request, and that request has the skipsLeaseCheck flag set. func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool { @@ -349,7 +355,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool { // a no-op. The Barrier request requires consensus even though its evaluation // is a no-op. func (ba *BatchRequest) RequiresConsensus() bool { - return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe) + return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest() } // IsCompleteTransaction determines whether a batch contains every write in a diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 5a38c4ec1998..734a187c35a5 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -101,6 +101,7 @@ go_test( size = "medium", srcs = [ "cmd_add_sstable_test.go", + "cmd_barrier_test.go", "cmd_clear_range_test.go", "cmd_delete_range_gchint_test.go", "cmd_delete_range_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier.go b/pkg/kv/kvserver/batcheval/cmd_barrier.go index a03f992292d5..1681a1476e29 100644 --- a/pkg/kv/kvserver/batcheval/cmd_barrier.go +++ b/pkg/kv/kvserver/batcheval/cmd_barrier.go @@ -47,13 +47,19 @@ func declareKeysBarrier( return nil } -// Barrier evaluation is a no-op, as all the latch waiting happens in -// the latch manager. +// Barrier evaluation is a no-op, but it still goes through Raft because of +// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch +// manager, and the WithLeaseAppliedIndex info is populated during application. 
func Barrier( _ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response kvpb.Response, ) (result.Result, error) { + args := cArgs.Args.(*kvpb.BarrierRequest) resp := response.(*kvpb.BarrierResponse) resp.Timestamp = cArgs.EvalCtx.Clock().Now() - return result.Result{}, nil + return result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: args.WithLeaseAppliedIndex, + }, + }, nil } diff --git a/pkg/kv/kvserver/batcheval/cmd_barrier_test.go b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go new file mode 100644 index 000000000000..82a65da1f5bd --- /dev/null +++ b/pkg/kv/kvserver/batcheval/cmd_barrier_test.go @@ -0,0 +1,316 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package batcheval_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestBarrierEval tests basic Barrier evaluation. +func TestBarrierEval(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + start := roachpb.Key("a") + end := roachpb.Key("b") + + clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Now())) + ts := clock.Now() + evalCtx := (&batcheval.MockEvalCtx{Clock: clock}).EvalContext() + + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + resp := kvpb.BarrierResponse{} + res, err := batcheval.Barrier(ctx, nil, batcheval.CommandArgs{ + EvalCtx: evalCtx, + Args: &kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + }, + }, &resp) + require.NoError(t, err) + + require.Equal(t, result.Result{ + Local: result.LocalResult{ + PopulateBarrierResponse: withLAI, + }, + }, res) + + // Ignore the logical timestamp component, which is incremented per reading. + resp.Timestamp.Logical = 0 + + require.Equal(t, kvpb.BarrierResponse{ + Timestamp: ts, + }, resp) + }) +} + +// TestBarrier is an integration test for Barrier. It tests that it processes +// the request and response properly, within a single range and across multiple +// ranges. +func TestBarrier(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // Set up a test server. 
+ srv, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + sender := kvDB.NonTransactionalSender() + + // We'll use /a to /z as our keyspace, and split off a range at /x. + prefix := tsrv.Codec().TenantPrefix() + _, _, err = ssrv.SplitRange(append(prefix, []byte("/x")...)) + require.NoError(t, err) + + // Send Barrier request with/without LeaseAppliedIndex, and within a single + // range or across multiple ranges. + testutils.RunTrueAndFalse(t, "WithLeaseAppliedIndex", func(t *testing.T, withLAI bool) { + testutils.RunTrueAndFalse(t, "crossRange", func(t *testing.T, crossRange bool) { + start := append(prefix, []byte("/a")...) + end := append(prefix, []byte("/b")...) + if crossRange { + end = append(prefix, []byte("/z")...) + } + repl := store.LookupReplica(roachpb.RKey(start)) + + tsBefore := tsrv.Clock().Now() + laiBefore := repl.GetLeaseAppliedIndex() + req := kvpb.BarrierRequest{ + RequestHeader: kvpb.RequestHeader{Key: start, EndKey: end}, + WithLeaseAppliedIndex: withLAI, + } + respI, pErr := kv.SendWrapped(ctx, sender, &req) + + // WithLeaseAppliedIndex should return RangeKeyMismatchError when across + // multiple ranges. + if withLAI && crossRange { + require.Error(t, pErr.GoError()) + require.IsType(t, &kvpb.RangeKeyMismatchError{}, pErr.GoError()) + return + } + + require.NoError(t, pErr.GoError()) + resp, ok := respI.(*kvpb.BarrierResponse) + require.True(t, ok) + + // The timestamp must be after the request was sent. + require.True(t, tsBefore.LessEq(resp.Timestamp)) + + // If WithLeaseAppliedIndex is set, it also returns the LAI and range + // descriptor. + if withLAI { + require.GreaterOrEqual(t, resp.LeaseAppliedIndex, laiBefore) + require.GreaterOrEqual(t, repl.GetLeaseAppliedIndex(), resp.LeaseAppliedIndex) + require.Equal(t, *repl.Desc(), resp.RangeDesc) + } else { + require.Zero(t, resp.LeaseAppliedIndex) + require.Zero(t, resp.RangeDesc) + } + }) + }) +} + +// TestBarrierLatches tests Barrier latch interactions. Specifically, that it +// waits for in-flight requests to complete, but that it does not block later +// requests. +func TestBarrierLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderDuress(t) // too slow, times out + + // Use a timeout, to prevent blocking indefinitely if something goes wrong. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // We'll do reads and writes to two separate keys, with a range split in + // between. These keys will be under the tenant prefix. + readSuffix := roachpb.Key("/read") + splitSuffix := roachpb.Key("/split") + writeSuffix := roachpb.Key("/write") + + // Set up a request evaluation filter which will block Gets to /read and Puts + // to /write. These will signal that they're blocked via blockedC, and unblock + // when unblockC is closed. + // + // Unfortunately, we can't use a magic context to specify which requests to + // block, since this does not work with external process tenants which may be + // randomly enabled. We therefore have to match the actual keys. 
+ blockedC := make(chan struct{}, 10) + unblockC := make(chan struct{}) + + evalFilter := func(args kvserverbase.FilterArgs) *kvpb.Error { + var shouldBlock bool + if key, err := keys.StripTenantPrefix(args.Req.Header().Key); err == nil { + if args.Req.Method() == kvpb.Get && bytes.Equal(key, readSuffix) { + shouldBlock = true + } + if args.Req.Method() == kvpb.Put && bytes.Equal(key, writeSuffix) { + shouldBlock = true + } + } + if shouldBlock { + // Notify callers that we're blocking. + select { + case blockedC <- struct{}{}: + t.Logf("blocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + // Wait to unblock. + select { + case <-unblockC: + t.Logf("unblocked %s", args.Req) + case <-ctx.Done(): + return kvpb.NewError(ctx.Err()) + } + } + return nil + } + + // Set up a test server. + srv := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: evalFilter, + }, + }, + }, + }) + defer srv.Stopper().Stop(ctx) + ssrv := srv.StorageLayer() + tsrv := srv.ApplicationLayer() + srv = nil // prevent direct access, use system or tenant as appropriate + + db := tsrv.DB() + store, err := ssrv.GetStores().(*kvserver.Stores).GetStore(ssrv.GetFirstStoreID()) + require.NoError(t, err) + _ = store + + // Determine the tenant prefix and keys. + prefix := tsrv.Codec().TenantPrefix() + readKey := append(prefix, readSuffix...) + splitKey := append(prefix, splitSuffix...) + writeKey := append(prefix, writeSuffix...) + + // Set up helpers to run barriers, both sync and async. + barrier := func(ctx context.Context, start, end roachpb.Key, withLAI bool) (err error) { + if withLAI { + _, _, err = db.BarrierWithLAI(ctx, start, end) + } else { + _, err = db.Barrier(ctx, start, end) + } + return + } + + barrierAsync := func(ctx context.Context, start, end roachpb.Key, withLAI bool) <-chan error { + errC := make(chan error, 1) + go func() { + errC <- barrier(ctx, start, end, withLAI) + }() + return errC + } + + // Split off a range at /split, to test cross-range barriers. + _, _, err = ssrv.SplitRange(splitKey) + require.NoError(t, err) + + // Spawn read and write requests, and wait for them to block. + go func() { + _ = db.Put(ctx, writeKey, "value") + }() + go func() { + _, _ = db.Get(ctx, readKey) + }() + + for i := 0; i < 2; i++ { + select { + case <-blockedC: + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + } + + // Barriers should not conflict outside of these keys. + require.NoError(t, barrier(ctx, readKey.Next(), splitKey, true /* withLAI */)) + require.NoError(t, barrier(ctx, splitKey, writeKey, true /* withLAI */)) + require.Error(t, barrier(ctx, readKey.Next(), writeKey, true /* withLAI */)) // can't span ranges + require.NoError(t, barrier(ctx, readKey.Next(), writeKey, false /* withLAI */)) + + // Barriers should not conflict with read requests. + require.NoError(t, barrier(ctx, readKey, readKey.Next(), true /* withLAI */)) + + // Barriers should conflict with write requests. We send off two barriers: one + // WithLAI in a single range, and another across ranges. Neither of these + // should return in a second. 
+ withLAIC := barrierAsync(ctx, splitKey, writeKey.Next(), true /* withLAI */) + withoutLAIC := barrierAsync(ctx, readKey, writeKey.Next(), false /* withLAI */) + select { + case err := <-withLAIC: + t.Fatalf("WithLAI=true barrier returned prematurely: %v", err) + case err := <-withoutLAIC: + t.Fatalf("WithLAI=false barrier returned prematurely: %v", err) + case <-time.After(time.Second): + } + + // While the barriers are blocked, later overlapping requests should be able + // to proceed and evaluate below them. + require.NoError(t, db.Put(ctx, splitKey, "value")) + _, err = db.Get(ctx, splitKey) + require.NoError(t, err) + + // Unblock the requests. This should now unblock the barriers as well. + close(unblockC) + + select { + case err := <-withLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=true barrier did not return") + } + + select { + case err := <-withoutLAIC: + require.NoError(t, err) + case <-time.After(3 * time.Second): + t.Fatal("WithLAI=false barrier did not return") + } +} diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 6a2136b03376..25fb2966d6f6 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -52,6 +52,9 @@ type LocalResult struct { // commit fails, or we may accidentally make uncommitted values // live. EndTxns []EndTxnIntents + // PopulateBarrierResponse will populate a BarrierResponse with the lease + // applied index and range descriptor when applied. + PopulateBarrierResponse bool // When set (in which case we better be the first range), call // GossipFirstRange if the Replica holds the lease. @@ -79,6 +82,7 @@ func (lResult *LocalResult) IsZero() bool { lResult.ResolvedLocks == nil && lResult.UpdatedTxns == nil && lResult.EndTxns == nil && + !lResult.PopulateBarrierResponse && !lResult.GossipFirstRange && !lResult.MaybeGossipSystemConfig && !lResult.MaybeGossipSystemConfigIfHaveFailure && @@ -93,13 +97,13 @@ func (lResult *LocalResult) String() string { return fmt.Sprintf("LocalResult (reply: %v, "+ "#encountered intents: %d, #acquired locks: %d, #resolved locks: %d"+ "#updated txns: %d #end txns: %d, "+ - "GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ + "PopulateBarrierResponse:%t GossipFirstRange:%t MaybeGossipSystemConfig:%t "+ "MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+ "MaybeGossipNodeLiveness:%s ", lResult.Reply, len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks), len(lResult.UpdatedTxns), len(lResult.EndTxns), - lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, + lResult.PopulateBarrierResponse, lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue, lResult.MaybeGossipNodeLiveness) } @@ -147,6 +151,17 @@ func (lResult *LocalResult) DetachEndTxns(alwaysOnly bool) []EndTxnIntents { return r } +// DetachPopulateBarrierResponse returns (and removes) the +// PopulateBarrierResponse value from the local result. +func (lResult *LocalResult) DetachPopulateBarrierResponse() bool { + if lResult == nil { + return false + } + r := lResult.PopulateBarrierResponse + lResult.PopulateBarrierResponse = false + return r +} + // Result is the result of evaluating a KV request. 
That is, the // proposer (which holds the lease, at least in the case in which the command // will complete successfully) has evaluated the request and is holding on to: @@ -368,6 +383,14 @@ func (p *Result) MergeAndDestroy(q Result) error { } q.Local.EndTxns = nil + if !p.Local.PopulateBarrierResponse { + p.Local.PopulateBarrierResponse = q.Local.PopulateBarrierResponse + } else { + // PopulateBarrierResponse is only valid for a single Barrier response. + return errors.AssertionFailedf("multiple PopulateBarrierResponse results") + } + q.Local.PopulateBarrierResponse = false + if p.Local.MaybeGossipNodeLiveness == nil { p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness } else if q.Local.MaybeGossipNodeLiveness != nil { diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index e3ee7fef4f38..1e7534dca24b 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -262,6 +262,17 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { // in that case, it would seem prudent not to take advantage of that. In other // words, the line below this comment should be conditional on `pErr == nil`. cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil) + + // Populate BarrierResponse if requested. + if pErr == nil && cmd.proposal.Local.DetachPopulateBarrierResponse() { + if resp := cmd.response.Reply.Responses[0].GetBarrier(); resp != nil { + resp.LeaseAppliedIndex = cmd.LeaseIndex + resp.RangeDesc = *r.Desc() + } else { + log.Fatalf(ctx, "PopulateBarrierResponse for %T", cmd.response.Reply.Responses[0].GetInner()) + } + } + if pErr == nil { cmd.localResult = cmd.proposal.Local } else if cmd.localResult != nil { diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 11da90129c7c..1530604d0df1 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -893,6 +893,32 @@ func waitForReplicasInit( }) } +// WaitForLeaseAppliedIndex waits for the replica to reach the given lease +// applied index, or until the context is cancelled or the replica is destroyed. +// Note that the lease applied index may regress across restarts, since we don't +// sync state machine application to disk. +// +// TODO(erikgrinaker): it would be nice if we could be notified about LAI +// updates instead, but polling will do for now. +func (r *Replica) WaitForLeaseAppliedIndex( + ctx context.Context, lai kvpb.LeaseAppliedIndex, +) (kvpb.LeaseAppliedIndex, error) { + retryOpts := retry.Options{ + InitialBackoff: 10 * time.Millisecond, + Multiplier: 2, + MaxBackoff: time.Second, + } + for retry := retry.StartWithCtx(ctx, retryOpts); retry.Next(); { + if currentLAI := r.GetLeaseAppliedIndex(); currentLAI >= lai { + return currentLAI, nil + } + if _, err := r.IsDestroyed(); err != nil { + return 0, err + } + } + return 0, ctx.Err() +} + // ChangeReplicas atomically changes the replicas that are members of a range. // The change is performed in a distributed transaction and takes effect when // that transaction is committed. 
This transaction confirms that the supplied diff --git a/pkg/kv/kvserver/replica_command_test.go b/pkg/kv/kvserver/replica_command_test.go index 722db5b18f1a..8e68a9b386d3 100644 --- a/pkg/kv/kvserver/replica_command_test.go +++ b/pkg/kv/kvserver/replica_command_test.go @@ -13,7 +13,9 @@ package kvserver import ( "context" "encoding/binary" + math "math" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" @@ -23,6 +25,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -608,3 +613,80 @@ func TestSynthesizeTargetsByChangeType(t *testing.T) { }) } } + +func TestWaitForLeaseAppliedIndex(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const maxLAI = math.MaxUint64 + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + tc := testContext{} + tc.Start(ctx, t, stopper) + db := tc.store.DB() + + // Submit a write and read it back to bump the initial LAI. + write := func(key, value string) { + require.NoError(t, db.Put(ctx, key, value)) + _, err := db.Get(ctx, key) + require.NoError(t, err) + } + write("foo", "bar") + + // Should return immediately when already at or past the LAI. + currentLAI := tc.repl.GetLeaseAppliedIndex() + require.NotZero(t, currentLAI) + resultLAI, err := tc.repl.WaitForLeaseAppliedIndex(ctx, currentLAI) + require.NoError(t, err) + require.GreaterOrEqual(t, resultLAI, currentLAI) + + // Should wait for a future LAI to be reached. + const numWrites = 10 + waitLAI := tc.repl.GetLeaseAppliedIndex() + numWrites + laiC := make(chan kvpb.LeaseAppliedIndex, 1) + go func() { + lai, err := tc.repl.WaitForLeaseAppliedIndex(ctx, waitLAI) + assert.NoError(t, err) // can't use require in goroutine + laiC <- lai + }() + + select { + case lai := <-laiC: + t.Fatalf("unexpected early LAI %d", lai) + case <-time.After(time.Second): + } + + for i := 0; i < numWrites; i++ { + write("foo", "bar") + } + + select { + case lai := <-laiC: + require.GreaterOrEqual(t, lai, waitLAI) + require.GreaterOrEqual(t, tc.repl.GetLeaseAppliedIndex(), lai) + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for LAI %d", waitLAI) + } + + // Should error on context cancellation. + cancelCtx, cancel := context.WithCancel(ctx) + cancel() + _, err = tc.repl.WaitForLeaseAppliedIndex(cancelCtx, maxLAI) + require.Error(t, err) + require.Equal(t, cancelCtx.Err(), err) + + // Should error on destroyed replicas. + stopper.Stop(ctx) + + destroyErr := errors.New("destroyed") + tc.repl.mu.Lock() + tc.repl.mu.destroyStatus.Set(destroyErr, destroyReasonRemoved) + tc.repl.mu.Unlock() + + _, err = tc.repl.WaitForLeaseAppliedIndex(ctx, maxLAI) + require.Error(t, err) + require.Equal(t, destroyErr, err) +} diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go index b4040707b7bc..ea1af8f10e49 100644 --- a/pkg/kv/kvserver/stores_server.go +++ b/pkg/kv/kvserver/stores_server.go @@ -74,6 +74,8 @@ func (is Server) CollectChecksum( // // It is the caller's responsibility to cancel or set a timeout on the context. // If the context is never canceled, WaitForApplication will retry forever. +// +// TODO(erikgrinaker): consider using Replica.WaitForLeaseAppliedIndex(). 
func (is Server) WaitForApplication( ctx context.Context, req *WaitForApplicationRequest, ) (*WaitForApplicationResponse, error) {
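Taken together, BarrierWithLAI and the new Replica.WaitForLeaseAppliedIndex let a caller flush all past writes to a span and then wait for a specific local replica to have applied them. A rough end-to-end sketch, not part of the patch (the package, helper name, and replica lookup are assumptions for illustration):

package kvutil // hypothetical, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// waitForLocalApplication flushes all past writes to [start, end) and waits for
// the local replica of that range to have applied them. Per the doc comment on
// WaitForLeaseAppliedIndex, the LAI may regress across restarts, so this is a
// best-effort wait.
func waitForLocalApplication(
	ctx context.Context, db *kv.DB, store *kvserver.Store, start, end roachpb.Key,
) error {
	lai, desc, err := db.BarrierWithLAI(ctx, start, end)
	if err != nil {
		return err
	}
	if lai == 0 {
		// Pre-24.1 nodes may not populate the LAI; nothing to wait for.
		return nil
	}
	repl := store.LookupReplica(roachpb.RKey(start))
	if repl == nil || repl.RangeID != desc.RangeID {
		return errors.Errorf("local replica for %s not found or range changed", start)
	}
	_, err = repl.WaitForLeaseAppliedIndex(ctx, lai)
	return err
}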