Skip to content

Commit

Permalink
Merge #88452
Browse files Browse the repository at this point in the history
88452: kvnemesis: store and assert on presence of execution  timestamp r=erikgrinaker a=tbg

Only review the last commit here, the others are #87877.

----

We currently don't support non-transactional `DeleteRange` because
we want to be able to match up its deletion tombstones with it, but
weren't able to retrieve its execution timestamp.

This is possible though, and in fact it's possible for all MVCC-aware
operations. Rather than plumbing something one-off for DeleteRange,
add a `OptionalTimestamp` field to `Result` and populate it for all
MVCC-aware operations.

This will enable further simplifications in the future, including
addressing the following TODO:

https://github.com/cockroachdb/cockroach/blob/7cde315da539fe3d790f546a1ddde6cc882fca6b/pkg/kv/kvnemesis/validator.go#L43-L46

Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Sep 23, 2022
2 parents 34dc56f + 47317de commit 48af844
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 70 deletions.
174 changes: 116 additions & 58 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
applyClientOp(ctx, db, op, false /* inTxn */)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp)
o.Result = resultError(ctx, err)
o.Result = resultInit(ctx, err)
case *MergeOperation:
err := db.AdminMerge(ctx, o.Key)
o.Result = resultError(ctx, err)
o.Result = resultInit(ctx, err)
case *ChangeReplicasOperation:
desc := getRangeDesc(ctx, o.Key, db)
_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
// TODO(dan): Save returned desc?
o.Result = resultError(ctx, err)
o.Result = resultInit(ctx, err)
case *TransferLeaseOperation:
err := db.AdminTransferLease(ctx, o.Key, o.Target)
o.Result = resultError(ctx, err)
o.Result = resultInit(ctx, err)
case *ChangeZoneOperation:
err := updateZoneConfigInEnv(ctx, env, o.Type)
o.Result = resultError(ctx, err)
o.Result = resultInit(ctx, err)
case *ClosureTxnOperation:
// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
// epochs of the same transaction to avoid waiting while holding locks.
Expand Down Expand Up @@ -143,16 +143,22 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
panic(errors.AssertionFailedf(`unknown closure txn type: %s`, o.Type))
}
})
o.Result = resultError(ctx, txnErr)
o.Result = resultInit(ctx, txnErr)
if txnErr == nil {
o.Txn = savedTxn.TestingCloneTxn()
o.Result.OptionalTimestamp = o.Txn.WriteTimestamp
}
default:
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, o, o))
}
}

type dbRunI interface {
Run(context.Context, *kv.Batch) error
}

