diff --git a/go/vt/vtgate/balancer/balancer.go b/go/vt/vtgate/balancer/balancer.go new file mode 100644 index 00000000000..7fcd800239d --- /dev/null +++ b/go/vt/vtgate/balancer/balancer.go @@ -0,0 +1,368 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package balancer + +import ( + "encoding/json" + "fmt" + "math/rand" + "strings" + "sync" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +/* + +The tabletBalancer probabalistically orders the list of available tablets into +a ranked order of preference in order to satisfy two high-level goals: + +1. Balance the load across the available replicas +2. Prefer a replica in the same cell as the vtgate if possible + +In some topologies this is trivial to accomplish by simply preferring tablets in the +local cell, assuming there are a proportional number of local tablets in each cell to +satisfy the inbound traffic to the vtgates in that cell. + +However, for topologies with a relatively small number of tablets in each cell, a simple +affinity algorithm does not effectively balance the load. + +As a simple example: + + Given three cells with vtgates, four replicas spread into those cells, where each vtgate + receives an equal query share. If each routes only to its local cell, the tablets will be + unbalanced since two of them receive 1/3 of the queries, but the two replicas in the same + cell will only receive 1/6 of the queries. 
+ + Cell A: 1/3 --> vtgate --> 1/3 => vttablet + + Cell B: 1/3 --> vtgate --> 1/3 => vttablet + + Cell C: 1/3 --> vtgate --> 1/6 => vttablet + \-> 1/6 => vttablet + +Other topologies that can cause similar pathologies include cases where there may be cells +containing replicas but no local vtgates, and/or cells that have only vtgates but no replicas. + +For these topologies, the tabletBalancer can be configured in a mode that proportionally assigns +the output flow to each tablet, regardless of whether or not the topology is balanced. The local +cell is still preferred where possible, but only as long as the global query balance is maintained. + +To accomplish this goal, the balancer is optionally configured into balanced mode and is given: + +* The list of cells that receive inbound traffic to vtgates +* The local cell where the vtgate exists +* The set of tablets and their cells (learned from discovery) + +The model assumes equal probability of a query coming from each cell that has a vtgate, i.e. +traffic is effectively load balanced between the cells with vtgates. + +Given that information, the balancer builds a simple model to determine how much query load +would go to each tablet if vtgate only routed to its local cell. Then if any tablets are +unbalanced, it shifts the desired allocation away from the local cell preference in order to +even out the query load. + +Based on this global model, the vtgate then probabilistically picks a destination for each +query to be sent by ordering the available tablets accordingly. + +Assuming each vtgate is configured with and discovers the same information about the topology, +then each should come to the same conclusion about the global flows, and cooperatively should +converge on the desired balanced query load. 
+
+*/
+
+// TabletBalancer decides the order in which candidate tablets are tried for a query.
+type TabletBalancer interface {
+	// Randomly shuffle the tablets into an order for routing queries
+	ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth)
+
+	// Callback when the topology changes to invalidate any cached state
+	TopologyChanged()
+}
+
+// NewTabletBalancer returns the balancer for a vtgate running in localCell.
+//
+// When mode is "balanced", vtGateCells is a comma-separated list of the cells
+// that contain vtgates. Whitespace around each name is ignored and empty names
+// are dropped, so "a, b," is treated like "a,b"; a stray "" entry would
+// otherwise be modelled as one more vtgate cell and skew the computed flows.
+//
+// Any other mode, or balanced mode without a single usable vtgate cell, yields
+// the same-cell affinity balancer.
+func NewTabletBalancer(mode, localCell, vtGateCells string) TabletBalancer {
+	if mode == "balanced" {
+		var cells []string
+		for _, cell := range strings.Split(vtGateCells, ",") {
+			if cell = strings.TrimSpace(cell); cell != "" {
+				cells = append(cells, cell)
+			}
+		}
+		// The flow model needs at least one vtgate cell; without one fall
+		// through to affinity mode rather than build an unusable balancer.
+		if len(cells) > 0 {
+			return &tabletBalancer{
+				isBalanced:  true,
+				localCell:   localCell,
+				vtGateCells: cells,
+				allocations: map[discovery.KeyspaceShardTabletType]*targetAllocation{},
+			}
+		}
+	}
+	return &tabletBalancer{isBalanced: false, localCell: localCell}
+}
+
+// tabletBalancer implements TabletBalancer for both the affinity and the
+// balanced mode.
+type tabletBalancer struct {
+	//
+	// Configuration
+	//
+
+	// Balancer mode
+	isBalanced bool
+
+	// The local cell for the vtgate
+	localCell string
+
+	// The set of cells that have vtgates
+	vtGateCells []string
+
+	// mu protects the allocation map
+	mu sync.Mutex
+
+	//
+	// Allocations for balanced mode, calculated once per target and invalidated
+	// whenever the topology changes.
+	//
+	allocations map[discovery.KeyspaceShardTabletType]*targetAllocation
+}
+
+// targetAllocation is the flow model computed for a single target. The fields
+// are exported so that print can render them with encoding/json.
+type targetAllocation struct {
+	// Target flow per cell based on the number of tablets discovered in the cell
+	Target map[string]int
+
+	// Input flows allocated for each cell
+	Inflows map[string]int
+
+	// Output flows from each vtgate cell to each target cell
+	Outflows map[string]map[string]int
+
+	// Allocation routed to each tablet from the local cell used for ranking
+	Allocation map[uint32]int
+
+	// Sum of the per-tablet Allocation values, which is basically
+	// 1,000,000 / len(vtgatecells)
+	TotalAllocation int
+}
+
+// print returns a debug string with the balancer configuration and the cached
+// allocations encoded as JSON. It reads the allocation map without taking mu.
+func (b *tabletBalancer) print() string {
+	allocations, err := json.Marshal(&b.allocations)
+	if err != nil {
+		// Surface the failure in the output instead of printing an empty value.
+		allocations = []byte(fmt.Sprintf("<json error: %v>", err))
+	}
+	return fmt.Sprintf("LocalCell: %s, VtGateCells: %s, allocations: %s",
+		b.localCell, b.vtGateCells, string(allocations))
+}
+
+// ShuffleTablets is the main entry point to the balancer.
+//
+// It shuffles the tablets into a preference list for routing a given query.
+// However, since all tablets should be healthy, the query will almost always go
+// to the first tablet in the list, so the balancer ranking algorithm randomly
+// shuffles the list to break ties, then chooses a weighted random selection
+// based on the balance algorithm to promote to the first in the set.
+//
+// An empty list is left untouched. If balanced mode produced no allocation for
+// this vtgate (for example because the local cell is missing from the vtgate
+// cell list), the same-cell affinity ordering is used instead, because
+// rand.Intn panics when its argument is not positive.
+func (b *tabletBalancer) ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth) {
+	numTablets := len(tablets)
+	if numTablets == 0 {
+		return
+	}
+
+	// Both modes start from a uniformly random order to break ties.
+	rand.Shuffle(numTablets, func(i, j int) { tablets[i], tablets[j] = tablets[j], tablets[i] })
+
+	if b.isBalanced {
+		allocationMap, totalAllocation := b.getAllocation(target, tablets)
+		if totalAllocation > 0 {
+			// Do another O(n) seek through the list to effect the weighted sample by picking
+			// a random point in the allocation space and seeking forward in the list of (randomized)
+			// tablets to that point, promoting the tablet to the head of the list.
+			r := rand.Intn(totalAllocation)
+			for i := 0; i < numTablets; i++ {
+				flow := allocationMap[tablets[i].Tablet.Alias.Uid]
+				if r < flow {
+					tablets[0], tablets[i] = tablets[i], tablets[0]
+					break
+				}
+				r -= flow
+			}
+			return
+		}
+	}
+
+	// Affinity ordering: boost same-cell tablets to the front, keeping the
+	// random relative order within each group.
+	i := 0
+	for j := 0; j < numTablets; j++ {
+		if tablets[j].Tablet.Alias.Cell == b.localCell {
+			tablets[i], tablets[j] = tablets[j], tablets[i]
+			i++
+		}
+	}
+}
+
+// TopologyChanged is a callback to indicate the topology changed and any cached
+// allocations should be cleared
+func (b *tabletBalancer) TopologyChanged() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.allocations = map[discovery.KeyspaceShardTabletType]*targetAllocation{}
+}
+
+// To stick with integer arithmetic, use 1,000,000 as the full load
+const ALLOCATION = 1000000
+
+// allocateFlows builds the flow model for one target from the given tablets:
+// the target flow per tablet cell, the flow each vtgate cell sends to each
+// tablet cell, and the resulting per-tablet allocation for the local cell.
+// allTablets must not be empty because the full load is divided by its length.
+func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation {
+
+	a :=
targetAllocation{}
+
+	// Initialization: Set up some data structures and derived values
+	a.Target = make(map[string]int)
+	a.Inflows = make(map[string]int)
+	a.Outflows = make(map[string]map[string]int)
+	a.Allocation = make(map[uint32]int)
+	// The full load is split evenly per vtgate cell and per tablet using integer
+	// division, so both divisors must be non-zero and remainders are dropped.
+	flowPerVtgateCell := ALLOCATION / len(b.vtGateCells)
+	flowPerTablet := ALLOCATION / len(allTablets)
+	cellExistsWithNoTablets := false
+
+	// The target flow of a cell is flowPerTablet for every tablet found in it.
+	for _, th := range allTablets {
+		a.Target[th.Tablet.Alias.Cell] += flowPerTablet
+	}
+
+	//
+	// First pass: Allocate vtgate flow to the local cell where the vtgate exists
+	// and along the way figure out if there are any vtgates with no local tablets.
+	//
+	for _, cell := range b.vtGateCells {
+		outflow := map[string]int{}
+		target := a.Target[cell]
+
+		if target > 0 {
+			a.Inflows[cell] += flowPerVtgateCell
+			outflow[cell] = flowPerVtgateCell
+		} else {
+			cellExistsWithNoTablets = true
+		}
+
+		// Every vtgate cell gets an outflow map, even if it is still empty.
+		a.Outflows[cell] = outflow
+	}
+
+	//
+	// Figure out if there is a shortfall
+	//
+	underAllocated := make(map[string]int)
+	unbalancedFlow := 0
+	for cell, allocation := range a.Target {
+		if a.Inflows[cell] < allocation {
+			underAllocated[cell] = allocation - a.Inflows[cell]
+			unbalancedFlow += underAllocated[cell]
+		}
+	}
+
+	//
+	// Second pass: if there are any vtgates with no local tablets, allocate the underallocated amount
+	// proportionally to all cells that may need it
+	//
+	if cellExistsWithNoTablets {
+		for _, vtgateCell := range b.vtGateCells {
+			target := a.Target[vtgateCell]
+			if target != 0 {
+				continue
+			}
+
+			// unbalancedFlow is the sum of the (positive) entries of underAllocated,
+			// so it is non-zero whenever this loop body runs.
+			for underAllocatedCell, underAllocatedFlow := range underAllocated {
+				allocation := flowPerVtgateCell * underAllocatedFlow / unbalancedFlow
+				a.Inflows[underAllocatedCell] += allocation
+				a.Outflows[vtgateCell][underAllocatedCell] += allocation
+			}
+		}
+
+		// Recompute underallocated after these flows were assigned
+		unbalancedFlow = 0
+		underAllocated = make(map[string]int)
+		for cell, allocation := range a.Target {
+			if a.Inflows[cell] < allocation {
+				underAllocated[cell] = allocation - a.Inflows[cell]
+				unbalancedFlow += underAllocated[cell]
+			}
+		}
+	}
+
+	//
+	// Third pass: Shift remaining imbalance if any cell is over/under allocated after
+	// assigning local cell traffic and distributing load from cells without tablets.
+	//
+	if /* fudge for integer arithmetic */ unbalancedFlow > 10 {
+
+		// cells which are overallocated
+		overAllocated := make(map[string]int)
+		for cell, allocation := range a.Target {
+			if a.Inflows[cell] > allocation {
+				overAllocated[cell] = a.Inflows[cell] - allocation
+			}
+		}
+
+		// overAllocated, underAllocated and unbalancedFlow are snapshots taken before
+		// the loop below; only a.Inflows and a.Outflows are updated while shifting.
+
+		//
+		// For each overallocated cell, proportionally shift flow from targets that are overallocated
+		// to targets that are underallocated.
+		//
+		// Note this is an O(N^3) loop, but only over the cells which need adjustment.
+		//
+		for _, vtgateCell := range b.vtGateCells {
+			for underAllocatedCell, underAllocatedFlow := range underAllocated {
+				for overAllocatedCell, overAllocatedFlow := range overAllocated {
+
+					currentFlow := a.Outflows[vtgateCell][overAllocatedCell]
+					if currentFlow == 0 {
+						continue
+					}
+
+					// Shift a proportional fraction of the amount that the cell is currently allocated weighted
+					// by the fraction that this vtgate cell is already sending to the overallocated cell, and the
+					// fraction that the new target is underallocated
+					//
+					// Note that the operator order matters -- multiplications need to occur before divisions
+					// to avoid truncating the integer values.
+					//
+					// NOTE(review): the product of three flow values can approach 1e18, which fits a
+					// 64-bit int but would overflow where int is 32 bits -- confirm supported platforms.
+					shiftFlow := overAllocatedFlow * currentFlow * underAllocatedFlow / a.Inflows[overAllocatedCell] / unbalancedFlow
+
+					// Move shiftFlow from the overallocated cell to the underallocated cell,
+					// keeping the inflow totals and this vtgate cell's outflows in step.
+
+					a.Outflows[vtgateCell][overAllocatedCell] -= shiftFlow
+					a.Inflows[overAllocatedCell] -= shiftFlow
+
+					a.Inflows[underAllocatedCell] += shiftFlow
+					a.Outflows[vtgateCell][underAllocatedCell] += shiftFlow
+				}
+			}
+		}
+	}
+
+	//
+	// Finally, once the cell flows are all adjusted, figure out the local allocation to each
+	// tablet in the target cells
+	//
+	// Only the outflows of the local vtgate cell are used. If the local cell is not one of
+	// the vtgate cells the lookup yields a nil map, no tablet gets an allocation and
+	// TotalAllocation stays 0.
+	outflow := a.Outflows[b.localCell]
+	for _, tablet := range allTablets {
+		cell := tablet.Tablet.Alias.Cell
+		flow := outflow[cell]
+		if flow > 0 {
+			// Each tablet receives an equal share of the flow sent to its cell.
+			a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell]
+			a.TotalAllocation += flow * flowPerTablet / a.Target[cell]
+		}
+	}
+
+	return &a
+}
+
+// getAllocation returns the per-tablet allocation map and the total allocation
+// for the target, computing and caching them from tablets on first use. The
+// cached map itself is returned, not a copy, so callers must treat it as
+// read-only. mu is held for the lookup and for the computation.
+func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) {
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	allocation, exists := b.allocations[discovery.KeyFromTarget(target)]
+	if !exists {
+		allocation = b.allocateFlows(tablets)
+		b.allocations[discovery.KeyFromTarget(target)] = allocation
+	}
+
+	return allocation.Allocation, allocation.TotalAllocation
+}
diff --git a/go/vt/vtgate/balancer/balancer_test.go b/go/vt/vtgate/balancer/balancer_test.go
new file mode 100644
index 00000000000..00396407e1b
--- /dev/null
+++ b/go/vt/vtgate/balancer/balancer_test.go
@@ -0,0 +1,457 @@
+/*
+Copyright 2023 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package balancer
+
+import (
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"vitess.io/vitess/go/vt/discovery"
+	querypb "vitess.io/vitess/go/vt/proto/query"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	"vitess.io/vitess/go/vt/topo"
+)
+
+// nextTestTabletUID is the last uid handed out by createTestTablet.
+var nextTestTabletUID int
+
+// createTestTablet returns a REPLICA tablet health record for keyspace "k",
+// shard "s" in the given cell, with a uid that is unique within the test binary.
+func createTestTablet(cell string) *discovery.TabletHealth {
+	nextTestTabletUID++
+	tablet := topo.NewTablet(uint32(nextTestTabletUID), cell, strconv.Itoa(nextTestTabletUID))
+	tablet.PortMap["vt"] = 1
+	tablet.PortMap["grpc"] = 2
+	tablet.Keyspace = "k"
+	tablet.Shard = "s"
+
+	return &discovery.TabletHealth{
+		Tablet:               tablet,
+		Target:               &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
+		Serving:              false,
+		Stats:                nil,
+		PrimaryTermStartTime: 0,
+	}
+}
+
+// allow 2% fuzz
+const FUZZ = 2
+
+// fuzzyEquals reports whether a is within FUZZ percent (of a) of b. Identical
+// values always match, including 0 and 0, which the strict comparison below
+// would otherwise reject.
+func fuzzyEquals(a, b int) bool {
+	diff := a - b
+	if diff < 0 {
+		diff = -diff
+	}
+	return a == b || diff < a*FUZZ/100
+}
+
+// TestAllocateFlows checks, for a set of topologies, that the flow model built
+// from every vtgate cell adds up to the expected share for each tablet cell and
+// for each tablet.
+func TestAllocateFlows(t *testing.T) {
+	cases := []struct {
+		test        string
+		tablets     []*discovery.TabletHealth
+		vtgateCells []string
+	}{
+		{
+			"balanced one tablet per cell",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+			},
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"balanced multiple tablets per cell",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+			},
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"vtgate in cell with no tablets",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+			},
+			[]string{"a", "b", "c", "d", "e"},
+		},
+		{
+			"vtgates in multiple cells with no tablets",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+			},
+			[]string{"a", "b", "c", "d", "e", "f", "g"},
+		},
+		{
+			"imbalanced multiple tablets in one cell",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+			},
+			[]string{"a", "b", "c"},
+		},
+		{
+			"imbalanced multiple tablets in multiple cells",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("d"),
+				createTestTablet("d"),
+				createTestTablet("d"),
+				createTestTablet("d"),
+			},
+			[]string{"a", "b", "c", "d"},
+		},
+		{
+			"heavy imbalance",
+			[]*discovery.TabletHealth{
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("a"),
+				createTestTablet("b"),
+				createTestTablet("c"),
+				createTestTablet("c"),
+			},
+			[]string{"a", "b", "c", "d"},
+		},
+	}
+
+	target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}
+
+	for _, c := range cases {
+		t.Logf("\n\nTest Case: %s\n\n", c.test)
+
+		tablets := c.tablets
+		vtGateCells := c.vtgateCells
+
+		tabletsByCell := make(map[string][]*discovery.TabletHealth)
+		for _, tablet := range tablets {
+			cell := tablet.Tablet.Alias.Cell
+			tabletsByCell[cell] = append(tabletsByCell[cell], tablet)
+		}
+
+		allocationPerTablet := make(map[uint32]int)
+		expectedPerTablet := ALLOCATION / len(tablets)
+
+		expectedPerCell := make(map[string]int)
+		for cell := range tabletsByCell {
+			expectedPerCell[cell] = ALLOCATION / len(tablets) * len(tabletsByCell[cell])
+		}
+
+		// Run the balancer over each vtgate cell
+		for _, localCell := range vtGateCells {
+			b := NewTabletBalancer("balanced", localCell, strings.Join(vtGateCells, ",")).(*tabletBalancer)
+			a := b.allocateFlows(tablets)
+			b.allocations[discovery.KeyFromTarget(target)] = a
+
+			// print already includes the cached allocations
+			t.Logf("Target Flows %v, Balancer: %s", expectedPerCell, b.print())
+
+			// Accumulate all the output per tablet cell
+			outflowPerCell := make(map[string]int)
+			for _, outflow := range a.Outflows {
+				for tabletCell, flow := range outflow {
+					if flow < 0 {
+						t.Errorf("balancer %v negative outflow", b.print())
+					}
+					outflowPerCell[tabletCell] += flow
+				}
+			}
+
+			// Check in / out flow to each tablet cell
+			for cell := range tabletsByCell {
+				expectedForCell := expectedPerCell[cell]
+
+				if !fuzzyEquals(a.Inflows[cell], expectedForCell) || !fuzzyEquals(outflowPerCell[cell], expectedForCell) {
+					t.Errorf("Balancer {%s} ExpectedPerCell {%v} did not allocate correct flow to cell %s: expected %d, inflow %d outflow %d",
+						b.print(), expectedPerCell, cell, expectedForCell, a.Inflows[cell], outflowPerCell[cell])
+				}
+			}
+
+			// Accumulate the allocations for all runs to compare what the system does as a whole
+			// when routing from all vtgate cells
+			for uid, flow := range a.Allocation {
+				allocationPerTablet[uid] += flow
+			}
+		}
+
+		// Check that the allocations all add up
+		for _, tablet := range tablets {
+			uid := tablet.Tablet.Alias.Uid
+
+			allocation := allocationPerTablet[uid]
+			if !fuzzyEquals(allocation, expectedPerTablet) {
+				t.Errorf("did not allocate full allocation to tablet %d: expected %d got %d",
+					uid, expectedPerTablet, allocation)
+			}
+		}
+	}
+}
+
+func TestBalancedShuffle(t *testing.T) {
+	cases := []struct {
+		test        string
+		tablets     []*discovery.TabletHealth
+
vtgateCells []string + }{ + { + "simple balanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "simple unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("d"), + }, + + []string{"a", "b", "c", "d"}, + }, + { + "mixed unbalanced", + []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("c"), + createTestTablet("c"), + }, + + []string{"a", "b", "c", "d"}, + }, + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + for _, c := range cases { + t.Logf("\n\nTest Case: %s\n\n", c.test) + + tablets := c.tablets + vtGateCells := c.vtgateCells + + // test unbalanced distribution + + routed := make(map[uint32]int) + + expectedPerCell := make(map[string]int) + for _, tablet := range tablets { + cell := tablet.Tablet.Alias.Cell + expectedPerCell[cell] += ALLOCATION / len(tablets) + } + + // Run the algorithm a bunch of times to get a random enough sample + N := 1000000 + for _, localCell := range vtGateCells { + b := NewTabletBalancer("balanced", localCell, strings.Join(vtGateCells, ",")).(*tabletBalancer) + + for i := 0; i < N/len(vtGateCells); i++ { + b.ShuffleTablets(target, tablets) + if i == 0 { + t.Logf("Target Flows %v, Balancer: %s\n", expectedPerCell, b.print()) + t.Logf(b.print()) + } + + routed[tablets[0].Tablet.Alias.Uid]++ + } + } + + expected := N / len(tablets) + delta := make(map[uint32]int) + for _, tablet := range tablets { + got := routed[tablet.Tablet.Alias.Uid] + delta[tablet.Tablet.Alias.Uid] = got - expected + if !fuzzyEquals(got, expected) { + t.Errorf("routing to tablet %d got %d expected %d", 
tablet.Tablet.Alias.Uid, got, expected) + } + } + t.Logf("Expected %d per tablet, Routed %v, Delta %v, Max delta %d", N/len(tablets), routed, delta, expected*FUZZ/100) + } +} + +func TestTopologyChanged(t *testing.T) { + allTablets := []*discovery.TabletHealth{ + createTestTablet("a"), + createTestTablet("a"), + createTestTablet("b"), + createTestTablet("b"), + } + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + b := NewTabletBalancer("balanced", "b", "a,b").(*tabletBalancer) + + N := 1 + + // initially create a slice of tablets with just the two in cell a + tablets := allTablets + tablets = tablets[0:2] + + for i := 0; i < N; i++ { + b.ShuffleTablets(target, tablets) + allocation, totalAllocation := b.getAllocation(target, tablets) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets[0].Tablet.Alias.Cell != "a" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets[0].Tablet.Alias.Cell) + } + } + + // Run again with the full topology, but without triggering a topology change + // event to cause a reallocation + tablets2 := allTablets + for i := 0; i < N; i++ { + b.ShuffleTablets(target, tablets2) + + allocation, totalAllocation := b.getAllocation(target, tablets2) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets2[0].Tablet.Alias.Cell != "a" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) + } + } + + // Trigger toplogy changed event, now traffic should go to b + b.TopologyChanged() + for i := 0; i < N; i++ { + 
b.ShuffleTablets(target, tablets2) + + allocation, totalAllocation := b.getAllocation(target, tablets2) + + if totalAllocation != ALLOCATION/2 { + t.Errorf("totalAllocation mismatch %s", b.print()) + } + + if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 { + t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell) + } + + if tablets2[0].Tablet.Alias.Cell != "b" { + t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell) + } + } +} + +func TestAffinityShuffle(t *testing.T) { + balancer := NewTabletBalancer("affinity", "cell1", "") + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + + ts1 := &discovery.TabletHealth{ + Tablet: topo.NewTablet(1, "cell1", "host1"), + Target: target, + Serving: true, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, + } + + ts2 := &discovery.TabletHealth{ + Tablet: topo.NewTablet(2, "cell1", "host2"), + Target: target, + Serving: true, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, + } + + ts3 := &discovery.TabletHealth{ + Tablet: topo.NewTablet(3, "cell2", "host3"), + Target: target, + Serving: true, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, + } + + ts4 := &discovery.TabletHealth{ + Tablet: topo.NewTablet(4, "cell2", "host4"), + Target: target, + Serving: true, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, + } + + sameCellTablets := []*discovery.TabletHealth{ts1, ts2} + diffCellTablets := []*discovery.TabletHealth{ts3, ts4} + mixedTablets := []*discovery.TabletHealth{ts1, ts2, ts3, ts4} + + // repeat shuffling 10 times and every time the same cell tablets should be in the front + for i := 0; i < 10; i++ { + balancer.ShuffleTablets(target, sameCellTablets) + assert.Len(t, sameCellTablets, 2, "Wrong number of TabletHealth") + assert.Equal(t, sameCellTablets[0].Tablet.Alias.Cell, "cell1", "Wrong tablet cell") 
+ assert.Equal(t, sameCellTablets[1].Tablet.Alias.Cell, "cell1", "Wrong tablet cell") + + balancer.ShuffleTablets(target, diffCellTablets) + assert.Len(t, diffCellTablets, 2, "should shuffle in only diff cell tablets") + assert.Contains(t, diffCellTablets, ts3, "diffCellTablets should contain %v", ts3) + assert.Contains(t, diffCellTablets, ts4, "diffCellTablets should contain %v", ts4) + + balancer.ShuffleTablets(target, mixedTablets) + assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets) + + assert.Contains(t, mixedTablets[0:2], ts1, "should have same cell tablets in the front, got %+v", mixedTablets) + assert.Contains(t, mixedTablets[0:2], ts2, "should have same cell tablets in the front, got %+v", mixedTablets) + + assert.Contains(t, mixedTablets[2:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets) + assert.Contains(t, mixedTablets[2:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets) + } +} diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index c437721d58d..ec2a1f75821 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -19,7 +19,6 @@ package vtgate import ( "context" "fmt" - "math/rand" "sort" "sync" "sync/atomic" @@ -35,6 +34,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/balancer" "vitess.io/vitess/go/vt/vtgate/buffer" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -51,7 +51,9 @@ var ( bufferImplementation = "keyspace_events" initialTabletTimeout = 30 * time.Second // retryCount is the number of times a query will be retried on error - retryCount = 2 + retryCount = 2 + balancerMode = "affinity" + balancerVtgateCells = "" ) func init() { @@ -59,6 +61,8 @@ func init() { fs.StringVar(&CellsToWatch, "cells_to_watch", "", "comma-separated list of cells for watching tablets") fs.StringVar(&bufferImplementation, 
"buffer_implementation", "keyspace_events", "Allowed values: healthcheck (legacy implementation), keyspace_events (default)") fs.DurationVar(&initialTabletTimeout, "gateway_initial_tablet_timeout", 30*time.Second, "At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type") + fs.StringVar(&balancerMode, "balancer_mode", "affinity", "Tablet balancer mode, one of affinity (default mode which prefers same-cell replicas) or balanced (attempt to evenly spread query load)") + fs.StringVar(&balancerVtgateCells, "balancer_vtgate_cells", "", "When in balanced mode, a comma-separated list of cells that contain vtgates") fs.IntVar(&retryCount, "retry-count", 2, "retry count") }) } @@ -82,6 +86,9 @@ type TabletGateway struct { // buffer, if enabled, buffers requests during a detected PRIMARY failover. buffer *buffer.Buffer + + // balancer used for routing to tablets + balancer balancer.TabletBalancer } func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { @@ -110,6 +117,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop statusAggregators: make(map[string]*TabletStatusAggregator), } gw.setupBuffering(ctx) + gw.setupBalancer(ctx) gw.QueryService = queryservice.Wrap(nil, gw.withRetry) return gw } @@ -169,6 +177,23 @@ func (gw *TabletGateway) setupBuffering(ctx context.Context) { } } +func (gw *TabletGateway) setupBalancer(ctx context.Context) { + gw.balancer = balancer.NewTabletBalancer(balancerMode, gw.localCell, balancerVtgateCells) + + // subscribe to healthcheck updates so that the balancer can reset its allocation + hcChan := gw.hc.Subscribe() + go func(ctx context.Context, c chan *discovery.TabletHealth, balancer balancer.TabletBalancer) { + for { + select { + case <-ctx.Done(): + return + case <-hcChan: + balancer.TopologyChanged() + } + } + }(ctx, hcChan, gw.balancer) +} + // 
QueryServiceByAlias satisfies the Gateway interface func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias, target *querypb.Target) (queryservice.QueryService, error) { qs, err := gw.hc.TabletConnection(alias, target) @@ -311,7 +336,8 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no healthy tablet available for '%s'", target.String()) break } - gw.shuffleTablets(gw.localCell, tablets) + + gw.balancer.ShuffleTablets(target, tablets) var th *discovery.TabletHealth // skip tablets we tried before @@ -381,53 +407,6 @@ func (gw *TabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatu return aggr } -func (gw *TabletGateway) shuffleTablets(cell string, tablets []*discovery.TabletHealth) { - sameCell, diffCell, sameCellMax := 0, 0, -1 - length := len(tablets) - - // move all same cell tablets to the front, this is O(n) - for { - sameCellMax = diffCell - 1 - sameCell = gw.nextTablet(cell, tablets, sameCell, length, true) - diffCell = gw.nextTablet(cell, tablets, diffCell, length, false) - // either no more diffs or no more same cells should stop the iteration - if sameCell < 0 || diffCell < 0 { - break - } - - if sameCell < diffCell { - // fast forward the `sameCell` lookup to `diffCell + 1`, `diffCell` unchanged - sameCell = diffCell + 1 - } else { - // sameCell > diffCell, swap needed - tablets[sameCell], tablets[diffCell] = tablets[diffCell], tablets[sameCell] - sameCell++ - diffCell++ - } - } - - // shuffle in same cell tablets - for i := sameCellMax; i > 0; i-- { - swap := rand.Intn(i + 1) - tablets[i], tablets[swap] = tablets[swap], tablets[i] - } - - // shuffle in diff cell tablets - for i, diffCellMin := length-1, sameCellMax+1; i > diffCellMin; i-- { - swap := rand.Intn(i-sameCellMax) + diffCellMin - tablets[i], tablets[swap] = tablets[swap], tablets[i] - } -} - -func (gw *TabletGateway) nextTablet(cell string, tablets []*discovery.TabletHealth, 
offset, length int, sameCell bool) int { - for ; offset < length; offset++ { - if (tablets[offset].Tablet.Alias.Cell == cell) == sameCell { - return offset - } - } - return -1 -} - // TabletsCacheStatus returns a displayable version of the health check cache. func (gw *TabletGateway) TabletsCacheStatus() discovery.TabletsCacheStatusList { return gw.hc.CacheStatus() diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/tabletgateway_test.go index 99388551ebf..b860a09d42a 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/tabletgateway_test.go @@ -30,7 +30,6 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" ) @@ -82,64 +81,6 @@ func TestTabletGatewayBeginExecute(t *testing.T) { }) } -func TestTabletGatewayShuffleTablets(t *testing.T) { - hc := discovery.NewFakeHealthCheck(nil) - tg := NewTabletGateway(context.Background(), hc, nil, "local") - - ts1 := &discovery.TabletHealth{ - Tablet: topo.NewTablet(1, "cell1", "host1"), - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, - } - - ts2 := &discovery.TabletHealth{ - Tablet: topo.NewTablet(2, "cell1", "host2"), - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, - } - - ts3 := &discovery.TabletHealth{ - Tablet: topo.NewTablet(3, "cell2", "host3"), - Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, - } - - ts4 := &discovery.TabletHealth{ - Tablet: topo.NewTablet(4, "cell2", "host4"), - Target: &querypb.Target{Keyspace: "k", 
Shard: "s", TabletType: topodatapb.TabletType_REPLICA}, - Serving: true, - Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.2}, - } - - sameCellTablets := []*discovery.TabletHealth{ts1, ts2} - diffCellTablets := []*discovery.TabletHealth{ts3, ts4} - mixedTablets := []*discovery.TabletHealth{ts1, ts2, ts3, ts4} - // repeat shuffling 10 times and every time the same cell tablets should be in the front - for i := 0; i < 10; i++ { - tg.shuffleTablets("cell1", sameCellTablets) - assert.Len(t, sameCellTablets, 2, "Wrong number of TabletHealth") - assert.Equal(t, sameCellTablets[0].Tablet.Alias.Cell, "cell1", "Wrong tablet cell") - assert.Equal(t, sameCellTablets[1].Tablet.Alias.Cell, "cell1", "Wrong tablet cell") - - tg.shuffleTablets("cell1", diffCellTablets) - assert.Len(t, diffCellTablets, 2, "should shuffle in only diff cell tablets") - assert.Contains(t, diffCellTablets, ts3, "diffCellTablets should contain %v", ts3) - assert.Contains(t, diffCellTablets, ts4, "diffCellTablets should contain %v", ts4) - - tg.shuffleTablets("cell1", mixedTablets) - assert.Len(t, mixedTablets, 4, "should have 4 tablets, got %+v", mixedTablets) - - assert.Contains(t, mixedTablets[0:2], ts1, "should have same cell tablets in the front, got %+v", mixedTablets) - assert.Contains(t, mixedTablets[0:2], ts2, "should have same cell tablets in the front, got %+v", mixedTablets) - - assert.Contains(t, mixedTablets[2:4], ts3, "should have diff cell tablets in the rear, got %+v", mixedTablets) - assert.Contains(t, mixedTablets[2:4], ts4, "should have diff cell tablets in the rear, got %+v", mixedTablets) - } -} - func TestTabletGatewayReplicaTransactionError(t *testing.T) { keyspace := "ks" shard := "0"