Skip to content

Commit

Permalink
Merge pull request cockroachdb#50410 from andreimatei/backport20.1-50303
Browse files Browse the repository at this point in the history
release-20.1: kvserver: small logging improvements
  • Loading branch information
andreimatei authored Jun 19, 2020
2 parents 20acec6 + b22e924 commit c63847e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 8 deletions.
9 changes: 7 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,12 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
bumpedTxn)
br = nil
}
if pErr != nil && maxRefreshAttempts > 0 {
br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
if pErr != nil {
if maxRefreshAttempts > 0 {
br, pErr = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
} else {
log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted")
}
}
sr.forwardRefreshTimestampOnResponse(br, pErr)
return br, pErr
Expand Down Expand Up @@ -323,6 +327,7 @@ func (sr *txnSpanRefresher) maybeRetrySend(
return nil, pErr
}
sr.refreshSuccess.Inc(1)
log.Eventf(ctx, "refresh succeeded; retrying original request")

// We've refreshed all of the read spans successfully and bumped
// ba.Txn's timestamps. Attempt the request again.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func EndTxn(
// to perform this verification for commits. Rollbacks can always write
// an aborted txn record.
if args.Commit {
if err := CanCreateTxnRecord(cArgs.EvalCtx, reply.Txn); err != nil {
if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, reply.Txn); err != nil {
return result.Result{}, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func HeartbeatTxn(
txn = *h.Txn

// Verify that it is safe to create the transaction record.
if err := CanCreateTxnRecord(cArgs.EvalCtx, &txn); err != nil {
if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, &txn); err != nil {
return result.Result{}, err
}
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,17 @@ func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool {
// CanCreateTxnRecord determines whether a transaction record can be created for
// the provided transaction. If not, the function will return an error. If so,
// the function may modify the provided transaction.
func CanCreateTxnRecord(rec EvalContext, txn *roachpb.Transaction) error {
func CanCreateTxnRecord(ctx context.Context, rec EvalContext, txn *roachpb.Transaction) error {
// Provide the transaction's minimum timestamp. The transaction could not
// have written a transaction record previously with a timestamp below this.
ok, minCommitTS, reason := rec.CanCreateTxnRecord(txn.ID, txn.Key, txn.MinTimestamp)
if !ok {
log.VEventf(ctx, 2, "txn tombstone present; transaction has been aborted")
return roachpb.NewTransactionAbortedError(reason)
}
txn.WriteTimestamp.Forward(minCommitTS)
if bumped := txn.WriteTimestamp.Forward(minCommitTS); bumped {
log.VEventf(ctx, 2, "write timestamp bumped by txn tombstone to: %s", txn.WriteTimestamp)
}
return nil
}

Expand Down
23 changes: 21 additions & 2 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

Expand Down Expand Up @@ -265,6 +266,7 @@ func (r *Replica) applyTimestampCache(
// below is due to the minReadTS.
var bumpedDueToMinReadTS bool
var bumped bool
var conflictingTxn uuid.UUID

for _, union := range ba.Requests {
args := union.GetInner()
Expand All @@ -291,14 +293,31 @@ func (r *Replica) applyTimestampCache(
} else {
bumpedCurReq = ba.Timestamp.Forward(nextRTS)
}
if bumpedCurReq && (rTxnID != uuid.Nil) {
conflictingTxn = rTxnID
}
// Preserve bumpedDueToMinReadTS if we did not just bump or set it
// appropriately if we did.
bumpedDueToMinReadTS = (!bumpedCurReq && bumpedDueToMinReadTS) || (bumpedCurReq && forwardedToMinReadTS)
bumped, bumpedCurReq = bumped || bumpedCurReq, false
}
}
if bumpedDueToMinReadTS {
telemetry.Inc(batchesPushedDueToClosedTimestamp)
if bumped {
bumpedTS := ba.Timestamp
if ba.Txn != nil {
bumpedTS = ba.Txn.WriteTimestamp
}

if bumpedDueToMinReadTS {
telemetry.Inc(batchesPushedDueToClosedTimestamp)
log.VEventf(ctx, 2, "bumped write timestamp due to closed ts: %s", minReadTS)
} else {
conflictMsg := "conflicting txn unknown"
if conflictingTxn != uuid.Nil {
conflictMsg = "conflicting txn: " + conflictingTxn.Short()
}
log.VEventf(ctx, 2, "bumped write timestamp to %s; %s", bumpedTS, log.Safe(conflictMsg))
}
}
return bumped
}
Expand Down

0 comments on commit c63847e

Please sign in to comment.