From 47317de9b271f2c8e38b03564139c950dd673e6c Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Thu, 22 Sep 2022 14:45:20 +0200
Subject: [PATCH] kvnemesis: store and assert on presence of execution
 timestamp

We currently don't support non-transactional `DeleteRange` because we
want to be able to match up its deletion tombstones with the operation,
but we weren't able to retrieve its execution timestamp to do so.

Retrieving the timestamp is possible, though, and in fact it's possible
for all MVCC-aware operations. Rather than plumbing something one-off
for `DeleteRange`, add an `OptionalTimestamp` field to `Result` and
populate it for all MVCC-aware operations.

This will enable further simplifications in the future, including
addressing the following TODO:

https://github.com/cockroachdb/cockroach/blob/7cde315da539fe3d790f546a1ddde6cc882fca6b/pkg/kv/kvnemesis/validator.go#L43-L46

Release note: None
---
 pkg/kv/kvnemesis/applier.go        | 174 +++++++++++++++++++----------
 pkg/kv/kvnemesis/operations.proto  |   1 +
 pkg/kv/kvnemesis/validator.go      |  14 +++
 pkg/kv/kvnemesis/validator_test.go |  29 +++--
 4 files changed, 148 insertions(+), 70 deletions(-)

diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go
index 5c0b3b6e8442..7f000c1ff841 100644
--- a/pkg/kv/kvnemesis/applier.go
+++ b/pkg/kv/kvnemesis/applier.go
@@ -87,21 +87,21 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 		applyClientOp(ctx, db, op, false /* inTxn */)
 	case *SplitOperation:
 		err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp)
-		o.Result = resultError(ctx, err)
+		o.Result = resultInit(ctx, err)
 	case *MergeOperation:
 		err := db.AdminMerge(ctx, o.Key)
-		o.Result = resultError(ctx, err)
+		o.Result = resultInit(ctx, err)
 	case *ChangeReplicasOperation:
 		desc := getRangeDesc(ctx, o.Key, db)
 		_, err := db.AdminChangeReplicas(ctx, o.Key, desc, o.Changes)
 		// TODO(dan): Save returned desc?
-		o.Result = resultError(ctx, err)
+		o.Result = resultInit(ctx, err)
 	case *TransferLeaseOperation:
 		err := db.AdminTransferLease(ctx, o.Key, o.Target)
-		o.Result = resultError(ctx, err)
+		o.Result = resultInit(ctx, err)
 	case *ChangeZoneOperation:
 		err := updateZoneConfigInEnv(ctx, env, o.Type)
-		o.Result = resultError(ctx, err)
+		o.Result = resultInit(ctx, err)
 	case *ClosureTxnOperation:
 		// Use a backoff loop to avoid thrashing on txn aborts. Don't wait between
 		// epochs of the same transaction to avoid waiting while holding locks.
@@ -143,16 +143,22 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
 				panic(errors.AssertionFailedf(`unknown closure txn type: %s`, o.Type))
 			}
 		})
-		o.Result = resultError(ctx, txnErr)
+		o.Result = resultInit(ctx, txnErr)
 		if txnErr == nil {
 			o.Txn = savedTxn.TestingCloneTxn()
+			o.Result.OptionalTimestamp = o.Txn.WriteTimestamp
 		}
 	default:
 		panic(errors.AssertionFailedf(`unknown operation type: %T %v`, o, o))
 	}
 }
 
+type dbRunI interface {
+	Run(context.Context, *kv.Batch) error
+}
+
 type clientI interface {
+	dbRunI
 	Get(context.Context, interface{}) (kv.KeyValue, error)
 	GetForUpdate(context.Context, interface{}) (kv.KeyValue, error)
 	Put(context.Context, interface{}, interface{}) error
@@ -162,76 +168,124 @@ type clientI interface {
 	ReverseScanForUpdate(context.Context, interface{}, interface{}, int64) ([]kv.KeyValue, error)
 	Del(context.Context, ...interface{}) ([]roachpb.Key, error)
 	DelRange(context.Context, interface{}, interface{}, bool) ([]roachpb.Key, error)
-	Run(context.Context, *kv.Batch) error
+}
+
+func dbRunWithResultAndTimestamp(
+	ctx context.Context, db dbRunI, ops ...func(b *kv.Batch),
+) ([]kv.Result, hlc.Timestamp, error) {
+	b := &kv.Batch{}
+	for _, op := range ops {
+		op(b)
+	}
+	ts, err := batchRun(ctx, db.Run, b)
+	if err != nil {
+		return nil, hlc.Timestamp{}, err
+	}
+	return b.Results, ts, nil
+}
+
+func batchRun(
+	ctx context.Context, run func(context.Context, *kv.Batch) error, b *kv.Batch,
+) (hlc.Timestamp, error) {
+	if err := run(ctx, b); err != nil {
+		return hlc.Timestamp{}, err
+	}
+	var ts hlc.Timestamp
+	if rr := b.RawResponse(); rr != nil {
+		ts = rr.Timestamp
+	}
+	return ts, nil
 }
 
 func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
 	switch o := op.GetValue().(type) {
 	case *GetOperation:
-		fn := db.Get
+		fn := (*kv.Batch).Get
 		if o.ForUpdate {
-			fn = db.GetForUpdate
+			fn = (*kv.Batch).GetForUpdate
 		}
-		kv, err := fn(ctx, o.Key)
+		res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			fn(b, o.Key)
+		})
+		o.Result = resultInit(ctx, err)
 		if err != nil {
-			o.Result = resultError(ctx, err)
+			return
+		}
+		o.Result.Type = ResultType_Value
+		o.Result.OptionalTimestamp = ts
+		kv := res[0].Rows[0]
+		if kv.Value != nil {
+			o.Result.Value = kv.Value.RawBytes
 		} else {
-			o.Result.Type = ResultType_Value
-			if kv.Value != nil {
-				o.Result.Value = kv.Value.RawBytes
-			} else {
-				o.Result.Value = nil
-			}
+			o.Result.Value = nil
 		}
 	case *PutOperation:
-		err := db.Put(ctx, o.Key, o.Value)
-		o.Result = resultError(ctx, err)
+		_, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			b.Put(o.Key, o.Value)
+		})
+		o.Result = resultInit(ctx, err)
+		if err != nil {
+			return
+		}
+		o.Result.OptionalTimestamp = ts
 	case *ScanOperation:
-		fn := db.Scan
+		fn := (*kv.Batch).Scan
 		if o.Reverse && o.ForUpdate {
-			fn = db.ReverseScanForUpdate
+			fn = (*kv.Batch).ReverseScanForUpdate
 		} else if o.Reverse {
-			fn = db.ReverseScan
+			fn = (*kv.Batch).ReverseScan
 		} else if o.ForUpdate {
-			fn = db.ScanForUpdate
+			fn = (*kv.Batch).ScanForUpdate
 		}
-		kvs, err := fn(ctx, o.Key, o.EndKey, 0 /* maxRows */)
+		res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			fn(b, o.Key, o.EndKey)
+		})
+		o.Result = resultInit(ctx, err)
 		if err != nil {
-			o.Result = resultError(ctx, err)
-		} else {
-			o.Result.Type = ResultType_Values
-			o.Result.Values = make([]KeyValue, len(kvs))
-			for i, kv := range kvs {
-				o.Result.Values[i] = KeyValue{
-					Key:   []byte(kv.Key),
-					Value: kv.Value.RawBytes,
-				}
+			return
+		}
+		kvs := res[0].Rows
+		o.Result.OptionalTimestamp = ts
+		o.Result.Type = ResultType_Values
+		o.Result.Values = make([]KeyValue, len(kvs))
+		for i, kv := range kvs {
+			o.Result.Values[i] = KeyValue{
+				Key:   []byte(kv.Key),
+				Value: kv.Value.RawBytes,
 			}
 		}
 	case *DeleteOperation:
-		deletedKeys, err := db.Del(ctx, o.Key)
+		res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			b.Del(o.Key)
+		})
+		o.Result = resultInit(ctx, err)
 		if err != nil {
-			o.Result = resultError(ctx, err)
-		} else {
-			o.Result.Type = ResultType_Keys
-			o.Result.Keys = make([][]byte, len(deletedKeys))
-			for i, deletedKey := range deletedKeys {
-				o.Result.Keys[i] = deletedKey
-			}
+			return
+		}
+		deletedKeys := res[0].Keys
+		o.Result.OptionalTimestamp = ts
+		o.Result.Type = ResultType_Keys
+		o.Result.Keys = make([][]byte, len(deletedKeys))
+		for i, deletedKey := range deletedKeys {
+			o.Result.Keys[i] = deletedKey
 		}
 	case *DeleteRangeOperation:
 		if !inTxn {
 			panic(errors.AssertionFailedf(`non-transactional DelRange operations currently unsupported`))
 		}
-		deletedKeys, err := db.DelRange(ctx, o.Key, o.EndKey, true /* returnKeys */)
+		res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
+			b.DelRange(o.Key, o.EndKey, true /* returnKeys */)
+		})
+		o.Result = resultInit(ctx, err)
 		if err != nil {
-			o.Result = resultError(ctx, err)
-		} else {
-			o.Result.Type = ResultType_Keys
-			o.Result.Keys = make([][]byte, len(deletedKeys))
-			for i, deletedKey := range deletedKeys {
-				o.Result.Keys[i] = deletedKey
-			}
+			return
+		}
+		deletedKeys := res[0].Keys
+		o.Result.OptionalTimestamp = ts
+		o.Result.Type = ResultType_Keys
+		o.Result.Keys = make([][]byte, len(deletedKeys))
+		for i, deletedKey := range deletedKeys {
+			o.Result.Keys[i] = deletedKey
 		}
 	case *BatchOperation:
 		b := &kv.Batch{}
@@ -244,7 +298,7 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
 func applyBatchOp(
 	ctx context.Context,
 	b *kv.Batch,
-	runFn func(context.Context, *kv.Batch) error,
+	run func(context.Context, *kv.Batch) error,
 	o *BatchOperation,
 	inTxn bool,
 ) {
@@ -279,13 +333,17 @@ func applyBatchOp(
 			panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
 		}
 	}
-	runErr := runFn(ctx, b)
-	o.Result = resultError(ctx, runErr)
+	ts, err := batchRun(ctx, run, b)
+	o.Result = resultInit(ctx, err)
+	// NB: we intentionally fall through; the batch propagates the error
+	// to each result.
+	err = nil
+	o.Result.OptionalTimestamp = ts
 	for i := range o.Ops {
 		switch subO := o.Ops[i].GetValue().(type) {
 		case *GetOperation:
 			if b.Results[i].Err != nil {
-				subO.Result = resultError(ctx, b.Results[i].Err)
+				subO.Result = resultInit(ctx, b.Results[i].Err)
 			} else {
 				subO.Result.Type = ResultType_Value
 				result := b.Results[i].Rows[0]
@@ -297,11 +355,11 @@ func applyBatchOp(
 			}
 		case *PutOperation:
 			err := b.Results[i].Err
-			subO.Result = resultError(ctx, err)
+			subO.Result = resultInit(ctx, err)
 		case *ScanOperation:
 			kvs, err := b.Results[i].Rows, b.Results[i].Err
 			if err != nil {
-				subO.Result = resultError(ctx, err)
+				subO.Result = resultInit(ctx, err)
 			} else {
 				subO.Result.Type = ResultType_Values
 				subO.Result.Values = make([]KeyValue, len(kvs))
@@ -314,11 +372,11 @@ func applyBatchOp(
 			}
 		case *DeleteOperation:
 			err := b.Results[i].Err
-			subO.Result = resultError(ctx, err)
+			subO.Result = resultInit(ctx, err)
 		case *DeleteRangeOperation:
 			keys, err := b.Results[i].Keys, b.Results[i].Err
 			if err != nil {
-				subO.Result = resultError(ctx, err)
+				subO.Result = resultInit(ctx, err)
 			} else {
 				subO.Result.Type = ResultType_Keys
 				subO.Result.Keys = make([][]byte, len(keys))
@@ -332,7 +390,7 @@ func applyBatchOp(
 	}
 }
 
-func resultError(ctx context.Context, err error) Result {
+func resultInit(ctx context.Context, err error) Result {
 	if err == nil {
 		return Result{Type: ResultType_NoError}
 	}
diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto
index 3f5ac2a33e6d..e5ac38c0f913 100644
--- a/pkg/kv/kvnemesis/operations.proto
+++ b/pkg/kv/kvnemesis/operations.proto
@@ -149,6 +149,7 @@ message Result {
   bytes value = 4;
   // Only set if Type is ResultType_Values. The RawBytes of a roachpb.Value.
   repeated KeyValue values = 5 [(gogoproto.nullable) = false];
+  util.hlc.Timestamp optional_timestamp = 6 [(gogoproto.nullable) = false];
 }
 
 message Step {
diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index aca8d33a3e2f..5f44be2053d4 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -347,6 +347,11 @@ const (
 // Whenever it is `false`, processOp invokes the validator's checkAtomic method
 // for the operation.
 func (v *validator) processOp(buffering bool, op Operation) {
+	// We don't need an execution timestamp when buffering (the caller will need
+	// an execution timestamp for the combined operation, though). Additionally,
+	// some operations supported by kvnemesis aren't MVCC-aware (splits, etc) and
+	// thus also don't need an execution timestamp.
+	execTimestampStrictlyOptional := buffering
 	switch t := op.GetValue().(type) {
 	case *GetOperation:
 		v.failIfError(op, t.Result)
@@ -457,8 +462,10 @@ func (v *validator) processOp(buffering bool, op Operation) {
 			v.curOps = append(v.curOps, scan)
 		}
 	case *SplitOperation:
+		execTimestampStrictlyOptional = true
 		v.failIfError(op, t.Result)
 	case *MergeOperation:
+		execTimestampStrictlyOptional = true
 		if resultIsErrorStr(t.Result, `cannot merge final range`) {
 			// Because of some non-determinism, it is not worth it (or maybe not
 			// possible) to prevent these usage errors. Additionally, I (dan) think
@@ -500,6 +507,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *ChangeReplicasOperation:
+		execTimestampStrictlyOptional = true
 		var ignore bool
 		if err := errorFromResult(t.Result); err != nil {
 			ignore = kvserver.IsRetriableReplicationChangeError(err) ||
@@ -510,6 +518,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *TransferLeaseOperation:
+		execTimestampStrictlyOptional = true
 		var ignore bool
 		if err := errorFromResult(t.Result); err != nil {
 			ignore = kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) ||
@@ -539,6 +548,7 @@ func (v *validator) processOp(buffering bool, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *ChangeZoneOperation:
+		execTimestampStrictlyOptional = true
 		v.failIfError(op, t.Result)
 	case *BatchOperation:
 		if !resultIsRetryable(t.Result) {
@@ -568,6 +578,10 @@ func (v *validator) processOp(buffering bool, op Operation) {
 	default:
 		panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
 	}
+
+	if !execTimestampStrictlyOptional && !buffering && op.Result().Type != ResultType_Error && op.Result().OptionalTimestamp.IsEmpty() {
+		v.failures = append(v.failures, errors.Errorf("execution timestamp missing for %s", op))
+	}
 }
 
 // checkAtomic verifies a set of operations that should be atomic by trying to find
diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go
index e483cb52dab7..ad1cc51e2470 100644
--- a/pkg/kv/kvnemesis/validator_test.go
+++ b/pkg/kv/kvnemesis/validator_test.go
@@ -48,15 +48,22 @@ func withTimestamp(op Operation, ts int) Operation {
 }
 
 func withResult(op Operation, err error) Operation {
-	(*op.Result()) = resultError(context.Background(), err)
+	*op.Result() = resultInit(context.Background(), err)
+	// A number of operations are expected to report an execution
+	// timestamp. Sometime down the road we want to use that timestamp
+	// for validation throughout, but currently we only use it for
+	// transactions, and only in a limited fashion.
+	//
+	// Until we rely on it, we set a bogus timestamp here that will be obvious
+	// when and if it should start causing errors.
+	op.Result().OptionalTimestamp = hlc.Timestamp{Logical: 987}
 	return op
 }
 
 func withReadResult(op Operation, value string) Operation {
+	op = withResult(op, nil)
 	get := op.GetValue().(*GetOperation)
-	get.Result = Result{
-		Type: ResultType_Value,
-	}
+	get.Result.Type = ResultType_Value
 	if value != `` {
 		get.Result.Value = roachpb.MakeValueFromString(value).RawBytes
 	}
@@ -64,20 +71,18 @@ func withReadResult(op Operation, value string) Operation {
 	return op
 }
 
 func withScanResult(op Operation, kvs ...KeyValue) Operation {
+	op = withResult(op, nil)
 	scan := op.GetValue().(*ScanOperation)
-	scan.Result = Result{
-		Type:   ResultType_Values,
-		Values: kvs,
-	}
+	scan.Result.Type = ResultType_Values
+	scan.Result.Values = kvs
 	return op
 }
 
 func withDeleteRangeResult(op Operation, keys ...[]byte) Operation {
+	op = withResult(op, nil)
 	delRange := op.GetValue().(*DeleteRangeOperation)
-	delRange.Result = Result{
-		Type: ResultType_Keys,
-		Keys: keys,
-	}
+	delRange.Result.Type = ResultType_Keys
+	delRange.Result.Keys = keys
 	return op
 }
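
-- 
Aside (not part of the commit): the mechanism `batchRun` relies on above
is that a `kv.Batch`, once run, exposes the server's raw `BatchResponse`
via `RawResponse()`, and the response header carries the timestamp at
which the batch executed. A minimal standalone sketch of the same
pattern follows; the package name, the function name, and the provenance
of the `*kv.DB` (e.g. a test server) are hypothetical, for illustration
only.

package kvnemesisexample // hypothetical package, not in the tree

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// putWithTimestamp writes key=value and returns the timestamp at which
// the write executed, mirroring what batchRun does in the patch above.
func putWithTimestamp(
	ctx context.Context, db *kv.DB, key roachpb.Key, value string,
) (hlc.Timestamp, error) {
	b := &kv.Batch{}
	b.Put(key, value)
	if err := db.Run(ctx, b); err != nil {
		return hlc.Timestamp{}, err
	}
	var ts hlc.Timestamp
	// RawResponse is nil if the batch never reached the server.
	if rr := b.RawResponse(); rr != nil {
		ts = rr.Timestamp // execution timestamp from the response header
	}
	return ts, nil
}

This is the same hook the patch uses to populate Result.OptionalTimestamp
uniformly for Get/Put/Scan/Del/DelRange, rather than plumbing a
DeleteRange-only path.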