Skip to content

Commit

Permalink
vtgate: use a consistent hash when selecting target tablets
Browse files Browse the repository at this point in the history
This changes tablet selection from being purely random (with a preference for local tablets) to being based on the result of a consistent hash using the session's UUID.

This effectively means that incoming queries via MySQL connections will be routed to the same tablet unless there is a change in the topology (e.g. a new tablet being added, or a tablet being detected as unhealthy). But even if such a topology change is detected, only a subset of all incoming connections will start using a different tablet.

We have clusters that have a large number of replicas that we can pick from, but picking a random replica on each query can lead to a very inconsistent view of our data, especially if there's a high variance of replication lag between replicas.

Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber committed Dec 14, 2022
1 parent 397fbe2 commit d57f40e
Show file tree
Hide file tree
Showing 8 changed files with 1,033 additions and 771 deletions.
1,462 changes: 736 additions & 726 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTab

tablet.QueryService = queryservice.Wrap(
nil,
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, sessionUUID string, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
return fmt.Errorf("explainTablet does not implement %s", name)
},
)
Expand Down
123 changes: 114 additions & 9 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/collations"
Expand Down Expand Up @@ -243,7 +245,7 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList {
// withRetry also adds shard information to errors returned from the inner QueryService, so
// withShardError should not be combined with withRetry.
func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, _ queryservice.QueryService,
_ string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_ string, inTransaction bool, sessionUUID string, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
// for transactions, we connect to a specific tablet instead of letting gateway choose one
if inTransaction && target.TabletType != topodatapb.TabletType_PRIMARY {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "tabletGateway's query service can only be used for non-transactional queries on replicas")
Expand Down Expand Up @@ -311,16 +313,16 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String())
break
}
gw.shuffleTablets(gw.localCell, tablets)

