Skip to content

Commit

Permalink
kv: enable trace redaction in concurrency manager data-driven test
Browse files Browse the repository at this point in the history
This commit enables trace redaction in the `TestConcurrencyManagerBasic`
test. Doing so ensures that all logging/tracing performed during concurrency
control is properly redacting sensitive information while preserving all
other information. This ensures that if/when we need these logs in a
customer escalation, they will be there.

In order to make this change, the commit implements `redact.SafeFormatter`
on a few different data types. Notably, it implements the interface on
`waitingState`. With these changes, the only redacted information left is
`roachpb.Key`s.

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed May 24, 2023
1 parent bd6f8f5 commit e8e33ca
Show file tree
Hide file tree
Showing 32 changed files with 556 additions and 525 deletions.
17 changes: 10 additions & 7 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/maruel/panicparse/v2/stack"
"github.com/petermattis/goid"
)
Expand Down Expand Up @@ -422,7 +423,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
txnAcquire.Sequence = seqNum

mon.runSync("acquire lock", func(ctx context.Context) {
log.Eventf(ctx, "txn %s @ %s", txn.ID.Short(), key)
log.Eventf(ctx, "txn %s @ %s", txn.Short(), key)
acq := roachpb.MakeLockAcquisition(txnAcquire, roachpb.Key(key), dur)
m.OnLockAcquired(ctx, &acq)
})
Expand Down Expand Up @@ -472,7 +473,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
txnUpdate.WriteTimestamp.Forward(ts)

mon.runSync("update lock", func(ctx context.Context) {
log.Eventf(ctx, "%s txn %s @ %s", verb, txn.ID.Short(), key)
log.Eventf(ctx, "%s txn %s @ %s", verb, txn.Short(), key)
span := roachpb.Span{Key: roachpb.Key(key)}
up := roachpb.MakeLockUpdate(txnUpdate, span)
m.OnLockUpdated(ctx, &up)
Expand All @@ -494,7 +495,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
}

mon.runSync("update txn", func(ctx context.Context) {
log.Eventf(ctx, "%s %s", verb, txnName)
log.Eventf(ctx, "%s %s", verb, redact.Safe(txnName))
if err := c.updateTxnRecord(txn.ID, status, ts); err != nil {
d.Fatalf(t, err.Error())
}
Expand Down Expand Up @@ -934,7 +935,7 @@ func (c *cluster) detectDeadlocks() {
}
chainBuf.WriteString(id.Short())
}
log.Eventf(origPush.ctx, "dependency cycle detected %s", chainBuf.String())
log.Eventf(origPush.ctx, "dependency cycle detected %s", redact.Safe(chainBuf.String()))
}
break
}
Expand Down Expand Up @@ -1056,8 +1057,10 @@ type monitoredGoroutine struct {
}

func newMonitor() *monitor {
tr := tracing.NewTracer()
tr.SetRedactable(true)
return &monitor{
tr: tracing.NewTracer(),
tr: tr,
gs: make(map[*monitoredGoroutine]struct{}),
}
}
Expand Down Expand Up @@ -1124,7 +1127,7 @@ func (m *monitor) collectRecordings() string {
continue
}
logs = append(logs, logRecord{
g: g, value: log.Msg().StripMarkers(),
g: g, value: string(log.Msg()),
})
g.prevEvents++
}
Expand Down Expand Up @@ -1244,7 +1247,7 @@ func (m *monitor) waitForAsyncGoroutinesToStall(t *testing.T) {
}
stalledCall := firstNonStdlib(stat.Stack.Calls)
log.Eventf(g.ctx, "blocked on %s in %s.%s",
stat.State, stalledCall.Func.DirName, stalledCall.Func.Name)
redact.Safe(stat.State), redact.Safe(stalledCall.Func.DirName), redact.Safe(stalledCall.Func.Name))
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/redact"
)

func nextUUID(counter *uint32) uuid.UUID {
Expand Down Expand Up @@ -238,11 +239,13 @@ func scanSingleRequest(
}
}

