…achdb#108071 cockroachdb#108085

107882: kv: don't let pusher win when pushing STAGING txn with equal priority r=miraradeva a=nvanbenschoten

Fixes cockroachdb#105330.
Fixes cockroachdb#101721.

This commit updates the transaction push logic to stop pushes from completing successfully when the pushee is STAGING and the pusher has equal priority. Prior to this change, the pusher would win in these cases when using a PUSH_TIMESTAMP if at least one of the two transactions involved used a weak isolation level.

This had two undesirable effects:
- if the pushee was still possibly committable and required recovery (`!(knownHigherTimestamp || knownHigherEpoch)` in the code), then the pusher would immediately begin parallel commit recovery, attempting to disrupt the commit and abort the pushee. This is undesirable because the pushee may still be in progress. In cases of equal priority, we'd like to wait for the parallel commit to complete before kicking off recovery, deferring recovery to only the cases where the pushee/committer's heartbeat has expired.
- if the pushee was known to be uncommittable (`knownHigherTimestamp || knownHigherEpoch` in the code), then txn recovery was not kicked off but the pusher still could not perform the PUSH_TIMESTAMP (see e40c1b4), so it would return a `TransactionPushError`. This confused logic in `handleTransactionPushError`, allowing the error to escape to the client.

This commit resolves both issues by considering the pushee's transaction status in `txnwait.ShouldPushImmediately`.
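
The effect of the new status check can be sketched in a few lines of Go. This is an illustrative model, not the code in `txnwait`: the `pushType` and `status` types and the `canPush` function are hypothetical stand-ins, priorities are plain integers, and the isolation-level condition is collapsed into a single `weakIsolation` flag.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real push type and transaction status enums.
type pushType int

const (
	pushAbort pushType = iota
	pushTimestamp
)

type status int

const (
	pending status = iota
	staging
)

// canPush models the status-aware priority check described above.
// weakIsolation is an assumption of this sketch: it stands for "the
// isolation levels involved make a timestamp push harmless".
func canPush(pt pushType, pusherPri, pusheePri int, pusheeStatus status, weakIsolation bool) bool {
	switch pt {
	case pushAbort:
		return pusherPri > pusheePri
	case pushTimestamp:
		if pusheeStatus == staging {
			// A STAGING pushee is only disrupted by a strictly higher priority.
			return pusherPri > pusheePri
		}
		return weakIsolation || pusherPri > pusheePri
	default:
		// Other push types are not modeled in this sketch.
		return false
	}
}

func main() {
	fmt.Println("pending, equal priority:", canPush(pushTimestamp, 1, 1, pending, true))
	fmt.Println("staging, equal priority:", canPush(pushTimestamp, 1, 1, staging, true))
	fmt.Println("staging, higher priority:", canPush(pushTimestamp, 2, 1, staging, true))
}
```

With equal priorities, the only case whose answer changes is the STAGING pushee: the pusher now waits instead of winning.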

Release note: None

107962: docs: do not generate docs for crdb_internal functions r=knz a=rafiss

Epic: None
Release note: None

108062: server: Add rangefeed metrics r=miretskiy a=miretskiy

Add rangefeed related metrics to keep track of the number of actively running rangefeeds on the server.

Epic: CRDB-26372
Release note: None

108071: stmtdiagnostics: use background context when building the bundle r=yuzefovich a=yuzefovich

When the context is canceled, we still want to build the bundle as best as possible. Over in 532274b we introduced the usage of the background context in order to insert the bundle into the system tables, but we still built the bundle with the canceled context. This commit fixes that oversight - in particular, we should now get `env.sql` correctly.

Informs: https://github.com/cockroachlabs/support/issues/2494.
Epic: None

Release note: None

108085: builtins: improve docs and refactor code for unordered_unique_id r=rafiss,rharding6373 a=andyyang890

**builtins: improve documentation for unordered_unique_rowid**

This patch clarifies that `unordered_unique_rowid` generates unique IDs
that are statistically likely to be unordered because it bit-reverses
the insert timestamp for use as the ID's timestamp bit segment.

Release note: None

----

**builtinconstants: define constants for unique int bit segments**

This patch defines constants for the sizes and bitmasks for each
bit segment in a unique int generated for row IDs.

Release note: None

----

**builtins: refactor bit shifting logic in mapToUnorderedUniqueInt**

This patch refactors the bit shifting logic in `mapToUnorderedUniqueInt`
to use constants instead of magic numbers.
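
The idea behind these three patches can be sketched as follows. The layout used here (1 unused bit, 48 timestamp bits, 15 node ID bits) and all constant and function names are assumptions made for illustration; the actual definitions live in `builtinconstants` and are not shown on this page.

```go
package main

import (
	"fmt"
	"math/bits"
)

// Assumed layout for this sketch, from most to least significant bit:
// [1 unused bit][48 timestamp bits][15 node ID bits].
const (
	nodeIDBits    = 15
	timestampBits = 48
	unusedBits    = 64 - timestampBits - nodeIDBits

	nodeIDMask    = uint64(1)<<nodeIDBits - 1
	timestampMask = (uint64(1)<<timestampBits - 1) << nodeIDBits
)

// toUnordered bit-reverses the timestamp segment of id and leaves the node ID
// segment untouched, so consecutive timestamps map to widely separated IDs.
func toUnordered(id uint64) uint64 {
	ts := (id & timestampMask) >> nodeIDBits
	// Reverse64 moves the 48-bit segment into the top 48 bits; shifting right
	// by the unused bit count puts it back in the timestamp position.
	reversed := bits.Reverse64(ts) >> unusedBits
	return reversed | (id & nodeIDMask)
}

func main() {
	for ts := uint64(1); ts <= 3; ts++ {
		id := ts<<nodeIDBits | 7 // node ID 7, increasing timestamps
		fmt.Printf("%064b\n", toUnordered(id))
	}
}
```

Naming the segment sizes and masks makes the single remaining shift (`unusedBits`) self-explanatory, which is the point of replacing the magic numbers.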

Release note: None

----

Epic: None


Co-authored-by: Nathan VanBenschoten
Co-authored-by: Rafi Shamim
Co-authored-by: Yevgeniy Miretskiy
Co-authored-by: Yahor Yuzefovich
Co-authored-by: Andy Yang
6 people committed Aug 3, 2023
6 parents dc28e19 + 90516db + 846c348 + 0287bb7 + 7d9a30f + a7fe6cf commit b9f3f15
Showing 18 changed files with 423 additions and 446 deletions.
321 changes: 1 addition & 320 deletions docs/generated/sql/functions.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/cmd/docgen/funcs.go
@@ -177,7 +177,7 @@ func generateFunctions(from []string, categorize bool) []byte {
// NB: funcs can appear more than once i.e. upper/lowercase variants for
// faster lookups, so normalize to lowercase and de-dupe using a set.
name = strings.ToLower(name)
-if _, ok := seen[name]; ok {
+if _, ok := seen[name]; ok || strings.HasPrefix(name, "crdb_internal.") {
continue
}
seen[name] = struct{}{}
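
The combined de-duplication and prefix filter in this hunk can be tried in isolation. The sketch below is self-contained and uses a hypothetical `visibleFunctions` helper; the sample names in `main` are illustrative input, not output of the real docgen tool.

```go
package main

import (
	"fmt"
	"strings"
)

// visibleFunctions lowercases each name, drops duplicates, and skips anything
// in the crdb_internal namespace, preserving first-seen order.
func visibleFunctions(names []string) []string {
	seen := make(map[string]struct{})
	var out []string
	for _, name := range names {
		name = strings.ToLower(name)
		if _, ok := seen[name]; ok || strings.HasPrefix(name, "crdb_internal.") {
			continue
		}
		seen[name] = struct{}{}
		out = append(out, name)
	}
	return out
}

func main() {
	names := []string{"UPPER", "upper", "crdb_internal.force_panic", "now"}
	fmt.Println(visibleFunctions(names))
}
```

Because the prefix check runs after normalization to lowercase, mixed-case variants of internal function names are filtered as well.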
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -368,6 +368,9 @@ func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics {
}
}

+// MetricStruct implements metrics.Struct interface.
+func (DistSenderRangeFeedMetrics) MetricStruct() {}

// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
// DistSenderMetrics for batch requests that have been divided and are currently
// forwarding to a specific replica for the corresponding range. The metrics
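
The empty `MetricStruct` method added above is a marker method: it carries no behavior and only makes the type satisfy an interface. A minimal sketch of the pattern follows, assuming a hypothetical `Struct` interface and `isMetricStruct` helper (the real interface and the registry that consumes it are not shown in this diff):

```go
package main

import "fmt"

// Struct is a hypothetical marker interface modeled on the comment in the
// hunk above. Types opt in by declaring an empty MetricStruct method.
type Struct interface {
	MetricStruct()
}

// rangeFeedMetrics is an illustrative metrics container.
type rangeFeedMetrics struct {
	ActiveRangeFeeds int64
}

// MetricStruct marks rangeFeedMetrics as a metrics container.
func (rangeFeedMetrics) MetricStruct() {}

// plainConfig does not opt in.
type plainConfig struct{}

// isMetricStruct reports whether v has opted in via the marker method.
func isMetricStruct(v any) bool {
	_, ok := v.(Struct)
	return ok
}

func main() {
	fmt.Println(isMetricStruct(rangeFeedMetrics{}))
	fmt.Println(isMetricStruct(plainConfig{}))
}
```

A registry that walks struct fields can use such a check to decide which nested structs to descend into when registering metrics.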
10 changes: 0 additions & 10 deletions pkg/kv/kvnemesis/validator.go
@@ -1232,16 +1232,6 @@ func (v *validator) failIfError(
) (ambiguous, hasError bool) {
exceptions = append(exceptions[:len(exceptions):len(exceptions)], func(err error) bool {
return errors.Is(err, errInjected)
-}, func(err error) bool {
-// Work-around for [1].
-//
-// TODO(arul): find out why we (as of [2]) sometimes leaking
-// *TransactionPushError (wrapped in `UnhandledRetryableError`) from
-// `db.Get`, `db.Scan`, etc.
-//
-// [1]: https://github.com/cockroachdb/cockroach/issues/105330
-// [2]: https://github.com/cockroachdb/cockroach/pull/97779
-return errors.HasType(err, (*kvpb.UnhandledRetryableError)(nil))
})
switch r.Type {
case ResultType_Unknown:
67 changes: 35 additions & 32 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
+"github.com/cockroachdb/cockroach/pkg/util/must"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
@@ -213,38 +214,9 @@ func PushTxn(
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)

-// If the pusher is aware that the pushee's currently recorded attempt at a
-// parallel commit failed, either because it found intents at a higher
-// timestamp than the parallel commit attempt or because it found intents at
-// a higher epoch than the parallel commit attempt, it should not consider
-// the pushee to be performing a parallel commit. Its commit status is not
-// indeterminate.
-if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
-reply.PusheeTxn.Status = roachpb.PENDING
-reply.PusheeTxn.InFlightWrites = nil
-// If the pusher is aware that the pushee's currently recorded attempt
-// at a parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs.
-// We don't want to move the transaction back to PENDING, as this is not
-// (currently) allowed by the recovery protocol. We also don't want to
-// move the transaction to a new timestamp while retaining the STAGING
-// status, as this could allow the transaction to enter an implicit
-// commit state without its knowledge, leading to atomicity violations.
-//
-// This has no effect on pushes that fail with a TransactionPushError.
-// Such pushes will still wait on the pushee to retry its commit and
-// eventually commit or abort. It also has no effect on expired pushees,
-// as they would have been aborted anyway. This only impacts pushes
-// which would have succeeded due to priority mismatches. In these
-// cases, the push acts the same as a short-circuited transaction
-// recovery process, because the transaction recovery procedure always
-// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
-if pushType == kvpb.PUSH_TIMESTAMP {
-pushType = kvpb.PUSH_ABORT
-}
-}

pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
+pusheeStatus := reply.PusheeTxn.Status
var pusherWins bool
var reason string
switch {
@@ -258,7 +230,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
-case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
+case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
reason = "pusher has priority"
pusherWins = true
case args.Force:
@@ -282,13 +254,42 @@ func PushTxn(
// If the pushed transaction is in the staging state, we can't change its
// record without first going through the transaction recovery process and
// attempting to finalize it.
+pusheeStaging := pusheeStatus == roachpb.STAGING
+// However, if the pusher is aware that the pushee's currently recorded
+// attempt at a parallel commit failed, either because it found intents at a
+// higher timestamp than the parallel commit attempt or because it found
+// intents at a higher epoch than the parallel commit attempt, it should not
+// consider the pushee to be performing a parallel commit. Its commit status
+// is not indeterminate.
+pusheeStagingFailed := pusheeStaging && (knownHigherTimestamp || knownHigherEpoch)
recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes
-if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) {
+if pusheeStaging && !pusheeStagingFailed && (pusherWins || recoverOnFailedPush) {
err := kvpb.NewIndeterminateCommitError(reply.PusheeTxn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
}

+// If the pusher is aware that the pushee's currently recorded attempt at a
+// parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
+// want to move the transaction back to PENDING, as this is not (currently)
+// allowed by the recovery protocol. We also don't want to move the
+// transaction to a new timestamp while retaining the STAGING status, as this
+// could allow the transaction to enter an implicit commit state without its
+// knowledge, leading to atomicity violations.
+//
+// This has no effect on pushes that fail with a TransactionPushError. Such
+// pushes will still wait on the pushee to retry its commit and eventually
+// commit or abort. It also has no effect on expired pushees, as they would
+// have been aborted anyway. This only impacts pushes which would have
+// succeeded due to priority mismatches. In these cases, the push acts the
+// same as a short-circuited transaction recovery process, because the
+// transaction recovery procedure always finalizes target transactions, even
+// if initiated by a PUSH_TIMESTAMP.
+if pusheeStaging && pusherWins && pushType == kvpb.PUSH_TIMESTAMP {
+_ = must.True(ctx, pusheeStagingFailed, "parallel commit must be known to have failed for push to succeed")
+pushType = kvpb.PUSH_ABORT
+}

if !pusherWins {
err := kvpb.NewTransactionPushError(reply.PusheeTxn)
log.VEventf(ctx, 1, "%v", err)
@@ -306,6 +307,8 @@ func PushTxn(
// Forward the timestamp to accommodate AbortSpan GC. See method comment for
// details.
reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())
+// If the transaction was previously staging, clear its in-flight writes.
+reply.PusheeTxn.InFlightWrites = nil
// If the transaction record was already present, persist the updates to it.
// If not, then we don't want to create it. This could allow for finalized
// transactions to be revived. Instead, we obey the invariant that only the
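
The reordered checks in `PushTxn` can be summarized as a small decision function. This is a simplified model under stated assumptions, not the real evaluation code: `evaluatePush` and the `outcome` values are hypothetical names, the inputs are reduced to booleans, and the `RecoverIndeterminateCommitsOnFailedPushes` testing knob is left out.

```go
package main

import "fmt"

// outcome names what a push evaluation returns in this sketch.
type outcome string

const (
	startRecovery outcome = "IndeterminateCommitError"
	waitForPushee outcome = "TransactionPushError"
	abortPushee   outcome = "PUSH_ABORT succeeds"
	bumpTimestamp outcome = "PUSH_TIMESTAMP succeeds"
)

// evaluatePush mirrors the order of checks in the hunks above:
//  1. a winning push against a possibly committable STAGING pushee triggers recovery;
//  2. a winning PUSH_TIMESTAMP against a STAGING pushee known to have failed
//     its parallel commit is upgraded to PUSH_ABORT;
//  3. a losing push returns a push error and waits.
func evaluatePush(staging, knownFailed, pusherWins, pushTimestamp bool) outcome {
	stagingFailed := staging && knownFailed
	if staging && !stagingFailed && pusherWins {
		return startRecovery
	}
	if staging && pusherWins && pushTimestamp {
		pushTimestamp = false // upgrade to PUSH_ABORT
	}
	if !pusherWins {
		return waitForPushee
	}
	if pushTimestamp {
		return bumpTimestamp
	}
	return abortPushee
}

func main() {
	fmt.Println(evaluatePush(true, false, false, true)) // equal priority, still committable
	fmt.Println(evaluatePush(true, false, true, true))  // higher priority, still committable
	fmt.Println(evaluatePush(true, true, true, true))   // higher priority, commit known failed
}
```

The first case is the one this commit changes: with equal priority the pusher no longer wins, so the push waits rather than starting recovery.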
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -747,18 +747,19 @@ func (c *cluster) PushTransaction(
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
pusheeIso := pusheeTxn.IsoLevel
pusheePri := pusheeTxn.Priority
+pusheeStatus := pusheeTxn.Status
// NOTE: this logic is adapted from cmd_push_txn.go.
var pusherWins bool
switch {
-case pusheeTxn.Status.IsFinalized():
+case pusheeStatus.IsFinalized():
// Already finalized.
return pusheeTxn, nil
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
-case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
+case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
pusherWins = true
default:
pusherWins = false
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -1298,7 +1298,11 @@ func canPushWithPriority(req Request, s waitingState) bool {
}
pusheeIso = s.txn.IsoLevel
pusheePri = s.txn.Priority
-return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
+// We assume that the pushee is in the PENDING state when deciding whether
+// to push. A push may determine that the pushee is STAGING or has already
+// been finalized.
+pusheeStatus := roachpb.PENDING
+return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
}

func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
@@ -773,7 +773,7 @@ func (r *Replica) handleTransactionPushError(
dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures
if !dontRetry && ba.IsSinglePushTxnRequest() {
pushReq := ba.Requests[0].GetInner().(*kvpb.PushTxnRequest)
-dontRetry = txnwait.ShouldPushImmediately(pushReq)
+dontRetry = txnwait.ShouldPushImmediately(pushReq, t.PusheeTxn.Status)
}
if dontRetry {
return g, pErr
123 changes: 123 additions & 0 deletions pkg/kv/kvserver/txn_recovery_integration_test.go
@@ -18,13 +18,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -288,6 +290,127 @@ func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
})
}

// TestTxnRecoveryFromStagingWithoutHighPriority tests that the transaction
// recovery process is NOT initiated by a normal-priority operation which
// encounters a staging transaction. Instead, the normal-priority operation
// waits for the committing transaction to complete. The test contains a subtest
// for each of the combinations of the following options:
//
// - pusheeIsoLevel: configures the isolation level of the pushee (committing)
// transaction. Isolation levels affect the behavior of pushes of pending
// transactions, but not of staging transactions.
//
// - pusheeCommits: configures whether or not the staging transaction is
// implicitly and, eventually, explicitly committed or not.
//
// - pusherWriting: configures whether or not the conflicting operation is a
// read (false) or a write (true), which dictates the kind of push operation
// dispatched against the staging transaction.
func TestTxnRecoveryFromStagingWithoutHighPriority(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

run := func(t *testing.T, pusheeIsoLevel isolation.Level, pusheeCommits, pusherWriting bool) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
cfg := TestStoreConfig(hlc.NewClockForTesting(manual))
store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)

// Create a transaction that will get stuck performing a parallel
// commit.
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
txn := newTransaction("txn", keyA, 1, store.Clock())
txn.IsoLevel = pusheeIsoLevel

// Issue two writes, which will be considered in-flight at the time of
// the transaction's EndTxn request.
keyAVal := []byte("value")
pArgs := putArgs(keyA, keyAVal)
pArgs.Sequence = 1
h := kvpb.Header{Txn: txn}
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
require.Nil(t, pErr, "error: %s", pErr)

pArgs = putArgs(keyB, []byte("value2"))
pArgs.Sequence = 2
h2 := kvpb.Header{Txn: txn.Clone()}
if !pusheeCommits {
// If we're not going to have the pushee commit, make sure it never enters
// the implicit commit state by bumping the timestamp of one of its writes.
manual.Advance(100)
h2.Txn.WriteTimestamp = store.Clock().Now()
}
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
require.Nil(t, pErr, "error: %s", pErr)

// Issue a parallel commit, which will put the transaction into a
// STAGING state. Include both writes as the EndTxn's in-flight writes.
et, etH := endTxnArgs(txn, true)
et.InFlightWrites = []roachpb.SequencedWrite{
{Key: keyA, Sequence: 1},
{Key: keyB, Sequence: 2},
}
etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
require.Nil(t, pErr, "error: %s", pErr)
require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)

// Issue a conflicting, normal-priority operation.
var conflictArgs kvpb.Request
if pusherWriting {
pArgs = putArgs(keyB, []byte("value3"))
conflictArgs = &pArgs
} else {
gArgs := getArgs(keyB)
conflictArgs = &gArgs
}
manual.Advance(100)
pErrC := make(chan *kvpb.Error, 1)
require.NoError(t, stopper.RunAsyncTask(ctx, "conflict", func(ctx context.Context) {
_, pErr := kv.SendWrapped(ctx, store.TestSender(), conflictArgs)
pErrC <- pErr
}))

// Wait for the conflict to push and be queued in the txn wait queue.
testutils.SucceedsSoon(t, func() error {
select {
case pErr := <-pErrC:
t.Fatalf("conflicting operation unexpectedly completed: pErr=%s", pErr)
default:
}
if v := store.txnWaitMetrics.PusherWaiting.Value(); v != 1 {
return errors.Errorf("expected 1 pusher waiting, found %d", v)
}
return nil
})

// Finalize the STAGING txn, either by committing it or by aborting it.
et2, et2H := endTxnArgs(txn, pusheeCommits)
etReply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), et2H, &et2)
require.Nil(t, pErr, "error: %s", pErr)
expStatus := roachpb.COMMITTED
if !pusheeCommits {
expStatus = roachpb.ABORTED
}
require.Equal(t, expStatus, etReply.Header().Txn.Status)

// This will unblock the conflicting operation, which should succeed.
pErr = <-pErrC
require.Nil(t, pErr, "error: %s", pErr)
}

for _, pusheeIsoLevel := range isolation.Levels() {
t.Run("pushee_iso_level="+pusheeIsoLevel.String(), func(t *testing.T) {
testutils.RunTrueAndFalse(t, "pushee_commits", func(t *testing.T, pusheeCommits bool) {
testutils.RunTrueAndFalse(t, "pusher_writing", func(t *testing.T, pusherWriting bool) {
run(t, pusheeIsoLevel, pusheeCommits, pusherWriting)
})
})
})
}
}

// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
// write intents. This can cause it to remove an intent from an implicitly
// committed STAGING txn. When txn recovery kicks in, it will fail to find the
23 changes: 17 additions & 6 deletions pkg/kv/kvserver/txnwait/queue.go
@@ -70,23 +70,26 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
// proceed without queueing. This is true for pushes which are neither
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
// the pushee has min priority or pusher has max priority.
-func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
+func ShouldPushImmediately(req *kvpb.PushTxnRequest, pusheeStatus roachpb.TransactionStatus) bool {
if req.Force {
return true
}
return CanPushWithPriority(
req.PushType,
req.PusherTxn.IsoLevel, req.PusheeTxn.IsoLevel,
req.PusherTxn.Priority, req.PusheeTxn.Priority,
+pusheeStatus,
)
}

// CanPushWithPriority returns true if the pusher can perform the specified push
-// type on the pushee, based on the two txns' isolation levels and priorities.
+// type on the pushee, based on the two txns' isolation levels, their priorities,
+// and the pushee's status.
func CanPushWithPriority(
pushType kvpb.PushTxnType,
pusherIso, pusheeIso isolation.Level,
pusherPri, pusheePri enginepb.TxnPriority,
+pusheeStatus roachpb.TransactionStatus,
) bool {
// Normalize the transaction priorities so that normal user priorities are
// considered equal for the purposes of pushing.
@@ -103,6 +106,15 @@ func CanPushWithPriority(
case kvpb.PUSH_ABORT:
return pusherPri > pusheePri
case kvpb.PUSH_TIMESTAMP:
+// If the pushee transaction is STAGING, only let the PUSH_TIMESTAMP through
+// to disrupt the transaction commit if the pusher has a higher priority. If
+// the priorities are equal, the PUSH_TIMESTAMP should wait for the commit
+// to complete.
+if pusheeStatus == roachpb.STAGING {
+return pusherPri > pusheePri
+}
+// Otherwise, the pushee has not yet started committing...

// If the pushee transaction tolerates write skew, the PUSH_TIMESTAMP is
// harmless, so let it through.
return pusheeIso.ToleratesWriteSkew() ||
@@ -476,10 +488,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
func (q *Queue) MaybeWaitForPush(
ctx context.Context, req *kvpb.PushTxnRequest,
) (*kvpb.PushTxnResponse, *kvpb.Error) {
-if ShouldPushImmediately(req) {
-return nil, nil
-}

q.mu.Lock()
// If the txn wait queue is not enabled or if the request is not
// contained within the replica, do nothing. The request can fall
Expand All @@ -501,6 +509,9 @@ func (q *Queue) MaybeWaitForPush(
if txn := pending.getTxn(); isPushed(req, txn) {
q.mu.Unlock()
return createPushTxnResponse(txn), nil
+} else if ShouldPushImmediately(req, txn.Status) {
+q.mu.Unlock()
+return nil, nil
}

push := &waitingPush{