Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: move splitHealthy to method on grpcTransport #74344

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func TestComplexScenarios(t *testing.T) {
}
}

// TestSplitHealthy tests that the splitHealthy helper function sorts healthy
// nodes before unhealthy nodes.
// TestSplitHealthy tests that the splitHealthy method sorts healthy nodes
// before unhealthy nodes.
func TestSplitHealthy(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -299,8 +299,12 @@ func TestSplitHealthy(t *testing.T) {
health.Set(i, healthUnhealthy)
}
}
splitHealthy(replicas, health)
if !reflect.DeepEqual(replicas, td.out) {
gt := grpcTransport{
replicas: replicas,
replicaHealth: health,
}
gt.splitHealthy()
if !reflect.DeepEqual(gt.replicas, td.out) {
t.Errorf("splitHealthy(...) = %+v not %+v", replicas, td.out)
}
})
Expand Down
58 changes: 28 additions & 30 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func grpcTransportFactoryImpl(

// We'll map the index of the replica descriptor in its slice to its health.
var health util.FastIntMap
for i, r := range rs {
for i := range rs {
r := &rs[i]
replicas[i] = r.ReplicaDescriptor
healthy := nodeDialer.ConnHealth(r.NodeID, opts.class) == nil
if healthy {
Expand All @@ -127,18 +128,20 @@ func grpcTransportFactoryImpl(
}
}

*transport = grpcTransport{
opts: opts,
nodeDialer: nodeDialer,
class: opts.class,
replicas: replicas,
replicaHealth: health,
}

if !opts.dontConsiderConnHealth {
// Put known-healthy clients first, while otherwise respecting the existing
// Put known-healthy replica first, while otherwise respecting the existing
// ordering of the replicas.
splitHealthy(replicas, health)
transport.splitHealthy()
}

*transport = grpcTransport{
opts: opts,
nodeDialer: nodeDialer,
class: opts.class,
replicas: replicas,
}
return transport, nil
}

Expand All @@ -148,6 +151,9 @@ type grpcTransport struct {
class rpc.ConnectionClass

replicas []roachpb.ReplicaDescriptor
// replicaHealth maps replica index within the replicas slice to healthHealthy
// if healthy, and healthUnhealthy if unhealthy. Used by splitHealthy.
replicaHealth util.FastIntMap
// nextReplicaIdx represents the index into replicas of the next replica to be
// tried.
nextReplicaIdx int
Expand Down Expand Up @@ -262,38 +268,30 @@ func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
}
}

// splitHealthy splits the provided client slice into healthy clients and
// unhealthy clients, based on their connection state. Healthy clients will
// be rearranged first in the slice, and unhealthy clients will be rearranged
// last. Within these two groups, the rearrangement will be stable. The function
// will then return the number of healthy clients.
// The input FastIntMap maps index within the input replicas slice to an integer
// healthHealthy or healthUnhealthy.
func splitHealthy(replicas []roachpb.ReplicaDescriptor, health util.FastIntMap) {
sort.Stable(&byHealth{replicas: replicas, health: health})
// splitHealthy splits the grpcTransport's replica slice into healthy replica
// and unhealthy replica, based on their connection state. Healthy replicas will
// be rearranged first in the replicas slice, and unhealthy replicas will be
// rearranged last. Within these two groups, the rearrangement will be stable.
func (gt *grpcTransport) splitHealthy() {
sort.Stable((*byHealth)(gt))
}

// byHealth sorts a slice of batchClients by their health with healthy first.
type byHealth struct {
replicas []roachpb.ReplicaDescriptor
// This map maps replica index within the replicas slice to healthHealthy if
// healthy, and healthUnhealthy if unhealthy.
health util.FastIntMap
}
// byHealth sorts a slice of replicas by their health with healthy first.
type byHealth grpcTransport

func (h *byHealth) Len() int { return len(h.replicas) }
func (h *byHealth) Swap(i, j int) {
h.replicas[i], h.replicas[j] = h.replicas[j], h.replicas[i]
oldI := h.health.GetDefault(i)
h.health.Set(i, h.health.GetDefault(j))
h.health.Set(j, oldI)
oldI := h.replicaHealth.GetDefault(i)
h.replicaHealth.Set(i, h.replicaHealth.GetDefault(j))
h.replicaHealth.Set(j, oldI)
}
func (h *byHealth) Less(i, j int) bool {
ih, ok := h.health.Get(i)
ih, ok := h.replicaHealth.Get(i)
if !ok {
panic(fmt.Sprintf("missing health info for %s", h.replicas[i]))
}
jh, ok := h.health.Get(j)
jh, ok := h.replicaHealth.Get(j)
if !ok {
panic(fmt.Sprintf("missing health info for %s", h.replicas[j]))
}
Expand Down