Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
73843: kvcoord: simplify txn already committed assertion r=nvanbenschoten a=tbg

- kvcoord: improve TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit
- kvcoord: simplify txn already committed assertion


73951: kvserver: improve definition of equivalence classes r=aayushshah15 a=aayushshah15

This commit makes an effort to make `equivalenceClass`es more well-defined.
Previously, we would coalesce the equivalence classes for any two existing
replicas if they shared the exact same locality hierarchy (including the node
ids). This logic became ineffective with cockroachdb#51567 since we disallow multiple
replicas on a single node under all circumstances.  Furthermore, coalescing the
equivalence classes for existing replicas did not buy us anything and instead
required a bunch of custom code for us to correctly deal with them.

This commit takes an opinionated approach and gets rid of the logic that
coalesces two existing replicas' equivalence classes together.

Release note: None


Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Aayush Shah <[email protected]>
  • Loading branch information
3 people committed Dec 20, 2021
3 parents 61077f8 + 1de1865 + 7d51027 commit 9838965
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 281 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ go_test(
"//pkg/kv/kvbase",
"//pkg/kv/kvclient/rangecache:with-mocks",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
Expand Down
84 changes: 48 additions & 36 deletions pkg/kv/kvclient/kvcoord/replayed_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -54,54 +55,65 @@ func TestCommitSanityCheckAssertionFiresOnUndetectedAmbiguousCommit(t *testing.T

ctx := context.Background()
var args base.TestClusterArgs
args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{TransportFactory: func(
options kvcoord.SendOptions,
dialer *nodedialer.Dialer,
slice kvcoord.ReplicaSlice,
) (kvcoord.Transport, error) {
tf, err := kvcoord.GRPCTransportFactory(options, dialer, slice)
if err != nil {
return nil, err
}
return &interceptingTransport{
Transport: tf,
intercept: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, err error) (*roachpb.BatchResponse, error) {
if err != nil || ba.Txn == nil || br.Txn == nil ||
ba.Txn.Status != roachpb.PENDING || br.Txn.Status != roachpb.COMMITTED ||
!keys.ScratchRangeMin.Equal(br.Txn.Key) {
// Only want to inject error on successful commit for "our" txn.
return br, err
}
err = circuit.ErrBreakerOpen
assert.True(t, grpcutil.RequestDidNotStart(err)) // avoid Fatal on goroutine
args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
IntentResolverKnobs: kvserverbase.IntentResolverTestingKnobs{
// Disable async intent resolution, as it could possibly GC the txn record
// out from under us, leading to the retried commit taking a path
// different from the one we want to exercise in this test.
DisableAsyncIntentResolution: true,
},
}
args.ServerArgs.Knobs.KVClient = &kvcoord.ClientTestingKnobs{
TransportFactory: func(
options kvcoord.SendOptions,
dialer *nodedialer.Dialer,
slice kvcoord.ReplicaSlice,
) (kvcoord.Transport, error) {
tf, err := kvcoord.GRPCTransportFactory(options, dialer, slice)
if err != nil {
return nil, err
},
}, nil
},
}
return &interceptingTransport{
Transport: tf,
intercept: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse, err error) (*roachpb.BatchResponse, error) {
if err != nil || ba.Txn == nil || br.Txn == nil ||
ba.Txn.Status != roachpb.PENDING || br.Txn.Status != roachpb.COMMITTED ||
!keys.ScratchRangeMin.Equal(br.Txn.Key) {
// Only want to inject error on successful commit for "our" txn.
return br, err
}
err = circuit.ErrBreakerOpen
assert.True(t, grpcutil.RequestDidNotStart(err)) // avoid Fatal on goroutine
return nil, err
},
}, nil
},
// Turn the assertion into an error returned via the txn.
DisableCommitSanityCheck: true,
}

tc := testcluster.StartTestCluster(t, 1, args)
defer tc.Stopper().Stop(ctx)

// Txn record GC populates txn tscache which prevents second commit
// attempt from hitting TransactionStatusError(alreadyCommitted).
defer batcheval.TestingSetTxnAutoGC(false)()
{
// Turn the assertion into an error.
prev := kvcoord.DisableCommitSanityCheck
kvcoord.DisableCommitSanityCheck = true
defer func() {
kvcoord.DisableCommitSanityCheck = prev
}()
}

k := tc.ScratchRange(t)
kNext := k.Next()
require.Equal(t, keys.ScratchRangeMin, k) // interceptor above relies on this
tc.SplitRangeOrFatal(t, kNext)

