Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: batcheval: add BarrierRequest.WithLeaseAppliedIndex #118407

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ func (b *Batch) queryResolvedTimestamp(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -1132,6 +1132,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,23 +889,47 @@ func (db *DB) QueryResolvedTimestamp(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*kvpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI is like Barrier, but also returns the lease applied index and
// range descriptor at which the barrier was applied. In this case, the barrier
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
//
// NB: the protocol support for this was added in a patch release, and is not
// guaranteed to be present with nodes prior to 24.1. In this case, the request
// will return an empty result.
func (db *DB) BarrierWithLAI(
ctx context.Context, begin, end interface{},
) (kvpb.LeaseAppliedIndex, roachpb.RangeDescriptor, error) {
b := &Batch{}
b.barrier(begin, end, true /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return 0, roachpb.RangeDescriptor{}, err
}
if l := len(b.response.Responses); l != 1 {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
}
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.LeaseAppliedIndex, resp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultInit(ctx, err)
case *BarrierOperation:
var err error
if o.WithLeaseAppliedIndex {
_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
} else {
_, err = db.Barrier(ctx, o.Key, o.EndKey)
}
o.Result = resultInit(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -435,6 +443,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
return
}
o.Result.OptionalTimestamp = ts
case *BarrierOperation:
_, _, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.AddRawRequest(&kvpb.BarrierRequest{
RequestHeader: kvpb.RequestHeader{
Key: o.Key,
EndKey: o.EndKey,
},
WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
})
})
o.Result = resultInit(ctx, err)
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
Expand Down Expand Up @@ -510,6 +529,8 @@ func applyBatchOp(
setLastReqSeq(b, subO.Seq)
case *AddSSTableOperation:
panic(errors.AssertionFailedf(`AddSSTable cannot be used in batches`))
case *BarrierOperation:
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ type ClientOperationConfig struct {
DeleteRangeUsingTombstone int
// AddSSTable is an operations that ingests an SSTable with random KV pairs.
AddSSTable int
// Barrier is an operation that waits for in-flight writes to complete.
Barrier int
}

// BatchOperationConfig configures the relative probability of generating a
Expand Down Expand Up @@ -373,6 +375,7 @@ func newAllOperationsConfig() GeneratorConfig {
DeleteRange: 1,
DeleteRangeUsingTombstone: 1,
AddSSTable: 1,
Barrier: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -491,6 +494,12 @@ func NewDefaultConfig() GeneratorConfig {
config.Ops.ClosureTxn.CommitBatchOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnClientOps.AddSSTable = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.AddSSTable = 0
// Barrier cannot be used in batches, and we omit it in txns too because it
// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
config.Ops.Batch.Ops.Barrier = 0
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
return config
}

Expand Down Expand Up @@ -766,6 +775,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randDelRange, c.DeleteRange)
addOpGen(allowed, randDelRangeUsingTombstone, c.DeleteRangeUsingTombstone)
addOpGen(allowed, randAddSSTable, c.AddSSTable)
addOpGen(allowed, randBarrier, c.Barrier)
}

func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
Expand Down Expand Up @@ -1056,6 +1066,21 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation {
return addSSTable(f.Data(), sstSpan, sstTimestamp, seq, asWrites)
}

func randBarrier(g *generator, rng *rand.Rand) Operation {
withLAI := rng.Float64() < 0.5
var key, endKey string
if withLAI {
// Barriers requesting LAIs can't span multiple ranges, so we try to fit
// them within an existing range. This may race with a concurrent split, in
// which case the Barrier will fail, but that's ok -- most should still
// succeed. These errors are ignored by the validator.
key, endKey = randRangeSpan(rng, g.currentSplits)
} else {
key, endKey = randSpan(rng)
}
return barrier(key, endKey, withLAI)
}

func randScan(g *generator, rng *rand.Rand) Operation {
key, endKey := randSpan(rng)
return scan(key, endKey)
Expand Down Expand Up @@ -1735,3 +1760,11 @@ func addSSTable(
AsWrites: asWrites,
}}
}

