Skip to content

Commit

Permalink
txthrottler: filter healthcheck stream by shard
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed May 10, 2023
1 parent daffb1d commit d1fee85
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
29 changes: 16 additions & 13 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error)
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
Expand All @@ -61,8 +61,9 @@ func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) {
cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)})
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), err
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
Expand Down Expand Up @@ -307,16 +308,18 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string
result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
config.topoServer,
result.healthCheck,
cell,
keyspace,
shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
topologyWatcher, err := topologyWatcherFactory(
config.topoServer,
result.healthCheck,
cell,
keyspace,
shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency)
if err != nil {
return nil, err
}
result.topologyWatchers = append(result.topologyWatchers, topologyWatcher)
}
return result, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func TestEnabledThrottler(t *testing.T) {
return mockHealthCheck
}

topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) {
assert.Equal(t, ts, topoServer)
assert.Contains(t, []string{"cell1", "cell2"}, cell)
assert.Equal(t, "keyspace", keyspace)
assert.Equal(t, "shard", shard)
result := NewMockTopologyWatcherInterface(mockCtrl)
result.EXPECT().Stop()
return result
return result, nil
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
Expand Down

0 comments on commit d1fee85

Please sign in to comment.