err := tc.Server(0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
_ = txn.DisablePipelining() // keep it simple
if err := txn.Put(ctx, k, "hello"); err != nil {
t.Log(err)
return err
}
// We need to give the txn an external lock (i.e. one on a different range),
// or we'll auto-GC the txn record on the first commit attempt, preventing
// the second one from getting the "desired"
// TransactionStatusError(alreadyCommitted).
if err := txn.Put(ctx, kNext, "hullo"); err != nil {
t.Log(err)
return err
}
err := txn.Commit(ctx) // hits fatal error
t.Log(err)
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type ClientTestingKnobs struct {
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
DontReorderReplicas bool

// DisableCommitSanityCheck allows "setting" the DisableCommitSanityCheck to
// true without actually overriding the variable.
DisableCommitSanityCheck bool
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down
44 changes: 26 additions & 18 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,39 +916,47 @@ func (tc *TxnCoordSender) updateStateLocked(

// Update our transaction with any information the error has.
if errTxn := pErr.GetTxn(); errTxn != nil {
if errTxn.Status == roachpb.COMMITTED {
pErr = sanityCheckCommittedErr(ctx, pErr, ba)
if err := sanityCheckErrWithTxn(ctx, pErr, ba, &tc.testingKnobs); err != nil {
return roachpb.NewError(err)
}
tc.mu.txn.Update(errTxn)
}
return pErr
}

// sanityCheckCommittedErr verifies the circumstances in which we're receiving
// an error indicating a COMMITTED transaction. Only rollbacks should be
// encountering such errors. Marking a transaction as explicitly-committed can
// also encounter these errors, but those errors don't make it to the
// TxnCoordSender.
// sanityCheckErrWithTxn verifies whether the error (which must have a txn
// attached) contains a COMMITTED transaction. Only rollbacks should be able to
// encounter such errors. Marking a transaction as explicitly-committed can also
// encounter these errors, but those errors don't make it to the TxnCoordSender.
//
// Returns the passed-in error or fatals (depending on DisableCommitSanityCheck env var),
// wrapping the input error in case of an assertion violation.
// Returns the passed-in error or fatals (depending on DisableCommitSanityCheck
// env var), wrapping the input error in case of an assertion violation.
//
// Requires: pErrWithCommittedTxn is non-nil and GetTxn() is also non-nil.
func sanityCheckCommittedErr(
ctx context.Context, pErrWithCommittedTxn *roachpb.Error, ba roachpb.BatchRequest,
) *roachpb.Error {
// The assertion is known to have failed in the wild, see:
// https://github.com/cockroachdb/cockroach/issues/67765
func sanityCheckErrWithTxn(
ctx context.Context,
pErrWithTxn *roachpb.Error,
ba roachpb.BatchRequest,
knobs *ClientTestingKnobs,
) error {
txn := pErrWithTxn.GetTxn()
if txn.Status != roachpb.COMMITTED {
return nil
}
// The only case in which an error can have a COMMITTED transaction in it is
// when the request was a rollback. Rollbacks can race with commits if a
// context timeout expires while a commit request is in flight.
if ba.IsSingleAbortTxnRequest() {
return pErrWithCommittedTxn
return nil
}

// Finding out about our transaction being committed indicates a serious bug.
// Requests are not supposed to be sent on transactions after they are
// committed.
err := errors.Wrapf(pErrWithCommittedTxn.GoError(),
err := errors.Wrapf(pErrWithTxn.GoError(),
"transaction unexpectedly committed, ba: %s. txn: %s",
ba, pErrWithCommittedTxn.GetTxn(),
ba, pErrWithTxn.GetTxn(),
)
err = errors.WithAssertionFailure(
errors.WithIssueLink(err, errors.IssueLink{
Expand All @@ -958,10 +966,10 @@ func sanityCheckCommittedErr(
"This assertion can be disabled by setting the environment variable " +
"COCKROACH_DISABLE_COMMIT_SANITY_CHECK=true",
}))
if !DisableCommitSanityCheck {
if !DisableCommitSanityCheck && !knobs.DisableCommitSanityCheck {
log.Fatalf(ctx, "%s", err)
}
return roachpb.NewError(err)
return err
}

// setTxnAnchorKey sets the key at which to anchor the transaction record. The
Expand Down
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,13 +1077,12 @@ func (a Allocator) rebalanceTarget(
return zero, zero, "", false
}
// Keep looping until we either run out of options or find a target that we're
// pretty sure we won't want to remove immediately after adding it.
// If we would, we don't want to actually rebalance to that target.
var target *candidate
// pretty sure we won't want to remove immediately after adding it. If we
// would, we don't want to actually rebalance to that target.
var target, existingCandidate *candidate
var removeReplica roachpb.ReplicaDescriptor
var existingCandidates candidateList
for {
target, existingCandidates = bestRebalanceTarget(a.randGen, results)
target, existingCandidate = bestRebalanceTarget(a.randGen, results)
if target == nil {
return zero, zero, "", false
}
Expand Down Expand Up @@ -1146,7 +1145,7 @@ func (a Allocator) rebalanceTarget(
// debugging/auditability purposes.
dDetails := decisionDetails{
Target: target.compactString(),
Existing: existingCandidates.compactString(options),
Existing: existingCandidate.compactString(),
}
detailsBytes, err := json.Marshal(dDetails)
if err != nil {
Expand Down
Loading

0 comments on commit 9838965

Please sign in to comment.