diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 5eac2c10c992..072b93280da9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -285,8 +285,12 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts( bumpedTxn) br = nil } - if pErr != nil && maxRefreshAttempts > 0 { - br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts) + if pErr != nil { + if maxRefreshAttempts > 0 { + br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts) + } else { + log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted") + } } sr.forwardRefreshTimestampOnResponse(br, pErr) return br, pErr @@ -323,6 +327,7 @@ func (sr *txnSpanRefresher) maybeRetrySend( return nil, pErr } sr.refreshSuccess.Inc(1) + log.Eventf(ctx, "refresh succeeded; retrying original request") // We've refreshed all of the read spans successfully and bumped // ba.Txn's timestamps. Attempt the request again. diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index de3c6f7635a3..72e47af3364b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -211,7 +211,7 @@ func EndTxn( // to perform this verification for commits. Rollbacks can always write // an aborted txn record. if args.Commit { - if err := CanCreateTxnRecord(cArgs.EvalCtx, reply.Txn); err != nil { + if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, reply.Txn); err != nil { return result.Result{}, err } } diff --git a/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go b/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go index 868f9801e67b..96fb31572245 100644 --- a/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go @@ -66,7 +66,7 @@ func HeartbeatTxn( txn = *h.Txn // Verify that it is safe to create the transaction record. - if err := CanCreateTxnRecord(cArgs.EvalCtx, &txn); err != nil { + if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, &txn); err != nil { return result.Result{}, err } } diff --git a/pkg/kv/kvserver/batcheval/transaction.go b/pkg/kv/kvserver/batcheval/transaction.go index a1993fb7888b..6561f71dcf59 100644 --- a/pkg/kv/kvserver/batcheval/transaction.go +++ b/pkg/kv/kvserver/batcheval/transaction.go @@ -126,14 +126,17 @@ func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool { // CanCreateTxnRecord determines whether a transaction record can be created for // the provided transaction. If not, the function will return an error. If so, // the function may modify the provided transaction. -func CanCreateTxnRecord(rec EvalContext, txn *roachpb.Transaction) error { +func CanCreateTxnRecord(ctx context.Context, rec EvalContext, txn *roachpb.Transaction) error { // Provide the transaction's minimum timestamp. The transaction could not // have written a transaction record previously with a timestamp below this. ok, minCommitTS, reason := rec.CanCreateTxnRecord(txn.ID, txn.Key, txn.MinTimestamp) if !ok { + log.VEventf(ctx, 2, "txn tombstone present; transaction has been aborted") return roachpb.NewTransactionAbortedError(reason) } - txn.WriteTimestamp.Forward(minCommitTS) + if bumped := txn.WriteTimestamp.Forward(minCommitTS); bumped { + log.VEventf(ctx, 2, "write timestamp bumped by txn tombstone to: %s", txn.WriteTimestamp) + } return nil } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 858be7fc33f4..ab068093d683 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -265,6 +266,7 @@ func (r *Replica) applyTimestampCache( // below is due to the minReadTS. var bumpedDueToMinReadTS bool var bumped bool + var conflictingTxn uuid.UUID for _, union := range ba.Requests { args := union.GetInner() @@ -291,14 +293,31 @@ func (r *Replica) applyTimestampCache( } else { bumpedCurReq = ba.Timestamp.Forward(nextRTS) } + if bumpedCurReq && (rTxnID != uuid.Nil) { + conflictingTxn = rTxnID + } // Preserve bumpedDueToMinReadTS if we did not just bump or set it // appropriately if we did. bumpedDueToMinReadTS = (!bumpedCurReq && bumpedDueToMinReadTS) || (bumpedCurReq && forwardedToMinReadTS) bumped, bumpedCurReq = bumped || bumpedCurReq, false } } - if bumpedDueToMinReadTS { - telemetry.Inc(batchesPushedDueToClosedTimestamp) + if bumped { + bumpedTS := ba.Timestamp + if ba.Txn != nil { + bumpedTS = ba.Txn.WriteTimestamp + } + + if bumpedDueToMinReadTS { + telemetry.Inc(batchesPushedDueToClosedTimestamp) + log.VEventf(ctx, 2, "bumped write timestamp due to closed ts: %s", minReadTS) + } else { + conflictMsg := "conflicting txn unknown" + if conflictingTxn != uuid.Nil { + conflictMsg = "conflicting txn: " + conflictingTxn.Short() + } + log.VEventf(ctx, 2, "bumped write timestamp to %s; %s", bumpedTS, log.Safe(conflictMsg)) + } } return bumped }