func barrier(key, endKey string, withLAI bool) Operation {
return Operation{Barrier: &BarrierOperation{
Key: []byte(key),
EndKey: []byte(endKey),
WithLeaseAppliedIndex: withLAI,
}}
}
5 changes: 4 additions & 1 deletion pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func TestRandStep(t *testing.T) {
client.DeleteRangeUsingTombstone++
case *AddSSTableOperation:
client.AddSSTable++
case *BarrierOperation:
client.Barrier++
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
Expand All @@ -270,7 +272,8 @@ func TestRandStep(t *testing.T) {
*DeleteOperation,
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
*AddSSTableOperation,
*BarrierOperation:
countClientOps(&counts.DB, &counts.Batch, step.Op)
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *AddSSTableOperation:
return &o.Result
case *BarrierOperation:
return &o.Result
case *SplitOperation:
return &o.Result
case *MergeOperation:
Expand Down Expand Up @@ -131,6 +133,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *AddSSTableOperation:
o.format(w, fctx)
case *BarrierOperation:
o.format(w, fctx)
case *SplitOperation:
o.format(w, fctx)
case *MergeOperation:
Expand Down Expand Up @@ -339,6 +343,16 @@ func (op AddSSTableOperation) format(w *strings.Builder, fctx formatCtx) {
}
}

func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
if op.WithLeaseAppliedIndex {
fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
} else {
fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, fmtKey(op.Key), fmtKey(op.EndKey))
}
op.Result.format(w)
}