type clientI interface {
dbRunI
Get(context.Context, interface{}) (kv.KeyValue, error)
GetForUpdate(context.Context, interface{}) (kv.KeyValue, error)
Put(context.Context, interface{}, interface{}) error
Expand All @@ -162,76 +168,124 @@ type clientI interface {
ReverseScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
Del(context.Context, ...interface{}) ([]roachpb.Key, error)
DelRange(context.Context, interface{}, interface{}, bool) ([]roachpb.Key, error)
Run(context.Context, *kv.Batch) error
}

func dbRunWithResultAndTimestamp(
ctx context.Context, db dbRunI, ops ...func(b *kv.Batch),
) ([]kv.Result, hlc.Timestamp, error) {
b := &kv.Batch{}
for _, op := range ops {
op(b)
}
ts, err := batchRun(ctx, db.Run, b)
if err != nil {
return nil, hlc.Timestamp{}, err
}
return b.Results, ts, nil
}

func batchRun(
ctx context.Context, run func(context.Context, *kv.Batch) error, b *kv.Batch,
) (hlc.Timestamp, error) {
if err := run(ctx, b); err != nil {
return hlc.Timestamp{}, err
}
var ts hlc.Timestamp
if rr := b.RawResponse(); rr != nil {
ts = rr.Timestamp
}
return ts, nil
}

func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
switch o := op.GetValue().(type) {
case *GetOperation:
fn := db.Get
fn := (*kv.Batch).Get
if o.ForUpdate {
fn = db.GetForUpdate
fn = (*kv.Batch).GetForUpdate
}
kv, err := fn(ctx, o.Key)
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
fn(b, o.Key)
})
o.Result = resultInit(ctx, err)
if err != nil {
o.Result = resultError(ctx, err)
return
}
o.Result.Type = ResultType_Value
o.Result.OptionalTimestamp = ts
kv := res[0].Rows[0]
if kv.Value != nil {
o.Result.Value = kv.Value.RawBytes
} else {
o.Result.Type = ResultType_Value
if kv.Value != nil {
o.Result.Value = kv.Value.RawBytes
} else {
o.Result.Value = nil
}
o.Result.Value = nil
}
case *PutOperation:
err := db.Put(ctx, o.Key, o.Value)
o.Result = resultError(ctx, err)
_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Put(o.Key, o.Value)
})
o.Result = resultInit(ctx, err)
if err != nil {
return
}
o.Result.OptionalTimestamp = ts
case *ScanOperation:
fn := db.Scan
fn := (*kv.Batch).Scan
if o.Reverse && o.ForUpdate {
fn = db.ReverseScanForUpdate
fn = (*kv.Batch).ReverseScanForUpdate
} else if o.Reverse {
fn = db.ReverseScan
fn = (*kv.Batch).ReverseScan
} else if o.ForUpdate {
fn = db.ScanForUpdate
fn = (*kv.Batch).ScanForUpdate
}
kvs, err := fn(ctx, o.Key, o.EndKey, 0 /* maxRows */)
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
fn(b, o.Key, o.EndKey)
})
o.Result = resultInit(ctx, err)
if err != nil {
o.Result = resultError(ctx, err)
} else {
o.Result.Type = ResultType_Values
o.Result.Values = make([]KeyValue, len(kvs))
for i, kv := range kvs {
o.Result.Values[i] = KeyValue{
Key: []byte(kv.Key),
Value: kv.Value.RawBytes,
}
return
}
kvs := res[0].Rows
o.Result.OptionalTimestamp = ts
o.Result.Type = ResultType_Values
o.Result.Values = make([]KeyValue, len(kvs))
for i, kv := range kvs {
o.Result.Values[i] = KeyValue{
Key: []byte(kv.Key),
Value: kv.Value.RawBytes,
}
}
case *DeleteOperation:
deletedKeys, err := db.Del(ctx, o.Key)
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.Del(o.Key)
})
o.Result = resultInit(ctx, err)
if err != nil {
o.Result = resultError(ctx, err)
} else {
o.Result.Type = ResultType_Keys
o.Result.Keys = make([][]byte, len(deletedKeys))
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
return
}
deletedKeys := res[0].Keys
o.Result.OptionalTimestamp = ts
o.Result.Type = ResultType_Keys
o.Result.Keys = make([][]byte, len(deletedKeys))
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
case *DeleteRangeOperation:
if !inTxn {
panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
}
deletedKeys, err := db.DelRange(ctx, o.Key, o.EndKey, true /* returnKeys */)
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
b.DelRange(o.Key, o.EndKey, true /* returnKeys */)
})
o.Result = resultInit(ctx, err)
if err != nil {
o.Result = resultError(ctx, err)
} else {
o.Result.Type = ResultType_Keys
o.Result.Keys = make([][]byte, len(deletedKeys))
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
return
}
deletedKeys := res[0].Keys
o.Result.OptionalTimestamp = ts
o.Result.Type = ResultType_Keys
o.Result.Keys = make([][]byte, len(deletedKeys))
for i, deletedKey := range deletedKeys {
o.Result.Keys[i] = deletedKey
}
case *BatchOperation:
b := &kv.Batch{}
Expand All @@ -244,7 +298,7 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
func applyBatchOp(
ctx context.Context,
b *kv.Batch,
runFn func(context.Context, *kv.Batch) error,
run func(context.Context, *kv.Batch) error,
o *BatchOperation,
inTxn bool,
) {
Expand Down Expand Up @@ -279,13 +333,17 @@ func applyBatchOp(
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
}
}
runErr := runFn(ctx, b)
o.Result = resultError(ctx, runErr)
ts, err := batchRun(ctx, run, b)
o.Result = resultInit(ctx, err)
// NB: we intentionally fall through; the batch propagates the error
// to each result.
err = nil
o.Result.OptionalTimestamp = ts
for i := range o.Ops {
switch subO := o.Ops[i].GetValue().(type) {
case *GetOperation:
if b.Results[i].Err != nil {
subO.Result = resultError(ctx, b.Results[i].Err)
subO.Result = resultInit(ctx, b.Results[i].Err)
} else {
subO.Result.Type = ResultType_Value
result := b.Results[i].Rows[0]
Expand All @@ -297,11 +355,11 @@ func applyBatchOp(
}
case *PutOperation:
err := b.Results[i].Err
subO.Result = resultError(ctx, err)
subO.Result = resultInit(ctx, err)
case *ScanOperation:
kvs, err := b.Results[i].Rows, b.Results[i].Err
if err != nil {
subO.Result = resultError(ctx, err)
subO.Result = resultInit(ctx, err)
} else {
subO.Result.Type = ResultType_Values
subO.Result.Values = make([]KeyValue, len(kvs))
Expand All @@ -314,11 +372,11 @@ func applyBatchOp(
}
case *DeleteOperation:
err := b.Results[i].Err
subO.Result = resultError(ctx, err)
subO.Result = resultInit(ctx, err)
case *DeleteRangeOperation:
keys, err := b.Results[i].Keys, b.Results[i].Err
if err != nil {
subO.Result = resultError(ctx, err)
subO.Result = resultInit(ctx, err)
} else {
subO.Result.Type = ResultType_Keys
subO.Result.Keys = make([][]byte, len(keys))
Expand All @@ -332,7 +390,7 @@ func applyBatchOp(
}
}

func resultError(ctx context.Context, err error) Result {
func resultInit(ctx context.Context, err error) Result {
if err == nil {
return Result{Type: ResultType_NoError}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ message Result {
bytes value = 4;
// Only set if Type is ResultType_Values. The RawBytes of a roachpb.Value.
repeated KeyValue values = 5 [(gogoproto.nullable) = false];
util.hlc.Timestamp optional_timestamp = 6 [(gogoproto.nullable) = false];
}

message Step {
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ const (
// Whenever it is `false`, processOp invokes the validator's checkAtomic method
// for the operation.
func (v *validator) processOp(buffering bool, op Operation) {
// We don't need an execution timestamp when buffering (the caller will need
// an execution timestamp for the combined operation, though). Additionally,
// some operations supported by kvnemesis aren't MVCC-aware (splits, etc) and
// thus also don't need an execution timestamp.
execTimestampStrictlyOptional := buffering
switch t := op.GetValue().(type) {
case *GetOperation:
v.failIfError(op, t.Result)
Expand Down Expand Up @@ -457,8 +462,10 @@ func (v *validator) processOp(buffering bool, op Operation) {
v.curOps = append(v.curOps, scan)
}
case *SplitOperation:
execTimestampStrictlyOptional = true
v.failIfError(op, t.Result)
case *MergeOperation:
execTimestampStrictlyOptional = true
if resultIsErrorStr(t.Result, `cannot merge final range`) {
// Because of some non-determinism, it is not worth it (or maybe not
// possible) to prevent these usage errors. Additionally, I (dan) think
Expand Down Expand Up @@ -500,6 +507,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
v.failIfError(op, t.Result)
}
case *ChangeReplicasOperation:
execTimestampStrictlyOptional = true
var ignore bool
if err := errorFromResult(t.Result); err != nil {
ignore = kvserver.IsRetriableReplicationChangeError(err) ||
Expand All @@ -510,6 +518,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
v.failIfError(op, t.Result)
}
case *TransferLeaseOperation:
execTimestampStrictlyOptional = true
var ignore bool
if err := errorFromResult(t.Result); err != nil {
ignore = kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) ||
Expand Down Expand Up @@ -539,6 +548,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
v.failIfError(op, t.Result)
}
case *ChangeZoneOperation:
execTimestampStrictlyOptional = true
v.failIfError(op, t.Result)
case *BatchOperation:
if !resultIsRetryable(t.Result) {
Expand Down Expand Up @@ -568,6 +578,10 @@ func (v *validator) processOp(buffering bool, op Operation) {
default:
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
}

if !execTimestampStrictlyOptional && !buffering && op.Result().Type != ResultType_Error && op.Result().OptionalTimestamp.IsEmpty() {
v.failures = append(v.failures, errors.Errorf("execution timestamp missing for %s", op))
}
}

// checkAtomic verifies a set of operations that should be atomic by trying to find
Expand Down
Loading

0 comments on commit 48af844

Please sign in to comment.