func scanTxnStatus(t *testing.T, d *datadriven.TestData) (roachpb.TransactionStatus, string) {
func scanTxnStatus(
t *testing.T, d *datadriven.TestData,
) (roachpb.TransactionStatus, redact.SafeString) {
var statusStr string
d.ScanArgs(t, "status", &statusStr)
status := parseTxnStatus(t, d, statusStr)
var verb string
var verb redact.SafeString
switch status {
case roachpb.COMMITTED:
verb = "committing"
Expand Down
23 changes: 14 additions & 9 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,30 +102,35 @@ type waitingState struct {

// String implements the fmt.Stringer interface.
func (s waitingState) String() string {
return redact.StringWithoutMarkers(s)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (s waitingState) SafeFormat(w redact.SafePrinter, _ rune) {
switch s.kind {
case waitFor, waitForDistinguished:
distinguished := ""
distinguished := redact.SafeString("")
if s.kind == waitForDistinguished {
distinguished = " (distinguished)"
}
target := "holding lock"
target := redact.SafeString("holding lock")
if !s.held {
target = "running request"
}
return fmt.Sprintf("wait for%s txn %s %s @ key %s (queuedWriters: %d, queuedReaders: %d)",
distinguished, s.txn.ID.Short(), target, s.key, s.queuedWriters, s.queuedReaders)
w.Printf("wait for%s txn %s %s @ key %s (queuedWriters: %d, queuedReaders: %d)",
distinguished, s.txn.Short(), target, s.key, s.queuedWriters, s.queuedReaders)
case waitSelf:
return fmt.Sprintf("wait self @ key %s", s.key)
w.Printf("wait self @ key %s", s.key)
case waitElsewhere:
if !s.held {
return "wait elsewhere by proceeding to evaluation"
w.SafeString("wait elsewhere by proceeding to evaluation")
}
return fmt.Sprintf("wait elsewhere for txn %s @ key %s", s.txn.ID.Short(), s.key)
w.Printf("wait elsewhere for txn %s @ key %s", s.txn.Short(), s.key)
case waitQueueMaxLengthExceeded:
return fmt.Sprintf("wait-queue maximum length exceeded @ key %s with length %d",
w.Printf("wait-queue maximum length exceeded @ key %s with length %d",
s.key, s.queuedWriters)
case doneWaiting:
return "done waiting"
w.SafeString("done waiting")
default:
panic("unhandled waitingState.kind")
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package concurrency

import (
"context"
"fmt"
"math"
"time"

Expand All @@ -34,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -497,10 +497,10 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// not persist this, but still preserve the local timestamp when the
// adjusting the intent, accepting that the intent would then no longer
// round-trip and would lose the local timestamp if rewritten later.
log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp)
log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.Short(), h.Timestamp)
} else {
pushType = kvpb.PUSH_ABORT
log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.ID.Short())
log.VEventf(ctx, 2, "pushing txn %s to abort", ws.txn.Short())
}

case lock.WaitPolicy_Error:
Expand All @@ -510,7 +510,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// lock, but we push using a PUSH_TOUCH to immediately return an error
// if the lock hold is still active.
pushType = kvpb.PUSH_TOUCH
log.VEventf(ctx, 2, "pushing txn %s to check if abandoned", ws.txn.ID.Short())
log.VEventf(ctx, 2, "pushing txn %s to check if abandoned", ws.txn.Short())

default:
log.Fatalf(ctx, "unexpected WaitPolicy: %v", req.WaitPolicy)
Expand Down Expand Up @@ -698,7 +698,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
// the caller of this function cancels the push.
h := w.pushHeader(req)
pushType := kvpb.PUSH_ABORT
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.ID.Short())
log.VEventf(ctx, 3, "pushing txn %s to detect request deadlock", ws.txn.Short())

_, err := w.ir.PushTransaction(ctx, ws.txn, h, pushType)
if err != nil {
Expand Down Expand Up @@ -1268,12 +1268,12 @@ func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {
if !log.ExpensiveLogEnabled(ctx, 2) {
return
}
var obsStr string
var obsStr redact.RedactableString
if obs := intent.ClockWhilePending; obs != (roachpb.ObservedTimestamp{}) {
obsStr = fmt.Sprintf(" and clock observation {%d %v}", obs.NodeID, obs.Timestamp)
obsStr = redact.Sprintf(" and clock observation {%d %v}", obs.NodeID, obs.Timestamp)
}
log.VEventf(ctx, 2, "resolving intent %s for txn %s with %s status%s",
intent.Key, intent.Txn.ID.Short(), intent.Status, obsStr)
intent.Key, intent.Txn.Short(), intent.Status, obsStr)
}

func minDuration(a, b time.Duration) time.Duration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ sequence req=barrier2
----
[2] sequence barrier2: sequencing request
[2] sequence barrier2: waiting on latches without acquiring
[2] sequence barrier2: waiting to acquire write latch {a-f}@0,0, held by read latch c@15.000000000,1
[2] sequence barrier2: waiting to acquire write latch {a-f}@0,0, held by read latch ‹c›@15.000000000,1
[2] sequence barrier2: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=read1
Expand Down Expand Up @@ -96,7 +96,7 @@ sequence req=barrier1
----
[2] sequence barrier1: sequencing request
[2] sequence barrier1: waiting on latches without acquiring
[2] sequence barrier1: waiting to acquire write latch {a-f}@0,0, held by read latch c@10.000000000,1
[2] sequence barrier1: waiting to acquire write latch {a-f}@0,0, held by read latch ‹c›@10.000000000,1
[2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=read1
Expand Down Expand Up @@ -143,7 +143,7 @@ sequence req=barrier1
----
[2] sequence barrier1: sequencing request
[2] sequence barrier1: waiting on latches without acquiring
[2] sequence barrier1: waiting to acquire write latch {a-f}@0,0, held by write latch c@10.000000000,1
[2] sequence barrier1: waiting to acquire write latch {a-f}@0,0, held by write latch ‹c›@10.000000000,1
[2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal

debug-latch-manager
Expand Down
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ sequence req=req2

on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn 00000002 @ k
[-] acquire lock: txn 00000002 @ ‹k›

debug-lock-table
----
Expand Down Expand Up @@ -96,7 +96,7 @@ sequence req=req2

on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn 00000002 @ k
[-] acquire lock: txn 00000002 @ ‹k›

finish req=req2
----
Expand All @@ -113,7 +113,7 @@ sequence req=req3
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: waiting in lock wait-queues
[2] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1)
[2] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1)
[2] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false
[2] sequence req3: pushing timestamp of txn 00000002 above 14.000000000,1
[2] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -124,9 +124,9 @@ debug-advance-clock ts=123
on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[2] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[2] sequence req3: resolving intent "k" for txn 00000002 with COMMITTED status
[2] sequence req3: lock wait-queue event: done waiting
[2] sequence req3: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req3: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req3: acquiring latches
[2] sequence req3: scanning lock table for conflicting locks
[2] sequence req3: sequencing complete, returned guard
Expand All @@ -143,7 +143,7 @@ sequence req=req4
----
[3] sequence req4: sequencing request
[3] sequence req4: acquiring latches
[3] sequence req4: waiting to acquire write latch k@10.000000000,1, held by read latch k{-2}@14.000000000,1
[3] sequence req4: waiting to acquire write latch ‹k›@10.000000000,1, held by read latch k{-2}@14.000000000,1
[3] sequence req4: blocked on select in spanlatch.(*Manager).waitForSignal

debug-latch-manager
Expand Down Expand Up @@ -184,7 +184,7 @@ sequence req=req2

on-lock-acquired req=req2 key=k
----
[-] acquire lock: txn 00000002 @ k
[-] acquire lock: txn 00000002 @ ‹k›

finish req=req2
----
Expand All @@ -200,7 +200,7 @@ sequence req=req5
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: waiting in lock wait-queues
[2] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1)
[2] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1)
[2] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = false, timeout enforcement = false, priority enforcement = false
[2] sequence req5: pushing timestamp of txn 00000002 above 14.000000000,1
[2] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -215,7 +215,7 @@ sequence req=req6
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: waiting in lock wait-queues
[3] sequence req6: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 2)
[3] sequence req6: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 2)
[3] sequence req6: not pushing
[3] sequence req6: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn

