From d1fee851ae8207c799589fe16d029cee5861543e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 10 May 2023 12:02:47 +0200 Subject: [PATCH 1/5] txthrottler: filter healthcheck stream by shard Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 29 ++++++++++--------- .../txthrottler/tx_throttler_test.go | 4 +-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index b65dbfc20df..dc489be2550 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -44,7 +44,7 @@ import ( // topology watchers and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface +type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( @@ -61,8 +61,9 @@ func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) { + cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)}) + return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), err } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) @@ -307,16 +308,18 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - config.topoServer, - result.healthCheck, - cell, - keyspace, - shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + topologyWatcher, err := topologyWatcherFactory( + config.topoServer, + result.healthCheck, + cell, + keyspace, + shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency) + if err != nil { + return nil, err + } + result.topologyWatchers = append(result.topologyWatchers, topologyWatcher) } return result, nil } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index f55b0800ca4..0745ca26fdb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -70,14 +70,14 @@ func TestEnabledThrottler(t *testing.T) { return mockHealthCheck } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { + topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) { assert.Equal(t, ts, topoServer) assert.Contains(t, []string{"cell1", "cell2"}, cell) assert.Equal(t, "keyspace", keyspace) assert.Equal(t, "shard", shard) result := NewMockTopologyWatcherInterface(mockCtrl) result.EXPECT().Stop() - return result + return result, nil } mockThrottler := NewMockThrottlerInterface(mockCtrl) From 927b7216291342588d8e5e9b6d62d162410e4e29 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 19 May 2023 03:32:54 +0200 Subject: [PATCH 2/5] go fmt Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 356ca3191c9..f6cefe863c6 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -328,13 +328,13 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar []TopologyWatcherInterface, 0, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { topologyWatcher, err := topologyWatcherFactory( - topoServer, - result.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency) + topoServer, + result.healthCheck, + cell, + target.Keyspace, + target.Shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency) if err != nil { return nil, err } From 5b3091993ff726005f04f894a03ad95f694566d3 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 19 May 2023 03:34:19 +0200 Subject: [PATCH 3/5] Check err Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index f6cefe863c6..5a93251268a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -59,7 +59,10 @@ func resetTxThrottlerFactories() { } topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) (TopologyWatcherInterface, error) { cellTabletsFilter, err := discovery.NewFilterByShard([]string{fmt.Sprintf("%s|%s", keyspace, shard)}) - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), err + if err != nil { + return nil, err + } + return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, cellTabletsFilter, cell, refreshInterval, true, topoReadConcurrency), nil } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) From fb76352a94304483871c81ad9dbc63c47e75fc7c Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 25 Jul 2023 23:03:34 +0200 Subject: [PATCH 4/5] Go fmt after conflict resolution Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index da31b5e1240..bba06bc74b7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -18,6 +18,7 @@ package txthrottler import ( "context" + "fmt" "math/rand" "strings" "sync" @@ -323,7 +324,7 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency, ) - if err != nil { + if err != nil { return nil, err } txThrottler.topoWatchers.Add(cell, 1) From bce443c7787516c4376e66b2ad3679fb921cea6e Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 3 Aug 2023 14:48:49 +0200 Subject: [PATCH 5/5] Fix merge conflict resolution Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 5ca3487610b..9c111018c07 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -331,10 +331,11 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() + var err error ts.topologyWatchers = make( map[string]TopologyWatcherInterface, len(ts.healthCheckCells)) for _, cell := range ts.healthCheckCells { - ts.topologyWatchers[cell] = topologyWatcherFactory( + ts.topologyWatchers[cell], err = topologyWatcherFactory( topoServer, ts.healthCheck, cell, @@ -344,7 +345,8 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t discovery.DefaultTopoReadConcurrency, ) if err != nil { - return nil, err + log.Errorf("Failed to start topology watcher for cell %s: %v", err) + continue } ts.txThrottler.topoWatchers.Add(cell, 1) }