Skip to content

Commit

Permalink
roachpb: assert request flag combinations
Browse files Browse the repository at this point in the history
Request flags have implicit dependencies and incompatibilities (e.g.
`isLocking` implies `isTxn`). However, these were never checked and
developers were expected to satisfy them manually, which is error-prone.

This patch adds `TestFlagCombinations` that checks these dependencies
and incompatibilities, based on `flagDependencies` and `flagExclusions`
maps which encodes them.

It also adds a new `flag` type for flags, renames `skipLeaseCheck` to
`skipsLeaseCheck`, and adds `isAlone` for `CheckConsistencyRequest`.

Release note: None
  • Loading branch information
erikgrinaker committed Nov 22, 2021
1 parent ca90ff7 commit 0239068
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ func (r *Replica) checkExecutionCanProceed(
st = kvserverpb.LeaseStatus{
Now: now,
}
} else if ba.IsSingleSkipLeaseCheckRequest() {
} else if ba.IsSingleSkipsLeaseCheckRequest() {
// For lease commands, use the provided previous lease for verification.
st = kvserverpb.LeaseStatus{
Lease: ba.GetPrevLeaseForLeaseRequest(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/roachpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ go_test(
"@com_github_cockroachdb_apd_v2//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
151 changes: 83 additions & 68 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,40 @@ func (rc ReadConsistencyType) SupportsBatch(ba BatchRequest) error {
panic("unreachable")
}

type flag int

const (
isAdmin = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isLocking // locking cmds acquire locks for their transaction (implies isTxn)
isIntentWrite // intent write cmds leave intents when they succeed (implies isWrite and isLocking)
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which should be grouped with the next request in a batch
isUnsplittable // range command that must not be split during sending
skipLeaseCheck // commands which skip the check that the evaluating replica has a valid lease
updatesTSCache // commands which update the timestamp cache
updatesTSCacheOnErr // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
canBackpressure // commands which deserve backpressure when a Range grows too large
isAdmin flag = 1 << iota // admin cmds don't go through raft, but run on lease holder
isRead // read-only cmds don't go through raft, but may run on lease holder
isWrite // write cmds go through raft and must be proposed on lease holder
isTxn // txn commands may be part of a transaction
isLocking // locking cmds acquire locks for their transaction
isIntentWrite // intent write cmds leave intents when they succeed
isRange // range commands may span multiple keys
isReverse // reverse commands traverse ranges in descending direction
isAlone // requests which must be alone in a batch
isPrefix // requests which, in a batch, must not be split from the following request
isUnsplittable // range command that must not be split during sending
skipsLeaseCheck // commands which skip the check that the evaluating replica has a valid lease
updatesTSCache // commands which update the timestamp cache
updatesTSCacheOnErr // commands which make read data available on errors
needsRefresh // commands which require refreshes to avoid serializable retries
canBackpressure // commands which deserve backpressure when a Range grows too large
)

// flagDependencies specifies flag dependencies, asserted by TestFlagCombinations.
var flagDependencies = map[flag][]flag{
isAdmin: {isAlone},
isLocking: {isTxn},
isIntentWrite: {isWrite, isLocking},
skipsLeaseCheck: {isAlone},
}

// flagExclusions specifies flag incompatibilities, asserted by TestFlagCombinations.
var flagExclusions = map[flag][]flag{
skipsLeaseCheck: {isIntentWrite},
}

// IsReadOnly returns true iff the request is read-only. A request is
// read-only if it does not go through raft, meaning that it cannot
// change any replicated state. However, read-only requests may still
Expand Down Expand Up @@ -187,7 +202,7 @@ type Request interface {
Method() Method
// ShallowCopy returns a shallow copy of the receiver.
ShallowCopy() Request
flags() int
flags() flag
}

// SizedWriteRequest is an interface used to expose the number of bytes a
Expand Down Expand Up @@ -1160,19 +1175,19 @@ func scanLockStrength(forUpdate bool) lock.Strength {
return lock.None
}

func flagForLockStrength(l lock.Strength) int {
func flagForLockStrength(l lock.Strength) flag {
if l != lock.None {
return isLocking
}
return 0
}

func (gr *GetRequest) flags() int {
func (gr *GetRequest) flags() flag {
maybeLocking := flagForLockStrength(gr.KeyLocking)
return isRead | isTxn | maybeLocking | updatesTSCache | needsRefresh
}

func (*PutRequest) flags() int {
func (*PutRequest) flags() flag {
return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
}

Expand All @@ -1181,7 +1196,7 @@ func (*PutRequest) flags() int {
// ConditionalPuts do not require a refresh because on write-too-old errors,
// they return an error immediately instead of continuing a serializable
// transaction to be retried at end transaction.
func (*ConditionalPutRequest) flags() int {
func (*ConditionalPutRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure
}

Expand All @@ -1190,7 +1205,7 @@ func (*ConditionalPutRequest) flags() int {
// InitPuts do not require a refresh because on write-too-old errors, they
// return an error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*InitPutRequest) flags() int {
func (*InitPutRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | updatesTSCache | updatesTSCacheOnErr | canBackpressure
}

Expand All @@ -1199,15 +1214,15 @@ func (*InitPutRequest) flags() int {
// require a refresh because on write-too-old errors, they return an
// error immediately instead of continuing a serializable transaction
// to be retried at end transaction.
func (*IncrementRequest) flags() int {
func (*IncrementRequest) flags() flag {
return isRead | isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
}

func (*DeleteRequest) flags() int {
func (*DeleteRequest) flags() flag {
return isWrite | isTxn | isLocking | isIntentWrite | canBackpressure
}

func (drr *DeleteRangeRequest) flags() int {
func (drr *DeleteRangeRequest) flags() flag {
// DeleteRangeRequest has different properties if the "inline" flag is set.
// This flag indicates that the request is deleting inline MVCC values,
// which cannot be deleted transactionally - inline DeleteRange will thus
Expand Down Expand Up @@ -1236,101 +1251,101 @@ func (drr *DeleteRangeRequest) flags() int {

// Note that ClearRange commands cannot be part of a transaction as
// they clear all MVCC versions.
func (*ClearRangeRequest) flags() int { return isWrite | isRange | isAlone }
func (*ClearRangeRequest) flags() flag { return isWrite | isRange | isAlone }

// Note that RevertRange commands cannot be part of a transaction as
// they clear all MVCC versions above their target time.
func (*RevertRangeRequest) flags() int { return isWrite | isRange }
func (*RevertRangeRequest) flags() flag { return isWrite | isRange }

func (sr *ScanRequest) flags() int {
func (sr *ScanRequest) flags() flag {
maybeLocking := flagForLockStrength(sr.KeyLocking)
return isRead | isRange | isTxn | maybeLocking | updatesTSCache | needsRefresh
}

func (rsr *ReverseScanRequest) flags() int {
func (rsr *ReverseScanRequest) flags() flag {
maybeLocking := flagForLockStrength(rsr.KeyLocking)
return isRead | isRange | isReverse | isTxn | maybeLocking | updatesTSCache | needsRefresh
}

// EndTxn updates the timestamp cache to prevent replays.
// Replays for the same transaction key and timestamp will have
// Txn.WriteTooOld=true and must retry on EndTxn.
func (*EndTxnRequest) flags() int { return isWrite | isTxn | isAlone | updatesTSCache }
func (*AdminSplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminUnsplitRequest) flags() int { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() int { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() int { return isAdmin | isAlone }
func (*AdminChangeReplicasRequest) flags() int { return isAdmin | isAlone }
func (*AdminRelocateRangeRequest) flags() int { return isAdmin | isAlone }
func (*GCRequest) flags() int { return isWrite | isRange }
func (*EndTxnRequest) flags() flag { return isWrite | isTxn | isAlone | updatesTSCache }
func (*AdminSplitRequest) flags() flag { return isAdmin | isAlone }
func (*AdminUnsplitRequest) flags() flag { return isAdmin | isAlone }
func (*AdminMergeRequest) flags() flag { return isAdmin | isAlone }
func (*AdminTransferLeaseRequest) flags() flag { return isAdmin | isAlone }
func (*AdminChangeReplicasRequest) flags() flag { return isAdmin | isAlone }
func (*AdminRelocateRangeRequest) flags() flag { return isAdmin | isAlone }
func (*GCRequest) flags() flag { return isWrite | isRange }

// HeartbeatTxn updates the timestamp cache with transaction records,
// to avoid checking for them on disk when considering 1PC evaluation.
func (*HeartbeatTxnRequest) flags() int { return isWrite | isTxn | updatesTSCache }
func (*HeartbeatTxnRequest) flags() flag { return isWrite | isTxn | updatesTSCache }

// PushTxnRequest updates different marker keys in the timestamp cache when
// pushing a transaction's timestamp and when aborting a transaction.
func (*PushTxnRequest) flags() int {
func (*PushTxnRequest) flags() flag {
return isWrite | isAlone | updatesTSCache | updatesTSCache
}
func (*RecoverTxnRequest) flags() int { return isWrite | isAlone | updatesTSCache }
func (*QueryTxnRequest) flags() int { return isRead | isAlone }
func (*RecoverTxnRequest) flags() flag { return isWrite | isAlone | updatesTSCache }
func (*QueryTxnRequest) flags() flag { return isRead | isAlone }

// QueryIntent only updates the timestamp cache when attempting to prevent an
// intent that is found missing from ever being written in the future. See
// QueryIntentRequest_PREVENT.
func (*QueryIntentRequest) flags() int {
func (*QueryIntentRequest) flags() flag {
return isRead | isPrefix | updatesTSCache | updatesTSCacheOnErr
}
func (*ResolveIntentRequest) flags() int { return isWrite }
func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*TruncateLogRequest) flags() int { return isWrite }
func (*MergeRequest) flags() int { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() int { return isWrite | isAlone | skipLeaseCheck }
func (*ResolveIntentRequest) flags() flag { return isWrite }
func (*ResolveIntentRangeRequest) flags() flag { return isWrite | isRange }
func (*TruncateLogRequest) flags() flag { return isWrite }
func (*MergeRequest) flags() flag { return isWrite | canBackpressure }
func (*RequestLeaseRequest) flags() flag { return isWrite | isAlone | skipsLeaseCheck }

// LeaseInfoRequest is usually executed in an INCONSISTENT batch, which has the
// effect of the `skipLeaseCheck` flag that lease write operations have.
func (*LeaseInfoRequest) flags() int { return isRead | isAlone }
func (*TransferLeaseRequest) flags() int {
// effect of the `skipsLeaseCheck` flag that lease write operations have.
func (*LeaseInfoRequest) flags() flag { return isRead | isAlone }
func (*TransferLeaseRequest) flags() flag {
// TransferLeaseRequest requires the lease, which is checked in
// `AdminTransferLease()` before the TransferLeaseRequest is created and sent
// for evaluation and in the usual way at application time (i.e.
// replica.processRaftCommand() checks that the lease hasn't changed since the
// command resulting from the evaluation of TransferLeaseRequest was
// proposed).
//
// But we're marking it with skipLeaseCheck because `redirectOnOrAcquireLease`
// But we're marking it with skipsLeaseCheck because `redirectOnOrAcquireLease`
// can't be used before evaluation as, by the time that call would be made,
// the store has registered that a transfer is in progress and
// `redirectOnOrAcquireLease` would already tentatively redirect to the future
// lease holder.
return isWrite | isAlone | skipLeaseCheck
}
func (*RecomputeStatsRequest) flags() int { return isWrite | isAlone }
func (*ComputeChecksumRequest) flags() int { return isWrite }
func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
func (*ExportRequest) flags() int { return isRead | isRange | updatesTSCache }
func (*AdminScatterRequest) flags() int { return isAdmin | isRange | isAlone }
func (*AdminVerifyProtectedTimestampRequest) flags() int { return isAdmin | isRange | isAlone }
func (*AddSSTableRequest) flags() int {
return isWrite | isAlone | skipsLeaseCheck
}
func (*RecomputeStatsRequest) flags() flag { return isWrite | isAlone }
func (*ComputeChecksumRequest) flags() flag { return isWrite }
func (*CheckConsistencyRequest) flags() flag { return isAdmin | isRange | isAlone }
func (*ExportRequest) flags() flag { return isRead | isRange | updatesTSCache }
func (*AdminScatterRequest) flags() flag { return isAdmin | isRange | isAlone }
func (*AdminVerifyProtectedTimestampRequest) flags() flag { return isAdmin | isRange | isAlone }
func (*AddSSTableRequest) flags() flag {
return isWrite | isRange | isAlone | isUnsplittable | canBackpressure
}
func (*MigrateRequest) flags() int { return isWrite | isRange | isAlone }
func (*MigrateRequest) flags() flag { return isWrite | isRange | isAlone }

// RefreshRequest and RefreshRangeRequest both determine which timestamp cache
// they update based on their Write parameter.
func (r *RefreshRequest) flags() int {
func (r *RefreshRequest) flags() flag {
return isRead | isTxn | updatesTSCache
}
func (r *RefreshRangeRequest) flags() int {
func (r *RefreshRangeRequest) flags() flag {
return isRead | isTxn | isRange | updatesTSCache
}

func (*SubsumeRequest) flags() int { return isRead | isAlone | updatesTSCache }
func (*RangeStatsRequest) flags() int { return isRead }
func (*QueryResolvedTimestampRequest) flags() int { return isRead | isRange }
func (*ScanInterleavedIntentsRequest) flags() int { return isRead | isRange }
func (*BarrierRequest) flags() int { return isWrite | isRange }
func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache }
func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange }
func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }

// IsParallelCommit returns whether the EndTxn request is attempting to perform
// a parallel commit. See txn_interceptor_committer.go for a discussion about
Expand Down
43 changes: 43 additions & 0 deletions pkg/roachpb/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import (
"reflect"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/redact"
gogoproto "github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/proto" // nolint deprecated, but required for Protobuf v1 reflection
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -306,3 +309,43 @@ func TestTenantConsumptionAddSub(t *testing.T) {
t.Errorf("expected\n%#v\ngot\n%#v", exp, c)
}
}

// TestFlagCombinations tests that flag dependencies and exclusions as specified
// in flagDependencies and flagExclusions are satisfied by all requests.
func TestFlagCombinations(t *testing.T) {
// Any non-zero-valued request variants that conditionally affect flags.
reqVariants := []Request{
&DeleteRangeRequest{Inline: true},
&GetRequest{KeyLocking: lock.Exclusive},
&ReverseScanRequest{KeyLocking: lock.Exclusive},
&ScanRequest{KeyLocking: lock.Exclusive},
}

reqTypes := []Request{}
oneofFields := proto.MessageReflect(&RequestUnion{}).Descriptor().Oneofs().Get(0).Fields()
for i := 0; i < oneofFields.Len(); i++ {
msgName := string(oneofFields.Get(i).Message().FullName())
msgType := gogoproto.MessageType(msgName).Elem()
require.NotNil(t, msgType, "unknown message type %s", msgName)
reqTypes = append(reqTypes, reflect.New(msgType).Interface().(Request))
}

for _, req := range append(reqTypes, reqVariants...) {
name := reflect.TypeOf(req).Elem().Name()
flags := req.flags()
for flag, deps := range flagDependencies {
if flags&flag != 0 {
for _, dep := range deps {
require.NotZero(t, flags&dep, "%s has flag %d but not dependant flag %d", name, flag, dep)
}
}
}
for flag, excls := range flagExclusions {
if flags&flag != 0 {
for _, excl := range excls {
require.Zero(t, flags&excl, "%s flag %d cannot be combined with flag %d", name, flag, excl)
}
}
}
}
}
16 changes: 8 additions & 8 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ func (ba *BatchRequest) IsSingleRequest() bool {
return len(ba.Requests) == 1
}

// IsSingleSkipLeaseCheckRequest returns true iff the batch contains a single
// request, and that request has the skipLeaseCheck flag set.
func (ba *BatchRequest) IsSingleSkipLeaseCheckRequest() bool {
return ba.IsSingleRequest() && ba.hasFlag(skipLeaseCheck)
// IsSingleSkipsLeaseCheckRequest returns true iff the batch contains a single
// request, and that request has the skipsLeaseCheck flag set.
func (ba *BatchRequest) IsSingleSkipsLeaseCheckRequest() bool {
return ba.IsSingleRequest() && ba.hasFlag(skipsLeaseCheck)
}

func (ba *BatchRequest) isSingleRequestWithMethod(m Method) bool {
Expand Down Expand Up @@ -339,7 +339,7 @@ func (ba *BatchRequest) GetPrevLeaseForLeaseRequest() Lease {

// hasFlag returns true iff one of the requests within the batch contains the
// specified flag.
func (ba *BatchRequest) hasFlag(flag int) bool {
func (ba *BatchRequest) hasFlag(flag flag) bool {
for _, union := range ba.Requests {
if (union.GetInner().flags() & flag) != 0 {
return true
Expand All @@ -350,7 +350,7 @@ func (ba *BatchRequest) hasFlag(flag int) bool {

// hasFlagForAll returns true iff all of the requests within the batch contains
// the specified flag.
func (ba *BatchRequest) hasFlagForAll(flag int) bool {
func (ba *BatchRequest) hasFlagForAll(flag flag) bool {
if len(ba.Requests) == 0 {
return false
}
Expand Down Expand Up @@ -534,7 +534,7 @@ func (ba *BatchRequest) Methods() []Method {
// read that acquired a latch @ ts10 can't simply be bumped to ts 20 because
// there might have been overlapping writes in the 10..20 window).
func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {
compatible := func(exFlags, newFlags int) bool {
compatible := func(exFlags, newFlags flag) bool {
// isAlone requests are never compatible.
if (exFlags&isAlone) != 0 || (newFlags&isAlone) != 0 {
return false
Expand All @@ -557,7 +557,7 @@ func (ba BatchRequest) Split(canSplitET bool) [][]RequestUnion {
var parts [][]RequestUnion
for len(ba.Requests) > 0 {
part := ba.Requests
var gFlags, hFlags = -1, -1
var gFlags, hFlags flag = -1, -1
for i, union := range ba.Requests {
args := union.GetInner()
flags := args.flags()
Expand Down
Loading

0 comments on commit 0239068

Please sign in to comment.