Expand All @@ -225,14 +225,14 @@ debug-advance-clock ts=123
on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status and clock observation {1 246.000000000,0}
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status and clock observation {1 246.000000000,0}
[2] sequence req5: lock wait-queue event: done waiting
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req5: acquiring latches
[2] sequence req5: scanning lock table for conflicting locks
[2] sequence req5: sequencing complete, returned guard
[3] sequence req6: lock wait-queue event: done waiting
[3] sequence req6: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[3] sequence req6: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[3] sequence req6: acquiring latches
[3] sequence req6: scanning lock table for conflicting locks
[3] sequence req6: sequencing complete, returned guard
Expand All @@ -245,21 +245,21 @@ sequence req=req7
----
[4] sequence req7: sequencing request
[4] sequence req7: acquiring latches
[4] sequence req7: waiting to acquire write latch k@12.000000000,1, held by read latch {a-m}@14.000000000,1
[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch {a-m}@14.000000000,1
[4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=req5
----
[-] finish req5: finishing request
[4] sequence req7: waiting to acquire write latch k@12.000000000,1, held by read latch {c-z}@16.000000000,1
[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch {c-z}@16.000000000,1
[4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=req6
----
[-] finish req6: finishing request
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: waiting in lock wait-queues
[4] sequence req7: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0)
[4] sequence req7: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0)
[4] sequence req7: pushing after 0s for: liveness detection = true, deadlock detection = false, timeout enforcement = false, priority enforcement = false
[4] sequence req7: pushing txn 00000002 to abort
[4] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -270,9 +270,9 @@ debug-advance-clock ts=123
on-txn-updated txn=txn2 status=committed
----
[-] update txn: committing txn2
[4] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[4] sequence req7: resolving intent "k" for txn 00000002 with COMMITTED status
[4] sequence req7: lock wait-queue event: done waiting
[4] sequence req7: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence req7: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence req7: acquiring latches
[4] sequence req7: scanning lock table for conflicting locks
[4] sequence req7: sequencing complete, returned guard
Expand Down
Loading

0 comments on commit e8e33ca

Please sign in to comment.