func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s, hlc.MaxTimestamp)`, fctx.receiver, fmtKey(op.Key))
op.Result.format(w)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ message AddSSTableOperation {
Result result = 6 [(gogoproto.nullable) = false];
}

message BarrierOperation {
bytes key = 1;
bytes end_key = 2;
bool with_lease_applied_index = 3;
Result result = 4 [(gogoproto.nullable) = false];
}

message SplitOperation {
bytes key = 1;
Result result = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -150,6 +157,7 @@ message Operation {
TransferLeaseOperation transfer_lease = 16;
ChangeZoneOperation change_zone = 17;
AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"];
BarrierOperation barrier = 22;
}

enum ResultType {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func TestOperationsFormat(t *testing.T) {
{
step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)),
},
{step: step(barrier(k1, k2, false /* withLAI */))},
{step: step(barrier(k3, k4, true /* withLAI */))},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/5
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.Barrier(ctx, tk(1), tk(2))
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestOperationsFormat/6
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
echo
----
···db0.BarrierWithLAI(ctx, tk(3), tk(4))
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,20 @@ func (v *validator) processOp(op Operation) {
if v.buffering == bufferingSingle {
v.checkAtomic(`addSSTable`, t.Result)
}
case *BarrierOperation:
execTimestampStrictlyOptional = true
if op.Barrier.WithLeaseAppliedIndex &&
resultHasErrorType(t.Result, &kvpb.RangeKeyMismatchError{}) {
// Barriers requesting LAIs can't span ranges. The generator will
// optimistically try to fit the barrier inside one of the current ranges,
// but this may race with a split, so we ignore the error in this case and
// try again later.
} else {
// Fail or retry on other errors, depending on type.
v.checkNonAmbError(op, t.Result, exceptUnhandledRetry)
}
// We don't yet actually check the barrier guarantees here, i.e. that all
// concurrent writes are applied by the time it completes. Maybe later.
case *ScanOperation:
if _, isErr := v.checkError(op, t.Result); isErr {
break
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (r *BarrierResponse) combine(_ context.Context, c combinable, _ *BatchReque
return err
}
r.Timestamp.Forward(otherR.Timestamp)
if r.LeaseAppliedIndex != 0 || otherR.LeaseAppliedIndex != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with LeaseAppliedIndex")
}
if r.RangeDesc.NextReplicaID != 0 || otherR.RangeDesc.NextReplicaID != 0 {
return errors.AssertionFailedf("can't combine BarrierResponses with RangeDesc")
}
}
return nil
}
Expand Down Expand Up @@ -1767,7 +1773,13 @@ func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag {
return isRead | isRange | requiresClosedTSOlderThanStorageSnapshot
}
func (*BarrierRequest) flags() flag { return isWrite | isRange }
func (r *BarrierRequest) flags() flag {
flags := isWrite | isRange | isAlone
if r.WithLeaseAppliedIndex {
flags |= isUnsplittable // the LAI is only valid for a single range
}
return flags
}
func (*IsSpanEmptyRequest) flags() flag { return isRead | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
Expand Down
31 changes: 28 additions & 3 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2423,11 +2423,28 @@ message QueryResolvedTimestampResponse {
(gogoproto.nullable) = false, (gogoproto.customname) = "ResolvedTS"];
}

// BarrierRequest is the request for a Barrier operation. This goes through Raft
// and has the purpose of waiting until all conflicting in-flight operations on
// this range have completed, without blocking any new operations.
// BarrierRequest is the request for a Barrier operation. This guarantees that
// all past and ongoing writes to a key span have completed and applied on the
// leaseholder. It does this by waiting for all conflicting write latches and
// then submitting a noop write through Raft, waiting for it to apply. Later
// writes are not affected -- in particular, it does not actually take out a
// latch, so writers don't have to wait for it to complete and can write below
// the barrier.
message BarrierRequest {
RequestHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

// WithLeaseAppliedIndex will return the LeaseAppliedIndex of the barrier
// command in the response, allowing the caller to wait for the barrier to
// apply on an arbitrary replica. It also returns the range descriptor, so the
// caller can detect any unexpected range changes.
//
// When enabled, the barrier request can no longer span multiple ranges, and
// will instead return RangeKeyMismatchError. The caller must be prepared to
// handle this.
//
// NB: This field was added in a patch release. Nodes prior to 24.1 are not
// guaranteed to support it, returning a zero LeaseAppliedIndex instead.
bool with_lease_applied_index = 2;
}

// BarrierResponse is the response for a Barrier operation.
Expand All @@ -2437,6 +2454,14 @@ message BarrierResponse {
// Timestamp at which this Barrier was evaluated. Can be used to guarantee
// future operations happen on the same or newer leaseholders.
util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];

// LeaseAppliedIndex at which this Barrier was applied. Only returned when
// requested via WithLeaseAppliedIndex.
uint64 lease_applied_index = 3 [(gogoproto.casttype) = "LeaseAppliedIndex"];

// RangeDesc at the time the barrier was applied. Only returned when requested
// via WithLeaseAppliedIndex.
RangeDescriptor range_desc = 4 [(gogoproto.nullable) = false];
}

// A RequestUnion contains exactly one of the requests.
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func (ba *BatchRequest) IsSingleRequest() bool {
return len(ba.Requests) == 1
}

// IsSingleBarrierRequest returns true iff the batch contains a single request,
// and that request is a Barrier.
func (ba *BatchRequest) IsSingleBarrierRequest() bool {
return ba.isSingleRequestWithMethod(Barrier)
}

// IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single
// request, and that request has the skipsLeaseCheck flag set.
func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool {
Expand Down Expand Up @@ -349,7 +355,7 @@ func (ba *BatchRequest) IsSingleExportRequest() bool {
// a no-op. The Barrier request requires consensus even though its evaluation
// is a no-op.
func (ba *BatchRequest) RequiresConsensus() bool {
return ba.isSingleRequestWithMethod(Barrier) || ba.isSingleRequestWithMethod(Probe)
return ba.IsSingleBarrierRequest() || ba.IsSingleProbeRequest()
}

// IsCompleteTransaction determines whether a batch contains every write in a
Expand Down
Loading