Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: filter topology watcher by shard #13062

16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package txthrottler

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
Expand All @@ -42,7 +43,7 @@ import (
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error)
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
Expand All @@ -55,8 +56,12 @@ func resetTxThrottlerFactories() {
healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
}
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) {
cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conventional way to combine keyspace&shard is with /. Is there a reason the above uses | to separate them? If there's no particular reason, please accept the below suggestion:

Suggested change
cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)})
cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s/%s", keyspace, shard)})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach for some reason discovery.NewFilterByShard uses | as a delimiter for filters. It's strange because later in that func the format you suggest is used in an error 😄

if err != nil {
return nil, err
}
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), nil
}
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
Expand Down Expand Up @@ -310,7 +315,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
state.topologyWatchers[cell] = topologyWatcherFactory(
state.topologyWatchers[cell], err = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,
cell,
Expand All @@ -319,6 +324,9 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
if err != nil {
return nil, err
}
txThrottler.topoWatchers.Add(cell, 1)
}
return state, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func TestEnabledThrottler(t *testing.T) {
return mockHealthCheck
}

topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) {
assert.Equal(t, ts, topoServer)
assert.Contains(t, []string{"cell1", "cell2"}, cell)
assert.Equal(t, "keyspace", keyspace)
assert.Equal(t, "shard", shard)
result := NewMockTopologyWatcherInterface(mockCtrl)
result.EXPECT().Stop()
return result
return result, nil
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
Expand Down