From 02390683683fd9bc9a77e0fb14aff63a0cc67dfd Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 11 Nov 2021 11:35:48 +0000 Subject: [PATCH] roachpb: assert request flag combinations Request flags have implicit dependencies and incompatibilities (e.g. `isLocking` implies `isTxn`). However, these were never checked and developers were expected to satisfy them manually, which is error-prone. This patch adds `TestFlagCombinations` that checks these dependencies and incompatibilities, based on `flagDependencies` and `flagExclusions` maps which encodes them. It also adds a new `flag` type for flags, renames `skipLeaseCheck` to `skipsLeaseCheck`, and adds `isAlone` for `CheckConsistencyRequest`. Release note: None --- pkg/kv/kvserver/replica.go | 2 +- pkg/roachpb/BUILD.bazel | 2 + pkg/roachpb/api.go | 151 ++++++++++++++++++-------------- pkg/roachpb/api_test.go | 43 +++++++++ pkg/roachpb/batch.go | 16 ++-- pkg/testutils/lint/lint_test.go | 2 + 6 files changed, 139 insertions(+), 77 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 3bc7c2c086c2..bc1f1b618cf2 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1329,7 +1329,7 @@ func (r *Replica) checkExecutionCanProceed( st = kvserverpb.LeaseStatus{ Now: now, } - } else if ba.IsSingleSkipLeaseCheckRequest() { + } else if ba.IsSingleSkipsLeaseCheckRequest() { // For lease commands, use the provided previous lease for verification. st = kvserverpb.LeaseStatus{ Lease: ba.GetPrevLeaseForLeaseRequest(), diff --git a/pkg/roachpb/BUILD.bazel b/pkg/roachpb/BUILD.bazel index 0ef0fee92502..766d888fd01d 100644 --- a/pkg/roachpb/BUILD.bazel +++ b/pkg/roachpb/BUILD.bazel @@ -171,6 +171,8 @@ go_test( "@com_github_cockroachdb_apd_v2//:apd", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", + "@com_github_gogo_protobuf//proto", + "@com_github_golang_protobuf//proto:go_default_library", "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 12765c3f5806..712a5a8f8a0f 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -71,25 +71,40 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error { panic("unreachable") } +type flag int + const ( - isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder - isRead // read-only cmds don't go through raft, but may run on lease holder - isWrite // write cmds go through raft and must be proposed on lease holder - isTxn // txn commands may be part of a transaction - isLocking // locking cmds acquire locks for their transaction (implies isTxn) - isIntentWrite // intent write cmds leave intents when they succeed (implies isWrite and isLocking) - isRange // range commands may span multiple keys - isReverse // reverse commands traverse ranges in descending direction - isAlone // requests which must be alone in a batch - isPrefix // requests which should be grouped with the next request in a batch - isUnsplittable // range command that must not be split during sending - skipLeaseCheck // commands which skip the check that the evaluating replica has a valid lease - updatesTSCache // commands which update the timestamp cache - updatesTSCacheOnErr // commands which make read data available on errors - needsRefresh // commands which require refreshes to avoid serializable retries - canBackpressure // commands which deserve backpressure when a Range grows too large + isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder + isRead // read-only cmds don't go through raft, but may run on lease holder + isWrite // write cmds go through raft and must be proposed on lease holder + isTxn // txn commands may be part of a transaction + isLocking // locking cmds acquire locks for their transaction + isIntentWrite // intent write cmds leave intents when they succeed + isRange // range commands may span multiple keys + isReverse // reverse commands traverse ranges in descending direction + isAlone // requests which must be alone in a batch + isPrefix // requests which, in a batch, must not be split from the following request + isUnsplittable // range command that must not be split during sending + skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease + updatesTSCache // commands which update the timestamp cache + updatesTSCacheOnErr // commands which make read data available on errors + needsRefresh // commands which require refreshes to avoid serializable retries + canBackpressure // commands which deserve backpressure when a Range grows too large ) +// flagDependencies specifies flag dependencies, asserted by TestFlagCombinations. +var flagDependencies = map[flag][]flag{ + isAdmin: {isAlone}, + isLocking: {isTxn}, + isIntentWrite: {isWrite, isLocking}, + skipsLeaseCheck: {isAlone}, +} + +// flagExclusions specifies flag incompatibilities, asserted by TestFlagCombinations. +var flagExclusions = map[flag][]flag{ + skipsLeaseCheck: {isIntentWrite}, +} + // IsReadOnly returns true iff the request is read-only. A request is // read-only if it does not go through raft, meaning that it cannot // change any replicated state. However, read-only requests may still @@ -187,7 +202,7 @@ type Request interface { Method() Method // ShallowCopy returns a shallow copy of the receiver. ShallowCopy() Request - flags() int + flags() flag } // SizedWriteRequest is an interface used to expose the number of bytes a @@ -1160,19 +1175,19 @@ func scanLockStrength(forUpdate bool) lock.Strength { return lock.None } -func flagForLockStrength(l lock.Strength) int { +func flagForLockStrength(l lock.Strength) flag { if l != lock.None { return isLocking } return 0 } -func (gr *GetRequest) flags() int { +func (gr *GetRequest) flags() flag { maybeLocking := flagForLockStrength(gr.KeyLocking) return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh } -func (*PutRequest) flags() int { +func (*PutRequest) flags() flag { return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure } @@ -1181,7 +1196,7 @@ func (*PutRequest) flags() int { // ConditionalPuts do not require a refresh because on write-too-old errors, // they return an error immediately instead of continuing a serializable // transaction to be retried at end transaction. -func (*ConditionalPutRequest) flags() int { +func (*ConditionalPutRequest) flags() flag { return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure } @@ -1190,7 +1205,7 @@ func (*ConditionalPutRequest) flags() int { // InitPuts do not require a refresh because on write-too-old errors, they // return an error immediately instead of continuing a serializable transaction // to be retried at end transaction. -func (*InitPutRequest) flags() int { +func (*InitPutRequest) flags() flag { return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure } @@ -1199,15 +1214,15 @@ func (*InitPutRequest) flags() int { // require a refresh because on write-too-old errors, they return an // error immediately instead of continuing a serializable transaction // to be retried at end transaction. -func (*IncrementRequest) flags() int { +func (*IncrementRequest) flags() flag { return isRead | isWrite | isTxn | isLocking | isIntentWrite | canBackpressure } -func (*DeleteRequest) flags() int { +func (*DeleteRequest) flags() flag { return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure } -func (drr *DeleteRangeRequest) flags() int { +func (drr *DeleteRangeRequest) flags() flag { // DeleteRangeRequest has different properties if the "inline" flag is set. // This flag indicates that the request is deleting inline MVCC values, // which cannot be deleted transactionally - inline DeleteRange will thus @@ -1236,18 +1251,18 @@ func (drr *DeleteRangeRequest) flags() int { // Note that ClearRange commands cannot be part of a transaction as // they clear all MVCC versions. -func (*ClearRangeRequest) flags() int { return isWrite | isRange | isAlone } +func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone } // Note that RevertRange commands cannot be part of a transaction as // they clear all MVCC versions above their target time. -func (*RevertRangeRequest) flags() int { return isWrite | isRange } +func (*RevertRangeRequest) flags() flag { return isWrite | isRange } -func (sr *ScanRequest) flags() int { +func (sr *ScanRequest) flags() flag { maybeLocking := flagForLockStrength(sr.KeyLocking) return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh } -func (rsr *ReverseScanRequest) flags() int { +func (rsr *ReverseScanRequest) flags() flag { maybeLocking := flagForLockStrength(rsr.KeyLocking) return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh } @@ -1255,43 +1270,43 @@ func (rsr *ReverseScanRequest) flags() int { // EndTxn updates the timestamp cache to prevent replays. // Replays for the same transaction key and timestamp will have // Txn.WriteTooOld=true and must retry on EndTxn. -func (*EndTxnRequest) flags() int { return isWrite | isTxn | isAlone | updatesTSCache } -func (*AdminSplitRequest) flags() int { return isAdmin | isAlone } -func (*AdminUnsplitRequest) flags() int { return isAdmin | isAlone } -func (*AdminMergeRequest) flags() int { return isAdmin | isAlone } -func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone } -func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone } -func (*AdminRelocateRangeRequest) flags() int { return isAdmin | isAlone } -func (*GCRequest) flags() int { return isWrite | isRange } +func (*EndTxnRequest) flags() flag { return isWrite | isTxn | isAlone | updatesTSCache } +func (*AdminSplitRequest) flags() flag { return isAdmin | isAlone } +func (*AdminUnsplitRequest) flags() flag { return isAdmin | isAlone } +func (*AdminMergeRequest) flags() flag { return isAdmin | isAlone } +func (*AdminTransferLeaseRequest) flags() flag { return isAdmin | isAlone } +func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone } +func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone } +func (*GCRequest) flags() flag { return isWrite | isRange } // HeartbeatTxn updates the timestamp cache with transaction records, // to avoid checking for them on disk when considering 1PC evaluation. -func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn | updatesTSCache } +func (*HeartbeatTxnRequest) flags() flag { return isWrite | isTxn | updatesTSCache } // PushTxnRequest updates different marker keys in the timestamp cache when // pushing a transaction's timestamp and when aborting a transaction. -func (*PushTxnRequest) flags() int { +func (*PushTxnRequest) flags() flag { return isWrite | isAlone | updatesTSCache | updatesTSCache } -func (*RecoverTxnRequest) flags() int { return isWrite | isAlone | updatesTSCache } -func (*QueryTxnRequest) flags() int { return isRead | isAlone } +func (*RecoverTxnRequest) flags() flag { return isWrite | isAlone | updatesTSCache } +func (*QueryTxnRequest) flags() flag { return isRead | isAlone } // QueryIntent only updates the timestamp cache when attempting to prevent an // intent that is found missing from ever being written in the future. See // QueryIntentRequest_PREVENT. -func (*QueryIntentRequest) flags() int { +func (*QueryIntentRequest) flags() flag { return isRead | isPrefix | updatesTSCache | updatesTSCacheOnErr } -func (*ResolveIntentRequest) flags() int { return isWrite } -func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange } -func (*TruncateLogRequest) flags() int { return isWrite } -func (*MergeRequest) flags() int { return isWrite | canBackpressure } -func (*RequestLeaseRequest) flags() int { return isWrite | isAlone | skipLeaseCheck } +func (*ResolveIntentRequest) flags() flag { return isWrite } +func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange } +func (*TruncateLogRequest) flags() flag { return isWrite } +func (*MergeRequest) flags() flag { return isWrite | canBackpressure } +func (*RequestLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck } // LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the -// effect of the `skipLeaseCheck` flag that lease write operations have. -func (*LeaseInfoRequest) flags() int { return isRead | isAlone } -func (*TransferLeaseRequest) flags() int { +// effect of the `skipsLeaseCheck` flag that lease write operations have. +func (*LeaseInfoRequest) flags() flag { return isRead | isAlone } +func (*TransferLeaseRequest) flags() flag { // TransferLeaseRequest requires the lease, which is checked in // `AdminTransferLease()` before the TransferLeaseRequest is created and sent // for evaluation and in the usual way at application time (i.e. @@ -1299,38 +1314,38 @@ func (*TransferLeaseRequest) flags() int { // command resulting from the evaluation of TransferLeaseRequest was // proposed). // - // But we're marking it with skipLeaseCheck because `redirectOnOrAcquireLease` + // But we're marking it with skipsLeaseCheck because `redirectOnOrAcquireLease` // can't be used before evaluation as, by the time that call would be made, // the store has registered that a transfer is in progress and // `redirectOnOrAcquireLease` would already tentatively redirect to the future // lease holder. - return isWrite | isAlone | skipLeaseCheck -} -func (*RecomputeStatsRequest) flags() int { return isWrite | isAlone } -func (*ComputeChecksumRequest) flags() int { return isWrite } -func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange } -func (*ExportRequest) flags() int { return isRead | isRange | updatesTSCache } -func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone } -func (*AdminVerifyProtectedTimestampRequest) flags() int { return isAdmin | isRange | isAlone } -func (*AddSSTableRequest) flags() int { + return isWrite | isAlone | skipsLeaseCheck +} +func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone } +func (*ComputeChecksumRequest) flags() flag { return isWrite } +func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone } +func (*ExportRequest) flags() flag { return isRead | isRange | updatesTSCache } +func (*AdminScatterRequest) flags() flag { return isAdmin | isRange | isAlone } +func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone } +func (*AddSSTableRequest) flags() flag { return isWrite | isRange | isAlone | isUnsplittable | canBackpressure } -func (*MigrateRequest) flags() int { return isWrite | isRange | isAlone } +func (*MigrateRequest) flags() flag { return isWrite | isRange | isAlone } // RefreshRequest and RefreshRangeRequest both determine which timestamp cache // they update based on their Write parameter. -func (r *RefreshRequest) flags() int { +func (r *RefreshRequest) flags() flag { return isRead | isTxn | updatesTSCache } -func (r *RefreshRangeRequest) flags() int { +func (r *RefreshRangeRequest) flags() flag { return isRead | isTxn | isRange | updatesTSCache } -func (*SubsumeRequest) flags() int { return isRead | isAlone | updatesTSCache } -func (*RangeStatsRequest) flags() int { return isRead } -func (*QueryResolvedTimestampRequest) flags() int { return isRead | isRange } -func (*ScanInterleavedIntentsRequest) flags() int { return isRead | isRange } -func (*BarrierRequest) flags() int { return isWrite | isRange } +func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache } +func (*RangeStatsRequest) flags() flag { return isRead } +func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange } +func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange } +func (*BarrierRequest) flags() flag { return isWrite | isRange } // IsParallelCommit returns whether the EndTxn request is attempting to perform // a parallel commit. See txn_interceptor_committer.go for a discussion about diff --git a/pkg/roachpb/api_test.go b/pkg/roachpb/api_test.go index 9f4279925035..d4a6a59880bc 100644 --- a/pkg/roachpb/api_test.go +++ b/pkg/roachpb/api_test.go @@ -14,9 +14,12 @@ import ( "reflect" "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/redact" + gogoproto "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/proto" // nolint deprecated, but required for Protobuf v1 reflection "github.com/stretchr/testify/require" ) @@ -306,3 +309,43 @@ func TestTenantConsumptionAddSub(t *testing.T) { t.Errorf("expected\n%#v\ngot\n%#v", exp, c) } } + +// TestFlagCombinations tests that flag dependencies and exclusions as specified +// in flagDependencies and flagExclusions are satisfied by all requests. +func TestFlagCombinations(t *testing.T) { + // Any non-zero-valued request variants that conditionally affect flags. + reqVariants := []Request{ + &DeleteRangeRequest{Inline: true}, + &GetRequest{KeyLocking: lock.Exclusive}, + &ReverseScanRequest{KeyLocking: lock.Exclusive}, + &ScanRequest{KeyLocking: lock.Exclusive}, + } + + reqTypes := []Request{} + oneofFields := proto.MessageReflect(&RequestUnion{}).Descriptor().Oneofs().Get(0).Fields() + for i := 0; i < oneofFields.Len(); i++ { + msgName := string(oneofFields.Get(i).Message().FullName()) + msgType := gogoproto.MessageType(msgName).Elem() + require.NotNil(t, msgType, "unknown message type %s", msgName) + reqTypes = append(reqTypes, reflect.New(msgType).Interface().(Request)) + } + + for _, req := range append(reqTypes, reqVariants...) { + name := reflect.TypeOf(req).Elem().Name() + flags := req.flags() + for flag, deps := range flagDependencies { + if flags&flag != 0 { + for _, dep := range deps { + require.NotZero(t, flags&dep, "%s has flag %d but not dependant flag %d", name, flag, dep) + } + } + } + for flag, excls := range flagExclusions { + if flags&flag != 0 { + for _, excl := range excls { + require.Zero(t, flags&excl, "%s flag %d cannot be combined with flag %d", name, flag, excl) + } + } + } + } +} diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 5bcf5d3cef1d..b31ac9203e6e 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -185,10 +185,10 @@ func (ba *BatchRequest) IsSingleRequest() bool { return len(ba.Requests) == 1 } -// IsSingleSkipLeaseCheckRequest returns true iff the batch contains a single -// request, and that request has the skipLeaseCheck flag set. -func (ba *BatchRequest) IsSingleSkipLeaseCheckRequest() bool { - return ba.IsSingleRequest() && ba.hasFlag(skipLeaseCheck) +// IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single +// request, and that request has the skipsLeaseCheck flag set. +func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool { + return ba.IsSingleRequest() && ba.hasFlag(skipsLeaseCheck) } func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool { @@ -339,7 +339,7 @@ func (ba *BatchRequest) GetPrevLeaseForLeaseRequest() Lease { // hasFlag returns true iff one of the requests within the batch contains the // specified flag. -func (ba *BatchRequest) hasFlag(flag int) bool { +func (ba *BatchRequest) hasFlag(flag flag) bool { for _, union := range ba.Requests { if (union.GetInner().flags() & flag) != 0 { return true @@ -350,7 +350,7 @@ func (ba *BatchRequest) hasFlag(flag int) bool { // hasFlagForAll returns true iff all of the requests within the batch contains // the specified flag. -func (ba *BatchRequest) hasFlagForAll(flag int) bool { +func (ba *BatchRequest) hasFlagForAll(flag flag) bool { if len(ba.Requests) == 0 { return false } @@ -534,7 +534,7 @@ func (ba *BatchRequest) Methods() []Method { // read that acquired a latch @ ts10 can't simply be bumped to ts 20 because // there might have been overlapping writes in the 10..20 window). func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion { - compatible := func(exFlags, newFlags int) bool { + compatible := func(exFlags, newFlags flag) bool { // isAlone requests are never compatible. if (exFlags&isAlone) != 0 || (newFlags&isAlone) != 0 { return false @@ -557,7 +557,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion { var parts [][]RequestUnion for len(ba.Requests) > 0 { part := ba.Requests - var gFlags, hFlags = -1, -1 + var gFlags, hFlags flag = -1, -1 for i, union := range ba.Requests { args := union.GetInner() flags := args.flags() diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 736d99eafae1..64613ff727c3 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -1732,6 +1732,8 @@ func TestLint(t *testing.T) { stream.GrepNot(`pkg/.*.go:.* func .*\.Cause is unused`), // Using deprecated WireLength call. stream.GrepNot(`pkg/rpc/stats_handler.go:.*v.WireLength is deprecated: This field is never set.*`), + // roachpb/api.go needs v1 Protobuf reflection + stream.GrepNot(`pkg/roachpb/api_test.go:.*package github.com/golang/protobuf/proto is deprecated: Use the "google.golang.org/protobuf/proto" package instead.`), // rpc/codec.go imports the same proto package that grpc-go imports (as of crdb@dd87d1145 and grpc-go@7b167fd6). stream.GrepNot(`pkg/rpc/codec.go:.*package github.com/golang/protobuf/proto is deprecated: Use the "google.golang.org/protobuf/proto" package instead.`), // goschedstats contains partial copies of go runtime structures, with