Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allocatorimpl: vlog on all excl. repl due to catchup conditions #132603

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3084,14 +3084,23 @@ func excludeReplicasInNeedOfCatchup(
sendStreamStats(stats)
filled := 0
for _, repl := range replicas {
if stats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok &&
(!stats.IsStateReplicate || stats.HasSendQueue) {
if replicaSendStreamStats, ok := stats.ReplicaSendStreamStats(repl.ReplicaID); ok &&
(!replicaSendStreamStats.IsStateReplicate || replicaSendStreamStats.HasSendQueue) {
log.KvDistribution.VEventf(ctx, 5,
"not considering %s as a potential candidate for a lease transfer "+
"not considering %v as a potential candidate for a lease transfer "+
"because the replica requires catchup: "+
"[is_state_replicate=%v has_send_queue=%v]",
repl, stats.IsStateReplicate, stats.HasSendQueue)
"replica=(%v) range=%v",
repl, replicaSendStreamStats, stats)
continue
} else if ok {
log.KvDistribution.VEventf(ctx, 6,
"replica %v is up-to-date and does not require catchup "+
"replica=(%v) range=%v",
repl, replicaSendStreamStats, stats)
} else {
log.KvDistribution.VEventf(ctx, 4,
"replica %v is not in the send stream stats range=%v",
repl, stats)
}
// We are also not excluding any replicas which weren't included in the
// stats here. If they weren't included it indicates that they were either
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/queue",
Expand Down
37 changes: 36 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -209,6 +210,22 @@ type RangeSendStreamStats struct {
internal []ReplicaSendStreamStats
}

func (s *RangeSendStreamStats) String() string {
return redact.StringWithoutMarkers(s)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (s *RangeSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[")
for i := range s.internal {
if i > 0 {
w.Printf(", ")
}
w.Printf("r%v=(%v)", s.internal[i].ReplicaID, s.internal[i])
}
w.Printf("]")
}

// Clear clears the stats for all replica send streams so that the underlying
// memory can be reused.
func (s *RangeSendStreamStats) Clear() {
Expand Down Expand Up @@ -313,6 +330,15 @@ type ReplicaSendStreamStats struct {
ReplicaSendQueueStats
}

func (rsss ReplicaSendStreamStats) String() string {
return redact.StringWithoutMarkers(rsss)
}

func (rsss ReplicaSendStreamStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("is_state_replicate=%v has_send_queue=%v %v",
rsss.IsStateReplicate, rsss.HasSendQueue, rsss.ReplicaSendQueueStats)
}

// ReplicaSendQueueStats contains the size and count of the send stream queue
// for a replica.
type ReplicaSendQueueStats struct {
Expand All @@ -323,6 +349,16 @@ type ReplicaSendQueueStats struct {
SendQueueCount int64
}

func (rsqs ReplicaSendQueueStats) String() string {
return redact.StringWithoutMarkers(rsqs)
}

// SafeFormat implements the redact.SafeFormatter interface.
func (rsqs ReplicaSendQueueStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("send_queue_size=%v / %v entries",
humanizeutil.IBytes(rsqs.SendQueueBytes), rsqs.SendQueueCount)
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
type RaftEvent struct {
// MsgAppMode is the current mode. This is only relevant on the leader.
Expand Down Expand Up @@ -1433,7 +1469,6 @@ func (rc *rangeController) SendStreamStats(statsToSet *RangeSendStreamStats) {
if len(statsToSet.internal) != 0 {
panic(errors.AssertionFailedf("statsToSet is non-empty %v", statsToSet.internal))
}
statsToSet.Clear()
rc.mu.RLock()
defer rc.mu.RUnlock()

Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2247,3 +2247,36 @@ func TestConstructRaftEventForReplica(t *testing.T) {
})
}
}

func TestRangeSendStreamStatsString(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stats := RangeSendStreamStats{
internal: []ReplicaSendStreamStats{
{
IsStateReplicate: false,
HasSendQueue: true,
ReplicaSendQueueStats: ReplicaSendQueueStats{
ReplicaID: 1,
SendQueueCount: 10,
SendQueueBytes: 100,
},
},
{
IsStateReplicate: true,
HasSendQueue: false,
ReplicaSendQueueStats: ReplicaSendQueueStats{
ReplicaID: 2,
SendQueueCount: 0,
SendQueueBytes: 0,
},
},
},
}

require.Equal(t,
"[r1=(is_state_replicate=false has_send_queue=true send_queue_size=100 B / 10 entries), "+
"r2=(is_state_replicate=true has_send_queue=false send_queue_size=0 B / 0 entries)]",
stats.String())
}
Loading