diff --git a/pkg/kv/kvpb/BUILD.bazel b/pkg/kv/kvpb/BUILD.bazel index 07c52028fc66..9ad04dc508c5 100644 --- a/pkg/kv/kvpb/BUILD.bazel +++ b/pkg/kv/kvpb/BUILD.bazel @@ -74,6 +74,7 @@ go_test( "//pkg/storage/enginepb", "//pkg/testutils/echotest", "//pkg/util/buildutil", + "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/protoutil", "//pkg/util/timeutil", diff --git a/pkg/kv/kvpb/string_test.go b/pkg/kv/kvpb/string_test.go index 2ceb51083ea0..70751bdda3e8 100644 --- a/pkg/kv/kvpb/string_test.go +++ b/pkg/kv/kvpb/string_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/echotest" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -85,7 +86,7 @@ func TestReplicaUnavailableError(t *testing.T) { func TestAmbiguousResultError(t *testing.T) { ctx := context.Background() - wrapped := errors.Errorf("boom with a %s", redact.Unsafe("secret")) + wrapped := errors.Errorf("boom with a %s", encoding.Unsafe("secret")) var err error = kvpb.NewAmbiguousResultError(wrapped) err = errors.DecodeError(ctx, errors.EncodeError(ctx, err)) require.True(t, errors.Is(err, wrapped), "%+v", err) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 572c540b1f2c..9785af67e666 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -3084,14 +3084,23 @@ func excludeReplicasInNeedOfCatchup( sendStreamStats(stats) filled := 0 for _, repl := range replicas { - if stats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok && - (!stats.IsStateReplicate || stats.HasSendQueue) { + if replicaSendStreamStats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok && + (!replicaSendStreamStats.IsStateReplicate || replicaSendStreamStats.HasSendQueue) { log.KvDistribution.VEventf(ctx, 5, - "not considering %s as a potential candidate for a lease transfer "+ + "not considering %v as a potential candidate for a lease transfer "+ "because the replica requires catchup: "+ - "[is_state_replicate=%v has_send_queue=%v]", - repl, stats.IsStateReplicate, stats.HasSendQueue) + "replica=(%v) range=%v", + repl, replicaSendStreamStats, stats) continue + } else if ok { + log.KvDistribution.VEventf(ctx, 6, + "replica %v is up-to-date and does not require catchup "+ + "replica=(%v) range=%v", + repl, replicaSendStreamStats, stats) + } else { + log.KvDistribution.VEventf(ctx, 4, + "replica %v is not in the send stream stats range=%v", + repl, stats) } // We are also not excluding any replicas which weren't included in the // stats here. If they weren't included it indicates that they were either diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel index 185de4f353cd..0ff065c637ee 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/queue", diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 7458a2ac7beb..2145927159e6 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -194,6 +195,22 @@ type RangeSendStreamStats struct { internal []ReplicaSendStreamStats } +func (s *RangeSendStreamStats) String() string { + return redact.StringWithoutMarkers(s) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (s *RangeSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("[") + for i := range s.internal { + if i > 0 { + w.Printf(", ") + } + w.Printf("r%v=(%v)", s.internal[i].ReplicaID, s.internal[i]) + } + w.Printf("]") +} + // Clear clears the stats for all replica send streams so that the underlying // memory can be reused. func (s *RangeSendStreamStats) Clear() { @@ -298,6 +315,15 @@ type ReplicaSendStreamStats struct { ReplicaSendQueueStats } +func (rsss ReplicaSendStreamStats) String() string { + return redact.StringWithoutMarkers(rsss) +} + +func (rsss ReplicaSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("is_state_replicate=%v has_send_queue=%v %v", + rsss.IsStateReplicate, rsss.HasSendQueue, rsss.ReplicaSendQueueStats) +} + // ReplicaSendQueueStats contains the size and count of the send stream queue // for a replica. type ReplicaSendQueueStats struct { @@ -308,6 +334,16 @@ type ReplicaSendQueueStats struct { SendQueueCount int64 } +func (rsqs ReplicaSendQueueStats) String() string { + return redact.StringWithoutMarkers(rsqs) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (rsqs ReplicaSendQueueStats) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("send_queue_size=%v / %v entries", + humanizeutil.IBytes(rsqs.SendQueueBytes), rsqs.SendQueueCount) +} + // RaftEvent carries a RACv2-relevant subset of raft state sent to storage. type RaftEvent struct { // MsgAppMode is the current mode. This is only relevant on the leader. @@ -1455,7 +1491,6 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) { if len(statsToSet.internal) != 0 { panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal)) } - statsToSet.Clear() rc.mu.RLock() defer rc.mu.RUnlock() diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 74431bd41e10..bd1d762d9670 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -2283,3 +2283,36 @@ func TestConstructRaftEventForReplica(t *testing.T) { }) } } + +func TestRangeSendStreamStatsString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + stats := RangeSendStreamStats{ + internal: []ReplicaSendStreamStats{ + { + IsStateReplicate: false, + HasSendQueue: true, + ReplicaSendQueueStats: ReplicaSendQueueStats{ + ReplicaID: 1, + SendQueueCount: 10, + SendQueueBytes: 100, + }, + }, + { + IsStateReplicate: true, + HasSendQueue: false, + ReplicaSendQueueStats: ReplicaSendQueueStats{ + ReplicaID: 2, + SendQueueCount: 0, + SendQueueBytes: 0, + }, + }, + }, + } + + require.Equal(t, + "[r1=(is_state_replicate=false has_send_queue=true send_queue_size=100 B / 10 entries), "+ + "r2=(is_state_replicate=true has_send_queue=false send_queue_size=0 B / 0 entries)]", + stats.String()) +} diff --git a/pkg/sql/inverted/expression.go b/pkg/sql/inverted/expression.go index 1a88cc71976a..c87de68999ae 100644 --- a/pkg/sql/inverted/expression.go +++ b/pkg/sql/inverted/expression.go @@ -167,7 +167,7 @@ func formatSpan(span Span, redactable bool) string { } output := fmt.Sprintf("[%s, %s%c", start, end, spanEndOpenOrClosed) if redactable { - output = string(redact.Sprintf("%s", redact.Unsafe(output))) + output = string(redact.Sprintf("%s", encoding.Unsafe(output))) } return output } diff --git a/pkg/sql/opt/cat/BUILD.bazel b/pkg/sql/opt/cat/BUILD.bazel index 53868ab1316c..94b1abe17f73 100644 --- a/pkg/sql/opt/cat/BUILD.bazel +++ b/pkg/sql/opt/cat/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/types", + "//pkg/util/encoding", "//pkg/util/treeprinter", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/sql/opt/cat/utils.go b/pkg/sql/opt/cat/utils.go index 8cb6694ebc95..3ee5594fa047 100644 --- a/pkg/sql/opt/cat/utils.go +++ b/pkg/sql/opt/cat/utils.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/treeprinter" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -324,7 +325,7 @@ func formatFamily(family Family, buf *bytes.Buffer) { // markRedactable is true. func MaybeMarkRedactable(unsafe string, markRedactable bool) string { if markRedactable { - return string(redact.Sprintf("%s", redact.Unsafe(unsafe))) + return string(redact.Sprintf("%s", encoding.Unsafe(unsafe))) } return unsafe } diff --git a/pkg/util/encoding/encoding.go b/pkg/util/encoding/encoding.go index 71716f1c01a1..86fcaa353bf7 100644 --- a/pkg/util/encoding/encoding.go +++ b/pkg/util/encoding/encoding.go @@ -3749,3 +3749,19 @@ func BytesPrevish(b []byte, length int) []byte { copy(buf[bLen:], bytes.Repeat([]byte{0xff}, length-bLen)) return buf } + +// unsafeWrapper is implementation of SafeFormatter. This is used to mark +// arguments as unsafe for redaction. This would make sure that redact.Unsafe() is implementing SafeFormatter interface +// without affecting invocations. +// TODO(aa-joshi): This is a temporary solution to mark arguments as unsafe. We should move/update this into cockroachdb/redact package. +type unsafeWrapper struct { + a any +} + +func (uw unsafeWrapper) SafeFormat(w redact.SafePrinter, _ rune) { + w.Print(redact.Unsafe(uw.a)) +} + +func Unsafe(args any) any { + return unsafeWrapper{a: args} +} diff --git a/pkg/util/log/redact.go b/pkg/util/log/redact.go index cb4c7de33a85..54ff365bde97 100644 --- a/pkg/util/log/redact.go +++ b/pkg/util/log/redact.go @@ -135,7 +135,7 @@ func maybeRedactEntry(payload entryPayload, editor redactEditor) (res entryPaylo func init() { // We consider booleans and numeric values to be always safe for - // reporting. A log call can opt out by using redact.Unsafe() around + // reporting. A log call can opt out by using encoding.Unsafe() around // a value that would be otherwise considered safe. redact.RegisterSafeType(reflect.TypeOf(true)) // bool redact.RegisterSafeType(reflect.TypeOf(123)) // int