Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Consistent Hashing for tablet selection #11959

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,462 changes: 736 additions & 726 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (vte *VTExplain) newTablet(opts *Options, t *topodatapb.Tablet) *explainTab

tablet.QueryService = queryservice.Wrap(
nil,
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, sessionUUID string, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
return fmt.Errorf("explainTablet does not implement %s", name)
},
)
Expand Down
123 changes: 114 additions & 9 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/collations"
Expand Down Expand Up @@ -243,7 +245,7 @@ func (gw *TabletGateway) CacheStatus() TabletCacheStatusList {
// withRetry also adds shard information to errors returned from the inner QueryService, so
// withShardError should not be combined with withRetry.
func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, _ queryservice.QueryService,
_ string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_ string, inTransaction bool, sessionUUID string, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
// for transactions, we connect to a specific tablet instead of letting gateway choose one
if inTransaction && target.TabletType != topodatapb.TabletType_PRIMARY {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "tabletGateway's query service can only be used for non-transactional queries on replicas")
Expand Down Expand Up @@ -311,16 +313,16 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String())
break
}
gw.shuffleTablets(gw.localCell, tablets)

var th *discovery.TabletHealth
// skip tablets we tried before
for _, t := range tablets {
if _, ok := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !ok {
th = t
break
}

// Old tablet selection implementation
if sessionUUID == "" {
th = gw.selectRandomTablet(tablets, invalidTablets)
} else {
th = gw.selectConsistentTablet(sessionUUID, tablets, invalidTablets)
}
Comment on lines +319 to 324
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could put the tablet selection logic behind a CLI flag, if there's any concern in changing this logic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that should be the way to move forward by putting behind a flag like tablet-selection-algorithm


if th == nil {
// do not override error from last attempt.
if err == nil {
Expand Down Expand Up @@ -354,7 +356,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,

// withShardError adds shard information to errors returned from the inner QueryService.
func (gw *TabletGateway) withShardError(ctx context.Context, target *querypb.Target, conn queryservice.QueryService,
_ string, _ bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_ string, _ bool, sessionUUID string, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
_, err := inner(ctx, target, conn)
return NewShardError(err, target)
}
Expand All @@ -381,6 +383,109 @@ func (gw *TabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatu
return aggr
}

// Select a random tablet out of the list of given tablets.
//
// Prefers tablets local to the TabletGateway's cell over tablets external to it.
func (gw *TabletGateway) selectRandomTablet(tablets []*discovery.TabletHealth, invalidTablets map[string]bool) *discovery.TabletHealth {
gw.shuffleTablets(gw.localCell, tablets)

// skip tablets we tried before
for _, t := range tablets {
if _, skip := invalidTablets[topoproto.TabletAliasString(t.Tablet.Alias)]; !skip {
return t
}
}

return nil
}

func NewConsistentHash() *ConsistentHash {
return &ConsistentHash{
replicas: 10,
hashMap: make(map[uint64]*discovery.TabletHealth),
}
}

type ConsistentHash struct {
nodes []uint64
replicas int
hashMap map[uint64]*discovery.TabletHealth
}

func (ch *ConsistentHash) Add(tabletHealth *discovery.TabletHealth) {
for i := 0; i < ch.replicas; i++ {
hash := xxhash.Sum64String(topoproto.TabletAliasString(tabletHealth.Tablet.Alias) + "-" + strconv.Itoa(i))
ch.nodes = append(ch.nodes, hash)
ch.hashMap[hash] = tabletHealth
}
}

func (ch *ConsistentHash) Sort() {
sort.SliceStable(ch.nodes, func(i, j int) bool { return ch.nodes[i] < ch.nodes[j] })
}

func (ch *ConsistentHash) Get(sessionUUID string) *discovery.TabletHealth {
hash := xxhash.Sum64String(sessionUUID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be done once on the connection, it is not required on every query.


i := sort.Search(len(ch.nodes), func(i int) bool {
return ch.nodes[i] >= hash
})

if i == len(ch.nodes) {
i = 0
}

return ch.hashMap[ch.nodes[i]]
}

// Select a random tablet out of the list of given tablets, consistently selecting the same tablet for the same session UUID.
//
// If the list of passed in tablets, or the list of invalid tablets changes, tablet selection will change accordingly (but stay consistent
// barring any further changes to these two lists).
//
// Prefers tablets local to the TabletGateway's cell over tablets external to it.
func (gw *TabletGateway) selectConsistentTablet(sessionUUID string, tablets []*discovery.TabletHealth, invalidTablets map[string]bool) *discovery.TabletHealth {
var localTablets []*discovery.TabletHealth
var externalTablets []*discovery.TabletHealth

for _, tablet := range tablets {
// Skip tablets we've tried already
if _, skip := invalidTablets[topoproto.TabletAliasString(tablet.Tablet.Alias)]; skip {
continue
}

if tablet.Tablet.Alias.GetCell() == gw.localCell {
localTablets = append(localTablets, tablet)
} else {
externalTablets = append(externalTablets, tablet)
}
}

if len(localTablets) > 0 {
hash := NewConsistentHash()
Copy link
Contributor Author

@arthurschreiber arthurschreiber Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of re-generating the hashes here on each call, we could generate them whenever we get notified of a topology event (via a topology watcher) and then clone the hash struct and it's data when we need to make modifications during tablet selection.

That'd reduce the number of calls to xxhash down to just one (for the session UUID). And even that could be changed to just happen once per query.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all should be done as part of health check subscription calls.


for _, tablet := range localTablets {
hash.Add(tablet)
}
hash.Sort()

return hash.Get(sessionUUID)
}

if len(externalTablets) > 0 {
hash := NewConsistentHash()

for _, tablet := range externalTablets {
hash.Add(tablet)
}
hash.Sort()

return hash.Get(sessionUUID)
}

return nil
}

func (gw *TabletGateway) shuffleTablets(cell string, tablets []*discovery.TabletHealth) {
sameCell, diffCell, sameCellMax := 0, 0, -1
length := len(tablets)
Expand Down
106 changes: 104 additions & 2 deletions go/vt/vtgate/tabletgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
)

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestTabletGatewayBeginExecute(t *testing.T) {

func TestTabletGatewayShuffleTablets(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "local")
tg := NewTabletGateway(context.Background(), hc, nil, "cell1")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand how the existing shuffleTablets test cases where even passing? 🤔


ts1 := &discovery.TabletHealth{
Tablet: topo.NewTablet(1, "cell1", "host1"),
Expand All @@ -108,7 +109,7 @@ func TestTabletGatewayShuffleTablets(t *testing.T) {
}

ts4 := &discovery.TabletHealth{
Tablet: topo.NewTablet(4, "cell2", "host4"),
Tablet: topo.NewTablet(4, "cell3", "host4"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
Expand Down Expand Up @@ -137,6 +138,107 @@ func TestTabletGatewayShuffleTablets(t *testing.T) {

assert.Contains(t, mixedTablets[2:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[2:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets)

tg.shuffleTablets("cell2", mixedTablets)
assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets)

assert.Contains(t, mixedTablets[0:1], ts3, "should have same cell tablets in the front, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts1, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts2, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets)

tg.shuffleTablets("cell3", mixedTablets)
assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets)

assert.Contains(t, mixedTablets[0:1], ts4, "should have same cell tablets in the front, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts1, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts2, "should have diff cell tablets in the rear, got %+v", mixedTablets)
assert.Contains(t, mixedTablets[1:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets)
}
}

func TestTabletGatewaySelectConsistentTablet(t *testing.T) {
hc := discovery.NewFakeHealthCheck(nil)
tg := NewTabletGateway(context.Background(), hc, nil, "cell1")

ts1 := &discovery.TabletHealth{
Tablet: topo.NewTablet(1, "cell1", "host1"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}

ts2 := &discovery.TabletHealth{
Tablet: topo.NewTablet(2, "cell1", "host2"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}

ts3 := &discovery.TabletHealth{
Tablet: topo.NewTablet(3, "cell2", "host3"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}

ts4 := &discovery.TabletHealth{
Tablet: topo.NewTablet(4, "cell3", "host4"),
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2},
}

tablets := []*discovery.TabletHealth{ts1, ts2, ts3, ts4}

var tablet *discovery.TabletHealth

// Test that connections try to re-use the same tablets
for i := 0; i < 10; i++ {
tablet = tg.selectConsistentTablet("ae3750e4-c011-43f3-bb24-162f4c89ee79", tablets, make(map[string]bool))
assert.Equal(t, "host1", tablet.Tablet.Hostname)

tablet = tg.selectConsistentTablet("445ae551-a78f-4287-9b61-9d6a5192acfd", tablets, make(map[string]bool))
assert.Equal(t, "host2", tablet.Tablet.Hostname)
}

// Marking host1 as failed moves queries from host1 to host2
tablet = tg.selectConsistentTablet("ae3750e4-c011-43f3-bb24-162f4c89ee79", tablets, map[string]bool{
topoproto.TabletAliasString(ts1.Tablet.Alias): true,
})
assert.Equal(t, "host2", tablet.Tablet.Hostname)

// Marking host2 as failed doesn't affect queries to host1
tablet = tg.selectConsistentTablet("ae3750e4-c011-43f3-bb24-162f4c89ee79", tablets, map[string]bool{
topoproto.TabletAliasString(ts2.Tablet.Alias): true,
})
assert.Equal(t, "host1", tablet.Tablet.Hostname)

// Marking host2 as failed moves queries from host2 to host1
tablet = tg.selectConsistentTablet("445ae551-a78f-4287-9b61-9d6a5192acfd", tablets, map[string]bool{
topoproto.TabletAliasString(ts2.Tablet.Alias): true,
})
assert.Equal(t, "host1", tablet.Tablet.Hostname)

// Marking host2 as failed doesn't affect queries to host2
tablet = tg.selectConsistentTablet("445ae551-a78f-4287-9b61-9d6a5192acfd", tablets, map[string]bool{
topoproto.TabletAliasString(ts1.Tablet.Alias): true,
})
assert.Equal(t, "host2", tablet.Tablet.Hostname)

// Mark all local cell tablets as failed
for i := 0; i < 10; i++ {
tablet = tg.selectConsistentTablet("ae3750e4-c011-43f3-bb24-162f4c89ee79", tablets, map[string]bool{
topoproto.TabletAliasString(ts1.Tablet.Alias): true,
topoproto.TabletAliasString(ts2.Tablet.Alias): true,
})
assert.Equal(t, "host3", tablet.Tablet.Hostname)

tablet = tg.selectConsistentTablet("445ae551-a78f-4287-9b61-9d6a5192acfd", tablets, map[string]bool{
topoproto.TabletAliasString(ts1.Tablet.Alias): true,
topoproto.TabletAliasString(ts2.Tablet.Alias): true,
})
assert.Equal(t, "host4", tablet.Tablet.Hostname)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/fakes/error_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// ErrorQueryService is an object that returns an error for all methods.
var ErrorQueryService = queryservice.Wrap(
nil,
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService, name string, inTransaction bool, sessionUUID string, inner func(context.Context, *querypb.Target, queryservice.QueryService) (bool, error)) error {
return fmt.Errorf("ErrorQueryService does not implement any method")
},
)
Loading