From d1fee851ae8207c799589fe16d029cee5861543e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 10 May 2023 12:02:47 +0200 Subject: [PATCH] txthrottler: filter healthcheck stream by shard Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 29 ++++++++++--------- .../txthrottler/tx_throttler_test.go | 4 +-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index b65dbfc20df..dc489be2550 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -44,7 +44,7 @@ import ( // topology watchers and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface +type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( @@ -61,8 +61,9 @@ func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) { + cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)}) + return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), err } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) @@ -307,16 +308,18 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - config.topoServer, - result.healthCheck, - cell, - keyspace, - shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + topologyWatcher, err := topologyWatcherFactory( + config.topoServer, + result.healthCheck, + cell, + keyspace, + shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency) + if err != nil { + return nil, err + } + result.topologyWatchers = append(result.topologyWatchers, topologyWatcher) } return result, nil } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index f55b0800ca4..0745ca26fdb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -70,14 +70,14 @@ func TestEnabledThrottler(t *testing.T) { return mockHealthCheck } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) { assert.Equal(t, ts, topoServer) assert.Contains(t, []string{"cell1", "cell2"}, cell) assert.Equal(t, "keyspace", keyspace) assert.Equal(t, "shard", shard) result := NewMockTopologyWatcherInterface(mockCtrl) result.EXPECT().Stop() - return result + return result, nil } mockThrottler := NewMockThrottlerInterface(mockCtrl)