var th *discovery.TabletHealth
// skip tablets we tried before
for _, t := range tablets {
if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok {
th = t
break
}

// Old tablet selection implementation
if sessionUUID == "" {
th = gw.selectRandomTablet(tablets, invalidTablets)
} else {
th = gw.selectConsistentTablet(sessionUUID, tablets, invalidTablets)
}

if th == nil {
// do not override error from last attempt.
if err == nil {
Expand Down Expand Up @@ -354,7 +356,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,

// withShardError adds shard information to errors returned from the inner QueryService.
func (gw *TabletGateway) withShardError(ctx context.Context, target *querypb.Target, conn queryservice.QueryService,
_ string, _ bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_ string, _ bool, sessionUUID string, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_, err := inner(ctx, target, conn)
return NewShardError(err, target)
}
Expand All @@ -381,6 +383,109 @@ func (gw *TabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatu
return aggr
}

// selectRandomTablet shuffles the given tablets and returns the first one that
// has not been tried yet, or nil when every tablet is listed in invalidTablets.
//
// The shuffle is delegated to shuffleTablets with the gateway's local cell, so
// tablets local to the TabletGateway's cell are preferred over tablets external
// to it. Note that the tablets slice is handed to shuffleTablets as-is, so the
// caller's slice may be reordered.
//
// A tablet counts as already tried when its alias string is present as a key in
// invalidTablets; the value stored under that key is not inspected.
func (gw *TabletGateway) selectRandomTablet(tablets []*discovery.TabletHealth, invalidTablets map[string]bool) *discovery.TabletHealth {
	gw.shuffleTablets(gw.localCell, tablets)

	for _, candidate := range tablets {
		alias := topoproto.TabletAliasString(candidate.Tablet.Alias)
		if _, tried := invalidTablets[alias]; tried {
			// Already attempted during this retry loop; move on to the next one.
			continue
		}
		return candidate
	}

	return nil
}

// NewConsistentHash returns an empty hash ring that is ready to have tablets
// added to it.
//
// The ring is configured with 10 replicas, i.e. Add places 10 hash points on
// the ring for every tablet it is given.
func NewConsistentHash() *ConsistentHash {
	ch := new(ConsistentHash)
	ch.replicas = 10
	ch.hashMap = map[uint64]*discovery.TabletHealth{}
	return ch
}

// ConsistentHash is a hash ring that maps session UUIDs onto tablets.
//
// Tablets are registered with Add, the ring is ordered with Sort, and Get
// looks up the tablet owning a given session UUID. Get performs a binary
// search over nodes, so Sort must be called after the last Add and before Get.
type ConsistentHash struct {
// nodes holds every hash point placed on the ring by Add.
nodes []uint64
// replicas is the number of hash points Add places on the ring per tablet.
replicas int
// hashMap resolves a hash point in nodes back to the tablet it belongs to.
hashMap map[uint64]*discovery.TabletHealth
}

// Add places tabletHealth on the ring.
//
// One hash point is created per configured replica by hashing the string
// "<tablet alias>-<replica index>". Each point is appended to the ring and
// mapped back to tabletHealth. Add does not keep the ring ordered; call Sort
// once all tablets have been added and before calling Get.
//
// If two points hash to the same value, the tablet added last owns that point.
func (ch *ConsistentHash) Add(tabletHealth *discovery.TabletHealth) {
	// The alias prefix is the same for every replica, so build it once instead
	// of re-formatting the alias on each iteration.
	prefix := topoproto.TabletAliasString(tabletHealth.Tablet.Alias) + "-"

	for i := 0; i < ch.replicas; i++ {
		hash := xxhash.Sum64String(prefix + strconv.Itoa(i))
		ch.nodes = append(ch.nodes, hash)
		ch.hashMap[hash] = tabletHealth
	}
}

// Sort orders the ring's hash points in ascending order, in place.
//
// Get binary-searches the points, so Sort has to run after the last Add and
// before the first Get.
func (ch *ConsistentHash) Sort() {
	points := ch.nodes
	sort.Slice(points, func(a, b int) bool {
		return points[a] < points[b]
	})
}

// Get returns the tablet that owns sessionUUID on the ring.
//
// The session UUID is hashed and the first ring point whose value is greater
// than or equal to that hash is selected; if the hash is larger than every
// point, the lookup wraps around to the first point. The ring must have been
// ordered with Sort beforehand, otherwise the binary search result is
// meaningless.
//
// Get returns nil when the ring is empty.
func (ch *ConsistentHash) Get(sessionUUID string) *discovery.TabletHealth {
	// Without this guard an empty ring would fall through to ch.nodes[0] below
	// and panic with an index-out-of-range error.
	if len(ch.nodes) == 0 {
		return nil
	}

	hash := xxhash.Sum64String(sessionUUID)

	i := sort.Search(len(ch.nodes), func(i int) bool {
		return ch.nodes[i] >= hash
	})

	// No point at or after the hash: wrap around to the start of the ring.
	if i == len(ch.nodes) {
		i = 0
	}

	return ch.hashMap[ch.nodes[i]]
}

// selectConsistentTablet picks a tablet for the given session UUID using a
// consistent hash, so the same session UUID keeps mapping to the same tablet.
//
// If the list of passed in tablets, or the set of invalid tablets changes,
// tablet selection will change accordingly (but stay consistent barring any
// further changes to these two inputs).
//
// Tablets local to the TabletGateway's cell are preferred: tablets from other
// cells are only considered when no usable local tablet is left. A tablet is
// skipped when its alias string is present as a key in invalidTablets; the
// value stored under that key is not inspected.
//
// Returns nil when no usable tablet remains.
func (gw *TabletGateway) selectConsistentTablet(sessionUUID string, tablets []*discovery.TabletHealth, invalidTablets map[string]bool) *discovery.TabletHealth {
	var localTablets, externalTablets []*discovery.TabletHealth

	for _, tablet := range tablets {
		// Skip tablets we've tried already.
		if _, skip := invalidTablets[topoproto.TabletAliasString(tablet.Tablet.Alias)]; skip {
			continue
		}

		if tablet.Tablet.Alias.GetCell() == gw.localCell {
			localTablets = append(localTablets, tablet)
		} else {
			externalTablets = append(externalTablets, tablet)
		}
	}

	// Only fall back to other cells when the local cell has nothing to offer.
	candidates := localTablets
	if len(candidates) == 0 {
		candidates = externalTablets
	}
	if len(candidates) == 0 {
		return nil
	}

	// Build the ring from the chosen group only, so that a change in the other
	// group never moves a session to a different tablet.
	ring := NewConsistentHash()
	for _, tablet := range candidates {
		ring.Add(tablet)
	}
	ring.Sort()

	return ring.Get(sessionUUID)
}

func (gw *TabletGateway) shuffleTablets(cell string, tablets []*discovery.TabletHealth) {
sameCell, diffCell, sameCellMax := 0, 0, -1
length := len(tablets)
Expand Down
106 changes: 104 additions & 2 deletions go/vt/vtgate/tabletgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
)

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestTabletGatewayBeginExecute(t *testing.T) {

func TestTabletGatewayShuffleTablets(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "local")
tg := NewTabletGateway(context.Background(), hc, nil, "cell1")

ts1 := &discovery.TabletHealth{
Tablet: topo.NewTablet(1, "cell1", "host1"),
Expand All @@ -108,7 +109,7 @@ func TestTabletGatewayShuffleTablets(t *testing.T) {
}

ts4 := &discovery.TabletHealth{
Tablet: topo.NewTablet(4, "cell2", "host4"),
Tablet: topo.NewTablet(4, "cell3", "host4"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
Expand Down Expand Up @@ -137,6 +138,107 @@ func TestTabletGatewayShuffleTablets(t *testing.T) {

assert.Contains(t, mixedTablets[2:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[2:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets)

tg.shuffleTablets("cell2", mixedTablets)
assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets)

assert.Contains(t, mixedTablets[0:1], ts3, "should have same cell tablets in the front, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts1, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts2, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets)

tg.shuffleTablets("cell3", mixedTablets)
assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets)

assert.Contains(t, mixedTablets[0:1], ts4, "should have same cell tablets in the front, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts1, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts2, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets)
}
}

// TestTabletGatewaySelectConsistentTablet checks that selectConsistentTablet
// keeps a session UUID pinned to one tablet, moves it only when that tablet is
// marked invalid, and falls back to tablets in other cells once every tablet
// in the local cell ("cell1") is invalid.
func TestTabletGatewaySelectConsistentTablet(t *testing.T) {
	const (
		sessionA = "ae3750e4-c011-43f3-bb24-162f4c89ee79"
		sessionB = "445ae551-a78f-4287-9b61-9d6a5192acfd"
	)

	hc := discovery.NewFakeHealthCheck(nil)
	tg := NewTabletGateway(context.Background(), hc, nil, "cell1")

	// newReplica wraps a tablet in the serving REPLICA health record shared by
	// all fixtures in this test.
	newReplica := func(tablet *topodatapb.Tablet) *discovery.TabletHealth {
		return &discovery.TabletHealth{
			Tablet:  tablet,
			Target:  &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
			Serving: true,
			Stats:   &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
		}
	}

	ts1 := newReplica(topo.NewTablet(1, "cell1", "host1"))
	ts2 := newReplica(topo.NewTablet(2, "cell1", "host2"))
	ts3 := newReplica(topo.NewTablet(3, "cell2", "host3"))
	ts4 := newReplica(topo.NewTablet(4, "cell3", "host4"))

	tablets := []*discovery.TabletHealth{ts1, ts2, ts3, ts4}

	// hostFor returns the hostname selected for the session while the given
	// tablets are marked as invalid.
	hostFor := func(session string, failed ...*discovery.TabletHealth) string {
		invalid := make(map[string]bool, len(failed))
		for _, th := range failed {
			invalid[topoproto.TabletAliasString(th.Tablet.Alias)] = true
		}
		return tg.selectConsistentTablet(session, tablets, invalid).Tablet.Hostname
	}

	// Test that connections try to re-use the same tablets
	for attempt := 0; attempt < 10; attempt++ {
		assert.Equal(t, "host1", hostFor(sessionA))
		assert.Equal(t, "host2", hostFor(sessionB))
	}

	// Marking host1 as failed moves queries from host1 to host2
	assert.Equal(t, "host2", hostFor(sessionA, ts1))

	// Marking host2 as failed doesn't affect queries to host1
	assert.Equal(t, "host1", hostFor(sessionA, ts2))

	// Marking host2 as failed moves queries from host2 to host1
	assert.Equal(t, "host1", hostFor(sessionB, ts2))

	// Marking host1 as failed doesn't affect queries to host2
	assert.Equal(t, "host2", hostFor(sessionB, ts1))

	// Mark all local cell tablets as failed
	for attempt := 0; attempt < 10; attempt++ {
		assert.Equal(t, "host3", hostFor(sessionA, ts1, ts2))
		assert.Equal(t, "host4", hostFor(sessionB, ts1, ts2))
	}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/fakes/error_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// ErrorQueryService is an object that returns an error for all methods.
var ErrorQueryService = queryservice.Wrap(
nil,
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, sessionUUID string, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
},
)
Loading

0 comments on commit d57f40e

Please sign in to comment.