Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: batcheval: add BarrierRequest.WithLeaseAppliedIndex #118474

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func (b *Batch) scanInterleavedIntents(s, e interface{}) {
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) barrier(s, e interface{}) {
func (b *Batch) barrier(s, e interface{}, withLAI bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -921,6 +921,7 @@ func (b *Batch) barrier(s, e interface{}) {
Key: begin,
EndKey: end,
},
WithLeaseAppliedIndex: withLAI,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
44 changes: 34 additions & 10 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,23 +827,47 @@ func (db *DB) ScanInterleavedIntents(
// writes on the specified key range to finish.
func (db *DB) Barrier(ctx context.Context, begin, end interface{}) (hlc.Timestamp, error) {
b := &Batch{}
b.barrier(begin, end)
err := getOneErr(db.Run(ctx, b), b)
if err != nil {
b.barrier(begin, end, false /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return hlc.Timestamp{}, err
}
responses := b.response.Responses
if len(responses) == 0 {
return hlc.Timestamp{}, errors.Errorf("unexpected empty response for Barrier")
if l := len(b.response.Responses); l != 1 {
return hlc.Timestamp{}, errors.Errorf("got %d responses for Barrier", l)
}
resp, ok := responses[0].GetInner().(*roachpb.BarrierResponse)
if !ok {
return hlc.Timestamp{}, errors.Errorf("unexpected response of type %T for Barrier",
responses[0].GetInner())
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return hlc.Timestamp{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.Timestamp, nil
}

// BarrierWithLAI is like Barrier, but also returns the lease applied index and
// range descriptor at which the barrier was applied. In this case, the barrier
// can't span multiple ranges, otherwise a RangeKeyMismatchError is returned.
//
// NB: the protocol support for this was added in a patch release, and is not
// guaranteed to be present with nodes prior to 24.1. In this case, the request
// will return an empty result.
func (db *DB) BarrierWithLAI(
ctx context.Context, begin, end interface{},
) (uint64, roachpb.RangeDescriptor, error) {
b := &Batch{}
b.barrier(begin, end, true /* withLAI */)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return 0, roachpb.RangeDescriptor{}, err
}
if l := len(b.response.Responses); l != 1 {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("got %d responses for Barrier", l)
}
resp := b.response.Responses[0].GetBarrier()
if resp == nil {
return 0, roachpb.RangeDescriptor{}, errors.Errorf("unexpected response %T for Barrier",
b.response.Responses[0].GetInner())
}
return resp.LeaseAppliedIndex, resp.RangeDesc, nil
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultError(ctx, err)
case *BarrierOperation:
var err error
if o.WithLeaseAppliedIndex {
_, _, err = db.BarrierWithLAI(ctx, o.Key, o.EndKey)
} else {
_, err = db.Barrier(ctx, o.Key, o.EndKey)
}
o.Result = resultError(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -232,6 +240,17 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
o.Result.Keys[i] = deletedKey
}
}
case *BarrierOperation:
b := &kv.Batch{}
b.AddRawRequest(&roachpb.BarrierRequest{
RequestHeader: roachpb.RequestHeader{
Key: o.Key,
EndKey: o.EndKey,
},
WithLeaseAppliedIndex: o.WithLeaseAppliedIndex,
})
err := db.Run(ctx, b)
o.Result = resultError(ctx, err)
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o, inTxn)
Expand Down Expand Up @@ -274,6 +293,8 @@ func applyBatchOp(
panic(errors.AssertionFailedf(`non-transactional batch DelRange operations currently unsupported`))
}
b.DelRange(subO.Key, subO.EndKey, true /* returnKeys */)
case *BarrierOperation:
panic(errors.AssertionFailedf(`Barrier cannot be used in batches`))
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvnemesis

import (
"math/rand"
"sort"
"strconv"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -104,6 +105,8 @@ type ClientOperationConfig struct {
DeleteExisting int
// DeleteRange is an operation that Deletes a key range that may contain values.
DeleteRange int
// Barrier is an operation that waits for in-flight writes to complete.
Barrier int
}

// BatchOperationConfig configures the relative probability of generating a
Expand Down Expand Up @@ -183,6 +186,7 @@ func newAllOperationsConfig() GeneratorConfig {
DeleteMissing: 1,
DeleteExisting: 1,
DeleteRange: 1,
Barrier: 1,
}
batchOpConfig := BatchOperationConfig{
Batch: 4,
Expand Down Expand Up @@ -247,6 +251,12 @@ func NewDefaultConfig() GeneratorConfig {
// TODO(dan): Remove when #45586 is addressed.
config.Ops.ClosureTxn.CommitBatchOps.GetExisting = 0
config.Ops.ClosureTxn.CommitBatchOps.GetMissing = 0
// Barrier cannot be used in batches, and we omit it in txns too because it
// can result in spurious RangeKeyMismatchErrors that fail the txn operation.
config.Ops.Batch.Ops.Barrier = 0
config.Ops.ClosureTxn.CommitBatchOps.Barrier = 0
config.Ops.ClosureTxn.TxnClientOps.Barrier = 0
config.Ops.ClosureTxn.TxnBatchOps.Ops.Barrier = 0
return config
}

Expand Down Expand Up @@ -433,6 +443,7 @@ func (g *generator) registerClientOps(allowed *[]opGen, c *ClientOperationConfig
addOpGen(allowed, randReverseScan, c.ReverseScan)
addOpGen(allowed, randReverseScanForUpdate, c.ReverseScanForUpdate)
addOpGen(allowed, randDelRange, c.DeleteRange)
addOpGen(allowed, randBarrier, c.Barrier)
}

func (g *generator) registerBatchOps(allowed *[]opGen, c *BatchOperationConfig) {
Expand Down Expand Up @@ -474,6 +485,21 @@ func randPutExisting(g *generator, rng *rand.Rand) Operation {
return put(key, value)
}

func randBarrier(g *generator, rng *rand.Rand) Operation {
withLAI := rng.Float64() < 0.5
var key, endKey string
if withLAI {
// Barriers requesting LAIs can't span multiple ranges, so we try to fit
// them within an existing range. This may race with a concurrent split, in
// which case the Barrier will fail, but that's ok -- most should still
// succeed. These errors are ignored by the validator.
key, endKey = randRangeSpan(rng, g.currentSplits)
} else {
key, endKey = randSpan(rng)
}
return barrier(key, endKey, withLAI)
}

func randScan(g *generator, rng *rand.Rand) Operation {
key, endKey := randSpan(rng)
return scan(key, endKey)
Expand Down Expand Up @@ -656,6 +682,33 @@ func randKey(rng *rand.Rand) string {
return string(key)
}

// Interprets the provided map as the split points of the key space and returns
// the boundaries of a random range.
func randRangeSpan(rng *rand.Rand, curOrHistSplits map[string]struct{}) (string, string) {
genSpan := GeneratorDataSpan()
keys := make([]string, 0, len(curOrHistSplits))
for key := range curOrHistSplits {
keys = append(keys, key)
}
sort.Strings(keys)
if len(keys) == 0 {
// No splits.
return string(genSpan.Key), string(genSpan.EndKey)
}
idx := rng.Intn(len(keys) + 1)
if idx == len(keys) {
// Last range.
return keys[idx-1], string(genSpan.EndKey)
}
if idx == 0 {
// First range. We avoid having splits at 0 so this will be a well-formed
// range. (If it isn't, we'll likely catch an error because we'll send an
// ill-formed request and kvserver will error it out).
return string(genSpan.Key), keys[0]
}
return keys[idx-1], keys[idx]
}

func randMapKey(rng *rand.Rand, m map[string]struct{}) string {
keys := make([]string, 0, len(m))
for key := range m {
Expand Down Expand Up @@ -756,3 +809,11 @@ func transferLease(key string, target roachpb.StoreID) Operation {
func changeZone(changeType ChangeZoneType) Operation {
return Operation{ChangeZone: &ChangeZoneOperation{Type: changeType}}
}

func barrier(key, endKey string, withLAI bool) Operation {
return Operation{Barrier: &BarrierOperation{
Key: []byte(key),
EndKey: []byte(endKey),
WithLeaseAppliedIndex: withLAI,
}}
}
5 changes: 4 additions & 1 deletion pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func TestRandStep(t *testing.T) {
}
case *DeleteRangeOperation:
client.DeleteRange++
case *BarrierOperation:
client.Barrier++
case *BatchOperation:
batch.Batch++
countClientOps(&batch.Ops, nil, o.Ops...)
Expand All @@ -152,7 +154,8 @@ func TestRandStep(t *testing.T) {
*ScanOperation,
*BatchOperation,
*DeleteOperation,
*DeleteRangeOperation:
*DeleteRangeOperation,
*BarrierOperation:
countClientOps(&counts.DB, &counts.Batch, step.Op)
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (op Operation) Result() *Result {
return &o.Result
case *DeleteRangeOperation:
return &o.Result
case *BarrierOperation:
return &o.Result
case *SplitOperation:
return &o.Result
case *MergeOperation:
Expand Down Expand Up @@ -112,6 +114,8 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
o.format(w, fctx)
case *DeleteRangeOperation:
o.format(w, fctx)
case *BarrierOperation:
o.format(w, fctx)
case *SplitOperation:
o.format(w, fctx)
case *MergeOperation:
Expand Down Expand Up @@ -259,6 +263,16 @@ func (op DeleteRangeOperation) format(w *strings.Builder, fctx formatCtx) {
}
}

func (op BarrierOperation) format(w *strings.Builder, fctx formatCtx) {
if op.WithLeaseAppliedIndex {
fmt.Fprintf(w, `%s.BarrierWithLAI(ctx, %s, %s)`,
fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey))
} else {
fmt.Fprintf(w, `%s.Barrier(ctx, %s, %s)`, fctx.receiver, roachpb.Key(op.Key), roachpb.Key(op.EndKey))
}
op.Result.format(w)
}

func (op SplitOperation) format(w *strings.Builder, fctx formatCtx) {
fmt.Fprintf(w, `%s.AdminSplit(ctx, %s)`, fctx.receiver, roachpb.Key(op.Key))
op.Result.format(w)
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ message DeleteRangeOperation {
Result result = 3 [(gogoproto.nullable) = false];
}

message BarrierOperation {
bytes key = 1;
bytes end_key = 2;
bool with_lease_applied_index = 3;
Result result = 4 [(gogoproto.nullable) = false];
}

message SplitOperation {
bytes key = 1;
Result result = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -122,6 +129,7 @@ message Operation {
ChangeReplicasOperation change_replicas = 14;
TransferLeaseOperation transfer_lease = 15;
ChangeZoneOperation change_zone = 16;
BarrierOperation barrier = 22;
}

enum ResultType {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ func TestOperationsFormat(t *testing.T) {
})
`,
},
{
step: step(barrier(`a`, `b`, false /* withLAI */)),
expected: `db0.Barrier(ctx, "a", "b")`,
},
{
step: step(barrier(`c`, `d`, true /* withLAI */)),
expected: `db0.BarrierWithLAI(ctx, "c", "d")`,
},
}

for _, test := range tests {
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,19 @@ func (v *validator) processOp(txnID *string, op Operation) {
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan)
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], deleteOps...)
}
case *BarrierOperation:
if op.Barrier.WithLeaseAppliedIndex &&
resultIsError(t.Result, &roachpb.RangeKeyMismatchError{}) {
// Barriers requesting LAIs can't span ranges. The generator will
// optimistically try to fit the barrier inside one of the current ranges,
// but this may race with a split, so we ignore the error in this case and
// try again later.
} else {
// Fail or retry on other errors, depending on type.
v.failIfError(op, t.Result)
}
// We don't yet actually check the barrier guarantees here, i.e. that all
// concurrent writes are applied by the time it completes. Maybe later.
case *ScanOperation:
v.failIfError(op, t.Result)
if txnID == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ go_test(
size = "medium",
srcs = [
"cmd_add_sstable_test.go",
"cmd_barrier_test.go",
"cmd_clear_range_test.go",
"cmd_delete_range_gchint_test.go",
"cmd_delete_range_test.go",
Expand Down
12 changes: 9 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ func declareKeysBarrier(
latchSpans.AddNonMVCC(spanset.SpanReadWrite, req.Header().Span())
}

// Barrier evaluation is a no-op, as all the latch waiting happens in
// the latch manager.
// Barrier evaluation is a no-op, but it still goes through Raft because of
// BatchRequest.RequiresConsensus(). The latch waiting happens in the latch
// manager, and the WithLeaseAppliedIndex info is populated during application.
func Barrier(
_ context.Context, _ storage.ReadWriter, cArgs CommandArgs, response roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.BarrierRequest)
resp := response.(*roachpb.BarrierResponse)
resp.Timestamp = cArgs.EvalCtx.Clock().Now()

return result.Result{}, nil
return result.Result{
Local: result.LocalResult{
PopulateBarrierResponse: args.WithLeaseAppliedIndex,
},
}, nil
}
Loading