From 78f98fc70ae18b92d8b0536f80ebeb70138ee96f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 28 May 2023 17:01:00 +0300 Subject: [PATCH 1/8] Tablet throttler: non-PRIMARY tablets report back to PRIMARY throttler when they've been 'check'ed Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/check_result.go | 11 +++--- .../tabletserver/throttle/throttler.go | 37 +++++++++++++++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go index 52d52b78468..3bc162b623a 100644 --- a/go/vt/vttablet/tabletserver/throttle/check_result.go +++ b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -14,11 +14,12 @@ import ( // CheckResult is the result for an app inquiring on a metric. It also exports as JSON via the API type CheckResult struct { - StatusCode int `json:"StatusCode"` - Value float64 `json:"Value"` - Threshold float64 `json:"Threshold"` - Error error `json:"-"` - Message string `json:"Message"` + StatusCode int `json:"StatusCode"` + Value float64 `json:"Value"` + Threshold float64 `json:"Threshold"` + Error error `json:"-"` + Message string `json:"Message"` + RecentlyChecked bool `json:"RecentlyChecked"` } // NewCheckResult returns a CheckResult diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 56e913bb119..939885601a2 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -126,6 +126,12 @@ type Throttler struct { srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter + // recentCheckTickerValue is an ever increasing number, incrementing once per second. + recentCheckTickerValue int64 + // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). + // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. + recentCheckValue int64 + throttleTabletTypesMap map[topodatapb.TabletType]bool mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric @@ -576,6 +582,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { mysqlRefreshTicker := addTicker(mysqlRefreshInterval) mysqlAggregateTicker := addTicker(mysqlAggregateInterval) throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) + recentCheckTicker := addTicker(time.Second) go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") @@ -670,6 +677,9 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) + case <-recentCheckTicker.C: + // Increment recentCheckTickerValue by one. + atomic.AddInt64(&throttler.recentCheckTickerValue, 1) } } }() @@ -682,7 +692,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.ClusterName = clusterName mySQLThrottleMetric.Key = probe.Key - tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort) + tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, vitessAppName) resp, err := throttler.httpClient.Get(tabletCheckSelfURL) if err != nil { mySQLThrottleMetric.Err = err @@ -704,6 +714,11 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, if checkResult.StatusCode == http.StatusInternalServerError { mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode) } + if checkResult.RecentlyChecked { + // We have just probed a tablet, and it reported back that someone just recently "check"ed it. + // We therefore renew the heartbeats lease. + go throttler.heartbeatWriter.RequestHeartbeats() + } return mySQLThrottleMetric } } @@ -1015,7 +1030,17 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor if !throttler.IsEnabled() { return okMetricCheckResult } - return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + + if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) { + // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. + // This could be online-ddl, or vreplication or whoever else. + // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, + // so that the PRIMARY knows it must renew the heartbeat lease. + checkResult.RecentlyChecked = true + } + + return checkResult } // checkShard checks the health of the shard, and runs on the primary tablet only @@ -1031,7 +1056,13 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { - go throttler.heartbeatWriter.RequestHeartbeats() + if appName != vitessAppName { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) + } } switch checkType { case ThrottleCheckSelf: From 379ec2b541098523b166a3733422a45e4c488e67 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 06:57:01 +0300 Subject: [PATCH 2/8] inclusive language Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/mysql/mysql_throttle_metric.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 3c2fe005828..8c8a5cc4b32 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -95,7 +95,7 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) { } // ReadThrottleMetric returns a metric for the given probe. Either by explicit query -// or via SHOW SLAVE STATUS +// or via SHOW REPLICA STATUS func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) { if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { return mySQLThrottleMetric From 1ef74c3c78ea06f1085fc3dd60ec459f1d366f00 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 14:49:32 +0300 Subject: [PATCH 3/8] vstreamer: support 'useThrottler' so that clients can choose whther they at all want to involve the throttler. Some lightweight clients, such as the schema tracker or the binlog watcher, or messager, do not need the throttler, and since some of these clients are _always on_, we also do not _want_ them to continuously approach the throttler. One side effect of always engaging with the throttler is the infinite renewal of on-demand heartbeats Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/vdiff/framework_test.go | 2 +- .../vreplication/external_connector.go | 2 +- .../vreplication/framework_test.go | 2 +- .../vreplication/replica_connector.go | 2 +- go/vt/vttablet/tabletserver/binlog_watcher.go | 4 +- .../vttablet/tabletserver/messager/engine.go | 2 +- .../tabletserver/messager/message_manager.go | 2 +- .../messager/message_manager_test.go | 2 +- go/vt/vttablet/tabletserver/schema/tracker.go | 4 +- .../tabletserver/schema/tracker_test.go | 2 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- .../vttablet/tabletserver/throttle/client.go | 2 +- .../tabletserver/throttle/throttler.go | 14 ++-- go/vt/vttablet/tabletserver/vstreamer/copy.go | 4 +- .../vttablet/tabletserver/vstreamer/engine.go | 4 +- .../tabletserver/vstreamer/engine_test.go | 2 +- .../tabletserver/vstreamer/uvstreamer.go | 42 ++++++------ .../vstreamer/uvstreamer_flaky_test.go | 2 +- .../tabletserver/vstreamer/vstreamer.go | 68 ++++++++++--------- .../vstreamer/vstreamer_flaky_test.go | 6 +- 20 files changed, 87 insertions(+), 83 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index bb173c41abc..bd1c3f13005 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -235,7 +235,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index ac411de7ce6..0bebf076fef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -126,7 +126,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { } func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) + return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, true, send) } func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 5ed944125af..9d75f7381e8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -307,7 +307,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 3954f4d0546..2fa2b3f148c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -84,7 +84,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error { } func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, nil, filter, send) + return c.vstreamer.Stream(ctx, startPos, nil, filter, true, send) } // VStreamRows streams rows from query result diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index 6c713791e6f..85472fa5d8d 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -31,7 +31,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the @@ -91,7 +91,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) { for { // VStreamer will reload the schema when it encounters a DDL. - err := blw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := blw.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { return nil }) log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err) diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 2d7fdf2bb82..caed01e3c9b 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -44,7 +44,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 39598169baa..4f9db3e0355 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -684,7 +684,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var curPos string var fields []*querypb.Field - err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { + err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, false, func(events []*binlogdatapb.VEvent) error { // We need to get the flow control lock mm.cacheManagementMu.Lock() defer mm.cacheManagementMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index d6da59db065..51cfc881bee 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -888,7 +888,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 490ad59eadf..e0d8b61e9ed 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -40,7 +40,7 @@ import ( // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. @@ -129,7 +129,7 @@ func (tr *Tracker) process(ctx context.Context) { var gtid string for { - err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := tr.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { for _, event := range events { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 9822b6bfe5a..50f262169a4 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -137,7 +137,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ff641fe7198..d86b6b8ab1d 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1144,7 +1144,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, true, send) } // VStreamRows streams rows from the specified starting point. diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 10fe910d264..9a3f699b2da 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -140,7 +140,7 @@ func (c *Client) Throttle(ctx context.Context) { return } if c.ThrottleCheckOKOrWait(ctx) { - break + return } } } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 939885601a2..b10100ec01c 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -1055,14 +1055,12 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { - if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { - if appName != vitessAppName { - go throttler.heartbeatWriter.RequestHeartbeats() - // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. - // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back - // to the PRIMARY so that it knows it must renew the heartbeat lease. - atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) - } + if throttler.IsEnabled() && !flags.SkipRequestHeartbeats && appName != vitessAppName { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) } switch checkType { case ThrottleCheckSelf: diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..dcd8dacd121 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { startPos := mysql.EncodePosition(uvs.pos) - vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2, "catchup", uvs.vse) + vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "catchup", uvs.vse) uvs.setVs(vs) errch <- vs.Stream() uvs.setVs(nil) @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error { }() log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos) uvs.stopPos, _ = mysql.DecodePosition(stopPos) - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2, "fastforward", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "fastforward", uvs.vse) uvs.setVs(vs) return vs.Stream() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 829d0b88a5f..9e8074fb4b0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -226,7 +226,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err @@ -244,7 +244,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } vse.mu.Lock() defer vse.mu.Unlock() - streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send) + streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, useThrottler, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index 16729e8fd24..d271164558a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -93,7 +93,7 @@ func TestUpdateVSchema(t *testing.T) { }}, } // Stream should terminate immediately due to invalid pos. - _ = engine.Stream(ctx, "invalid", nil, filter, func(_ []*binlogdatapb.VEvent) error { + _ = engine.Stream(ctx, "invalid", nil, filter, false, func(_ []*binlogdatapb.VEvent) error { return nil }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index e26b6f2939f..e33ca77e478 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,14 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK + useThrottler bool vschema *localVSchema @@ -90,7 +91,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -105,17 +106,18 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se return send(evs) } uvs := &uvstreamer{ - ctx: ctx, - cancel: cancel, - vse: vse, - send: send2, - cp: cp, - se: se, - startPos: startPos, - filter: filter, - vschema: vschema, - config: config, - inTablePKs: tablePKs, + ctx: ctx, + cancel: cancel, + vse: vse, + send: send2, + cp: cp, + se: se, + startPos: startPos, + filter: filter, + vschema: vschema, + config: config, + inTablePKs: tablePKs, + useThrottler: useThrottler, } return uvs @@ -418,7 +420,7 @@ func (uvs *uvstreamer) Stream() error { } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), - uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 7d22f9c0034..ec2a8d2334d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -440,7 +440,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() { func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { - err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + err := engine.Stream(ctx, pos, tablePKs, filter, true, func(evs []*binlogdatapb.VEvent) error { //t.Logf("Received events: %v", evs) muAllEvents.Lock() defer muAllEvents.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d9ac730e43e..cf5c5fb350c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -61,11 +61,12 @@ type vstreamer struct { ctx context.Context cancel func() - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + send func([]*binlogdatapb.VEvent) error + useThrottler bool vevents chan *localVSchema vschema *localVSchema @@ -112,22 +113,23 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ - ctx: ctx, - cancel: cancel, - cp: cp, - se: se, - startPos: startPos, - stopPos: stopPos, - filter: filter, - send: send, - vevents: make(chan *localVSchema, 1), - vschema: vschema, - plans: make(map[uint64]*streamerPlan), - phase: phase, - vse: vse, + ctx: ctx, + cancel: cancel, + cp: cp, + se: se, + startPos: startPos, + stopPos: stopPos, + useThrottler: useThrottler, + filter: filter, + send: send, + vevents: make(chan *localVSchema, 1), + vschema: vschema, + plans: make(map[uint64]*streamerPlan), + phase: phase, + vse: vse, } } @@ -301,20 +303,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) defer throttledHeartbeatsRateLimiter.Stop() for { - // check throttler. - if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { - // make sure to leave if context is cancelled - select { - case <-ctx.Done(): - return - default: - // do nothing special + if vs.useThrottler { + // check throttler. + if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { + // make sure to leave if context is cancelled + select { + case <-ctx.Done(): + return + default: + // do nothing special + } + throttledHeartbeatsRateLimiter.Do(func() error { + return injectHeartbeat(true) + }) + // we won't process events, until we're no longer throttling + continue } - throttledHeartbeatsRateLimiter.Do(func() error { - return injectHeartbeat(true) - }) - // we won't process events, until we're no longer throttling - continue } select { case ev, ok := <-events: diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index a1d68b5eb81..1a99e7e9169 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -607,7 +607,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var errGoroutine error go func() { defer wg.Done() - engine.Stream(ctx2, "", nil, filter, func(evs []*binlogdatapb.VEvent) error { + engine.Stream(ctx2, "", nil, filter, true, func(evs []*binlogdatapb.VEvent) error { for _, ev := range evs { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue @@ -1925,7 +1925,7 @@ func TestMinimalMode(t *testing.T) { newEngine(t, "minimal") defer newEngine(t, "full") - err := engine.Stream(context.Background(), "current", nil, nil, func(evs []*binlogdatapb.VEvent) error { return nil }) + err := engine.Stream(context.Background(), "current", nil, nil, false, func(evs []*binlogdatapb.VEvent) error { return nil }) require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } @@ -2309,7 +2309,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda }}, } } - return engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + return engine.Stream(ctx, pos, tablePKs, filter, false, func(evs []*binlogdatapb.VEvent) error { timer := time.NewTimer(2 * time.Second) defer timer.Stop() From 94b6f036dae405945aa2579921ce2f4c7391fac6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 14:49:54 +0300 Subject: [PATCH 4/8] add app name in test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/tabletmanager/throttler_topo/throttler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index fe87262a21f..a3204e0be2f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri } func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) { - resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) return resp, err } func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { - return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) + return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath)) } func warmUpHeartbeat(t *testing.T) (respStatus int) { From d2429654e44e99735b9a59b69ece460ca531b28e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 30 May 2023 09:46:40 +0300 Subject: [PATCH 5/8] Enforcing use of explicit names in all throttler checks, specifically in vstreamer.Engine. The throttler exempts specific apps from checks and will not renew heartbeats leases for those apps Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 56 +++++++++---------- .../vreplication/vreplication_test.go | 5 +- go/vt/vttablet/onlineddl/executor.go | 17 +++--- .../tabletmanager/vdiff/framework_test.go | 3 +- .../tabletmanager/vreplication/engine.go | 7 +-- .../vreplication/external_connector.go | 3 +- .../vreplication/framework_test.go | 3 +- .../vreplication/replica_connector.go | 3 +- .../tabletmanager/vreplication/vcopier.go | 5 +- .../tabletmanager/vreplication/vplayer.go | 5 +- .../tabletmanager/vreplication/vreplicator.go | 18 ++---- go/vt/vttablet/tabletserver/binlog_watcher.go | 5 +- go/vt/vttablet/tabletserver/gc/tablegc.go | 6 +- .../vttablet/tabletserver/messager/engine.go | 2 +- .../tabletserver/messager/message_manager.go | 3 +- .../messager/message_manager_test.go | 2 +- go/vt/vttablet/tabletserver/schema/tracker.go | 5 +- .../tabletserver/schema/tracker_test.go | 2 +- go/vt/vttablet/tabletserver/tabletserver.go | 5 +- go/vt/vttablet/tabletserver/throttle/check.go | 9 +-- .../tabletserver/throttle/throttler.go | 22 +++++--- go/vt/vttablet/tabletserver/vstreamer/copy.go | 4 +- .../vttablet/tabletserver/vstreamer/engine.go | 11 ++-- .../tabletserver/vstreamer/engine_test.go | 3 +- .../tabletserver/vstreamer/resultstreamer.go | 3 +- .../tabletserver/vstreamer/rowstreamer.go | 3 +- .../tabletserver/vstreamer/uvstreamer.go | 8 +-- .../vstreamer/uvstreamer_flaky_test.go | 3 +- .../tabletserver/vstreamer/vstreamer.go | 34 ++++++----- .../vstreamer/vstreamer_flaky_test.go | 7 ++- 30 files changed, 133 insertions(+), 129 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 0303e4fcec9..d1f38059e79 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -33,20 +33,18 @@ import ( "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/schema" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var ( - clusterInstance *cluster.LocalProcessCluster - shards []cluster.Shard - vtParams mysql.ConnParams - httpClient = throttlebase.SetupHTTPClient(time.Second) - onlineDDLThrottlerAppName = "online-ddl" - vstreamerThrottlerAppName = "vstreamer" + clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard + vtParams mysql.ConnParams + httpClient = throttlebase.SetupHTTPClient(time.Second) normalMigrationWait = 45 * time.Second extendedMigrationWait = 60 * time.Second @@ -385,7 +383,7 @@ func TestSchemaChange(t *testing.T) { // begin throttling: onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning) @@ -400,11 +398,11 @@ func TestSchemaChange(t *testing.T) { // to be strictly higher than started_timestamp assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component) + assert.Contains(t, []string{throttlerapp.VCopierName, throttlerapp.VPlayerName}, component) // unthrottle onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false) status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) @@ -421,11 +419,11 @@ func TestSchemaChange(t *testing.T) { // vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't // change the essence of this test. for _, tablet := range shard.Vttablets { - body, err := throttleApp(tablet, vstreamerThrottlerAppName) - defer unthrottleApp(tablet, vstreamerThrottlerAppName) + body, err := throttleApp(tablet, throttlerapp.VStreamerName) + defer unthrottleApp(tablet, throttlerapp.VStreamerName) assert.NoError(t, err) - assert.Contains(t, body, vstreamerThrottlerAppName) + assert.Contains(t, body, throttlerapp.VStreamerName) } } @@ -449,7 +447,7 @@ func TestSchemaChange(t *testing.T) { // clock irregularities assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component) + assert.Contains(t, []string{throttlerapp.VStreamerName, throttlerapp.RowStreamerName}, component) }() // now unthrottled status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) @@ -478,7 +476,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -497,7 +495,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -531,16 +529,16 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) - defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) + body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) + defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) case 1: // no PRS on this shard // Use per-tablet throttling API - body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) - defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) + body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) + defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) } assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) } uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) @@ -596,14 +594,14 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) + body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) case 1: // no PRS on this shard // Use per-tablet throttling API - body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) + body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) } assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) } }) t.Run("expect completion", func(t *testing.T) { @@ -823,11 +821,11 @@ func TestSchemaChange(t *testing.T) { // - tablet throttling t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) { // shard 0 will run normally, shard 1 will be throttled - defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) t.Run("throttle shard 1", func(t *testing.T) { - body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) }) var uuid string @@ -849,9 +847,9 @@ func TestSchemaChange(t *testing.T) { onlineddl.CheckCancelAllMigrations(t, &vtParams, 1) }) t.Run("unthrottle shard 1", func(t *testing.T) { - body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) }) var revertUUID string t.Run("issue revert migration", func(t *testing.T) { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index ca9e316c97b..69c364197c5 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -44,6 +44,7 @@ import ( "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) @@ -58,8 +59,8 @@ var ( sourceKsOpts = make(map[string]string) targetKsOpts = make(map[string]string) httpClient = throttlebase.SetupHTTPClient(time.Second) - sourceThrottlerAppName = "vstreamer" - targetThrottlerAppName = "vreplication" + sourceThrottlerAppName = throttlerapp.VStreamerName + targetThrottlerAppName = throttlerapp.VReplicationName ) const ( diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index f53b0ef44e3..55068d26e65 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -63,6 +63,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -124,7 +125,6 @@ var ( migrationFailureFileName = "migration-failure.log" onlineDDLUser = "vt-online-ddl-internal" onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%") - throttlerOnlineDDLApp = "online-ddl" throttleCheckFlags = &throttle.CheckFlags{} ) @@ -1621,7 +1621,7 @@ exit $exit_code fmt.Sprintf("--serve-socket-file=%s", serveSocketFile), fmt.Sprintf("--hooks-path=%s", tempDir), fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID), - fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:gh-ost:%s&p=low`, servenv.Port(), throttlerOnlineDDLApp, onlineDDL.UUID), + fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID), fmt.Sprintf(`--database=%s`, e.dbName), fmt.Sprintf(`--table=%s`, onlineDDL.Table), fmt.Sprintf(`--alter=%s`, alterOptions), @@ -1768,7 +1768,7 @@ export MYSQL_PWD my ($self, %args) = @_; return sub { - if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:pt-osc:{{MIGRATION_UUID}}&p=low")) { + if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) { # Got HTTP 200 OK, means throttler is happy return 0; } else { @@ -1782,7 +1782,8 @@ export MYSQL_PWD ` pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port())) pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerOnlineDDLApp) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning)) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete)) @@ -2141,7 +2142,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin if err := e.lagThrottler.CheckIsReady(); err != nil { return nil, err } - _ = e.lagThrottler.ThrottleApp(throttlerOnlineDDLApp, time.Now().Add(duration), ratio) + _ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName, time.Now().Add(duration), ratio) return emptyResult, nil } @@ -2161,7 +2162,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype return nil, err } defer e.triggerNextCheckInterval() - _ = e.lagThrottler.UnthrottleApp(throttlerOnlineDDLApp) + _ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName) return emptyResult, nil } @@ -3469,7 +3470,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err := e.lagThrottler.CheckIsReady(); err == nil { // No point in reviewing throttler info if it's not enabled&open for _, app := range e.lagThrottler.ThrottledApps() { - if app.AppName == throttlerOnlineDDLApp { + if app.AppName == throttlerapp.OnlineDDLName { currentUserThrottleRatio = app.Ratio break } @@ -3603,7 +3604,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // - it's a deadlock. // And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick // on-demand heartbeats, unlocking the deadlock. - e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) + e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) }) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index bd1c3f13005..bbda63340de 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -235,7 +236,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) + return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 00ac2824d3a..52f9c072d49 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -43,6 +43,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -51,9 +52,7 @@ const ( copyStateTableName = "copy_state" postCopyActionTableName = "post_copy_action" - maxRows = 10000 - throttlerVReplicationAppName = "vreplication" - throttlerOnlineDDLAppName = "online-ddl" + maxRows = 10000 ) const ( @@ -136,7 +135,7 @@ func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mys mysqld: mysqld, journaler: make(map[string]*journalEvent), ec: newExternalConnector(config.ExternalConnections), - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerVReplicationAppName, throttle.ThrottleCheckPrimaryWrite), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.VReplicationName, throttle.ThrottleCheckPrimaryWrite), } return vre diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index 0bebf076fef..5bec2b4f78a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) @@ -126,7 +127,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { } func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, true, send) + return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, throttlerapp.ExternalConnectorName, send) } func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 9d75f7381e8..8260d95b462 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -51,6 +51,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletconntest" qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" @@ -307,7 +308,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) + return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 2fa2b3f148c..9c6f427b418 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -20,6 +20,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "context" @@ -84,7 +85,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error { } func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, nil, filter, true, send) + return c.vstreamer.Stream(ctx, startPos, nil, filter, throttlerapp.ReplicaConnectorName, send) } // VStreamRows streams rows from query result diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 2bff8db85ef..1f94f0fe578 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -39,6 +39,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) type vcopier struct { @@ -457,7 +458,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma default: } if rows.Throttled { - _ = vc.vr.updateTimeThrottled(RowStreamerComponentName) + _ = vc.vr.updateTimeThrottled(throttlerapp.RowStreamerName) return nil } if rows.Heartbeat { @@ -468,7 +469,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vc.throttlerAppName) { break // out of 'for' loop } else { // we're throttled - _ = vc.vr.updateTimeThrottled(VCopierComponentName) + _ = vc.vr.updateTimeThrottled(throttlerapp.VCopierName) } } if !copyWorkQueue.isOpen { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 0e33eed5f6a..aacb5bb9376 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -336,7 +337,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } // check throttler. if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vp.throttlerAppName) { - _ = vp.vr.updateTimeThrottled(VPlayerComponentName) + _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName) continue } @@ -628,7 +629,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF case binlogdatapb.VEventType_HEARTBEAT: if event.Throttled { - if err := vp.vr.updateTimeThrottled(VStreamerComponentName); err != nil { + if err := vp.vr.updateTimeThrottled(throttlerapp.VStreamerName); err != nil { return err } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index dc30e25419d..4df4cbb3c39 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -91,15 +92,6 @@ const ( table_name=%a and id=%a` ) -type ComponentName string - -const ( - VPlayerComponentName ComponentName = "vplayer" - VCopierComponentName ComponentName = "vcopier" - VStreamerComponentName ComponentName = "vstreamer" - RowStreamerComponentName ComponentName = "rowstreamer" -) - // vreplicator provides the core logic to start vreplication streams type vreplicator struct { vre *Engine @@ -562,17 +554,17 @@ func (vr *vreplicator) setSQLMode(ctx context.Context, dbClient *vdbClient) (fun // This is useful when we want to throttle all migrations. We throttle "online-ddl" and that applies to both vreplication // migrations as well as gh-ost migrations. func (vr *vreplicator) throttlerAppName() string { - names := []string{vr.WorkflowName, throttlerVReplicationAppName} + names := []string{vr.WorkflowName, throttlerapp.VReplicationName} if vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_OnlineDDL) { - names = append(names, throttlerOnlineDDLAppName) + names = append(names, throttlerapp.OnlineDDLName) } return strings.Join(names, ":") } -func (vr *vreplicator) updateTimeThrottled(componentThrottled ComponentName) error { +func (vr *vreplicator) updateTimeThrottled(appThrottled string) error { err := vr.throttleUpdatesRateLimiter.Do(func() error { tm := time.Now().Unix() - update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, string(componentThrottled)) + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index 85472fa5d8d..db8039d403c 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -31,7 +32,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the @@ -91,7 +92,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) { for { // VStreamer will reload the schema when it encounters a DDL. - err := blw.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { + err := blw.vs.Stream(ctx, "current", nil, filter, throttlerapp.BinlogWatcherName, func(events []*binlogdatapb.VEvent) error { return nil }) log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err) diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 8022e5aee76..829d97039a1 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -39,12 +39,12 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( // evacHours is a hard coded, reasonable time for a table to spend in EVAC state - evacHours = 72 - throttlerAppName = "tablegc" + evacHours = 72 ) var ( @@ -131,7 +131,7 @@ type Status struct { // NewTableGC creates a table collector func NewTableGC(env tabletenv.Env, ts *topo.Server, lagThrottler *throttle.Throttler) *TableGC { collector := &TableGC{ - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckPrimaryWrite), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.TableGCName, throttle.ThrottleCheckPrimaryWrite), isOpen: 0, env: env, diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index caed01e3c9b..8cff2e70b9a 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -44,7 +44,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 4f9db3e0355..5c925150322 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -684,7 +685,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var curPos string var fields []*querypb.Field - err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, false, func(events []*binlogdatapb.VEvent) error { + err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, throttlerapp.MessagerName, func(events []*binlogdatapb.VEvent) error { // We need to get the flow control lock mm.cacheManagementMu.Lock() defer mm.cacheManagementMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 51cfc881bee..c0cefc67666 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -888,7 +888,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index e0d8b61e9ed..d0341fe231b 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -35,12 +35,13 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. @@ -129,7 +130,7 @@ func (tr *Tracker) process(ctx context.Context) { var gtid string for { - err := tr.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { + err := tr.vs.Stream(ctx, "current", nil, filter, throttlerapp.SchemaTrackerName, func(events []*binlogdatapb.VEvent) error { for _, event := range events { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 50f262169a4..ed0fc917524 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -137,7 +137,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index d86b6b8ab1d..07125c7a639 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -64,6 +64,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer" "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" @@ -1144,7 +1145,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, true, send) + return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // VStreamRows streams rows from the specified starting point. @@ -1799,7 +1800,7 @@ func (tsv *TabletServer) registerThrottlerCheckHandlers() { } appName := r.URL.Query().Get("app") if appName == "" { - appName = throttle.DefaultAppName + appName = throttlerapp.DefaultName } flags := &throttle.CheckFlags{ LowPriority: (r.URL.Query().Get("p") == "low"), diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index e4d9e96204f..877e3668185 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -17,13 +17,10 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( - // DefaultAppName is the app name used by vitess when app doesn't indicate its name - DefaultAppName = "default" - vitessAppName = "vitess" - selfCheckInterval = 250 * time.Millisecond ) @@ -90,7 +87,7 @@ func (check *ThrottlerCheck) checkAppMetricResult(ctx context.Context, appName s statusCode = http.StatusTooManyRequests // 429 err = base.ErrThresholdExceeded - if !flags.LowPriority && !flags.ReadCheck && appName != vitessAppName { + if !flags.LowPriority && !flags.ReadCheck && appName != throttlerapp.VitessName { // low priority requests will henceforth be denied go check.throttler.nonLowPriorityAppRequestsThrottled.SetDefault(metricName, true) } @@ -151,7 +148,7 @@ func (check *ThrottlerCheck) localCheck(ctx context.Context, metricName string) if err != nil { return NoSuchMetricCheckResult } - checkResult = check.Check(ctx, vitessAppName, storeType, storeName, "local", StandardCheckFlags) + checkResult = check.Check(ctx, throttlerapp.VitessName, storeType, storeName, "local", StandardCheckFlags) if checkResult.StatusCode == http.StatusOK { check.throttler.markMetricHealthy(metricName) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index b10100ec01c..a1f0bd7e0b1 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -39,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -692,7 +693,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.ClusterName = clusterName mySQLThrottleMetric.Key = probe.Key - tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, vitessAppName) + tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, throttlerapp.VitessName) resp, err := throttler.httpClient.Get(tabletCheckSelfURL) if err != nil { mySQLThrottleMetric.Err = err @@ -1030,6 +1031,18 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor if !throttler.IsEnabled() { return okMetricCheckResult } + if throttlerapp.ExemptFromChecks(appName) { + // Some apps are exempt from checks. They are always responded with OK. This is because those apps are + // continuous and do not generate a substantial load. + return okMetricCheckResult + } + if !flags.SkipRequestHeartbeats && appName != throttlerapp.VitessName { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) + } checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) { @@ -1055,13 +1068,6 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { - if throttler.IsEnabled() && !flags.SkipRequestHeartbeats && appName != vitessAppName { - go throttler.heartbeatWriter.RequestHeartbeats() - // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. - // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back - // to the PRIMARY so that it knows it must renew the heartbeat lease. - atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) - } switch checkType { case ThrottleCheckSelf: return throttler.checkSelf(ctx, appName, remoteAddr, flags) diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index dcd8dacd121..bc84b1e57ed 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { startPos := mysql.EncodePosition(uvs.pos) - vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "catchup", uvs.vse) + vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "catchup", uvs.vse) uvs.setVs(vs) errch <- vs.Stream() uvs.setVs(nil) @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error { }() log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos) uvs.stopPos, _ = mysql.DecodePosition(stopPos) - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "fastforward", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "fastforward", uvs.vse) uvs.setVs(vs) return vs.Stream() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 9e8074fb4b0..870eaceebbd 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -43,16 +43,13 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -const ( - throttlerAppName = "vstreamer" -) - // Engine is the engine for handling vreplication streaming requests. type Engine struct { env tabletenv.Env @@ -115,7 +112,7 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot ts: ts, se: se, cell: cell, - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.VStreamerName, throttle.ThrottleCheckSelf), streamers: make(map[int]*uvstreamer), rowStreamers: make(map[int]*rowStreamer), @@ -226,7 +223,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err @@ -244,7 +241,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } vse.mu.Lock() defer vse.mu.Unlock() - streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, useThrottler, send) + streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, throttlerApp, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index d271164558a..e73912b6088 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -93,7 +94,7 @@ func TestUpdateVSchema(t *testing.T) { }}, } // Stream should terminate immediately due to invalid pos. - _ = engine.Stream(ctx, "invalid", nil, filter, false, func(_ []*binlogdatapb.VEvent) error { + _ = engine.Stream(ctx, "invalid", nil, filter, throttlerapp.VStreamerName, func(_ []*binlogdatapb.VEvent) error { return nil }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index f7ad6aa45cf..91f319fa2c5 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) // resultStreamer streams the results of the requested query @@ -104,7 +105,7 @@ func (rs *resultStreamer) Stream() error { } // check throttler. - if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { + if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.ResultStreamerName) { continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 15c8fa54621..b8a794e02b4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -348,7 +349,7 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V } // check throttler. - if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { + if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.RowStreamerName) { throttleResponseRateLimiter.Do(func() error { return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true}) }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index e33ca77e478..958ee4b43e8 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -58,7 +58,7 @@ type uvstreamer struct { startPos string filter *binlogdatapb.Filter inTablePKs []*binlogdatapb.TableLastPK - useThrottler bool + throttlerApp string vschema *localVSchema @@ -91,7 +91,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp string, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -117,7 +117,7 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se vschema: vschema, config: config, inTablePKs: tablePKs, - useThrottler: useThrottler, + throttlerApp: throttlerApp, } return uvs @@ -420,7 +420,7 @@ func (uvs *uvstreamer) Stream() error { } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), - uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send, "replicate", uvs.vse) + uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index ec2a8d2334d..1d23cf5a98b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -52,6 +52,7 @@ import ( "time" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/proto/query" @@ -440,7 +441,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() { func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { - err := engine.Stream(ctx, pos, tablePKs, filter, true, func(evs []*binlogdatapb.VEvent) error { + err := engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { //t.Logf("Received events: %v", evs) muAllEvents.Lock() defer muAllEvents.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index cf5c5fb350c..80ba82df2f4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -66,7 +66,7 @@ type vstreamer struct { startPos string filter *binlogdatapb.Filter send func([]*binlogdatapb.VEvent) error - useThrottler bool + throttlerApp string vevents chan *localVSchema vschema *localVSchema @@ -113,7 +113,7 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp string, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ ctx: ctx, @@ -122,7 +122,7 @@ func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine se: se, startPos: startPos, stopPos: stopPos, - useThrottler: useThrottler, + throttlerApp: throttlerApp, filter: filter, send: send, vevents: make(chan *localVSchema, 1), @@ -303,22 +303,20 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) defer throttledHeartbeatsRateLimiter.Stop() for { - if vs.useThrottler { - // check throttler. - if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { - // make sure to leave if context is cancelled - select { - case <-ctx.Done(): - return - default: - // do nothing special - } - throttledHeartbeatsRateLimiter.Do(func() error { - return injectHeartbeat(true) - }) - // we won't process events, until we're no longer throttling - continue + // check throttler. + if !vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp) { + // make sure to leave if context is cancelled + select { + case <-ctx.Done(): + return + default: + // do nothing special } + throttledHeartbeatsRateLimiter.Do(func() error { + return injectHeartbeat(true) + }) + // we won't process events, until we're no longer throttling + continue } select { case ev, ok := <-events: diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 1a99e7e9169..1a28722680a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" "google.golang.org/protobuf/proto" @@ -607,7 +608,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var errGoroutine error go func() { defer wg.Done() - engine.Stream(ctx2, "", nil, filter, true, func(evs []*binlogdatapb.VEvent) error { + engine.Stream(ctx2, "", nil, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { for _, ev := range evs { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue @@ -1925,7 +1926,7 @@ func TestMinimalMode(t *testing.T) { newEngine(t, "minimal") defer newEngine(t, "full") - err := engine.Stream(context.Background(), "current", nil, nil, false, func(evs []*binlogdatapb.VEvent) error { return nil }) + err := engine.Stream(context.Background(), "current", nil, nil, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { return nil }) require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } @@ -2309,7 +2310,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda }}, } } - return engine.Stream(ctx, pos, tablePKs, filter, false, func(evs []*binlogdatapb.VEvent) error { + return engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { timer := time.NewTimer(2 * time.Second) defer timer.Stop() From f978086b66ddc727fca5c71483b89330cfd7411e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 30 May 2023 10:00:18 +0300 Subject: [PATCH 6/8] new formal throttler app names placeholder Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/throttlerapp/app.go | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go new file mode 100644 index 00000000000..369c8515c4c --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttlerapp + +const ( + // DefaultName is the app name used by vitess when app doesn't indicate its name + DefaultName = "default" + VitessName = "vitess" + + TableGCName = "tablegc" + OnlineDDLName = "online-ddl" + GhostName = "gh-ost" + PTOSCName = "pt-osc" + + VStreamerName = "vstreamer" + VReplicationName = "vreplication" + VPlayerName = "vplayer" + VCopierName = "vcopier" + ResultStreamerName = "resultstreamer" + RowStreamerName = "rowstreamer" + ExternalConnectorName = "external-connector" + ReplicaConnectorName = "replica-connector" + + BinlogWatcherName = "binlog-watcher" + MessagerName = "messager" + SchemaTrackerName = "schema-tracker" +) + +var ( + exemptFromChecks = map[string]bool{ + BinlogWatcherName: true, + MessagerName: true, + SchemaTrackerName: true, + } +) + +// ExemptFromChecks returns 'true' for apps that should skip the throttler checks. The throttler should +// always repsond with automated "OK" to those apps, without delay. These apps also do not cause a heartbeat renewal. +func ExemptFromChecks(appName string) bool { + return exemptFromChecks[appName] +} From 37a7c2fd92b1b8e18bdf34eab9dfd60caaf0faa8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 31 May 2023 08:01:44 +0300 Subject: [PATCH 7/8] formalize throttlerapp.Name Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 12 ++-- .../tabletmanager/vreplication/vcopier.go | 2 +- .../tabletmanager/vreplication/vplayer.go | 2 +- .../tabletmanager/vreplication/vreplicator.go | 8 +-- go/vt/vttablet/tabletserver/binlog_watcher.go | 2 +- .../vttablet/tabletserver/messager/engine.go | 3 +- .../messager/message_manager_test.go | 3 +- go/vt/vttablet/tabletserver/schema/tracker.go | 2 +- .../tabletserver/schema/tracker_test.go | 3 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- go/vt/vttablet/tabletserver/throttle/check.go | 4 +- .../vttablet/tabletserver/throttle/client.go | 14 +++-- .../tabletserver/throttle/throttler.go | 2 +- .../tabletserver/throttle/throttlerapp/app.go | 56 +++++++++++-------- .../vttablet/tabletserver/vstreamer/engine.go | 2 +- .../tabletserver/vstreamer/uvstreamer.go | 5 +- .../tabletserver/vstreamer/vstreamer.go | 5 +- 17 files changed, 72 insertions(+), 55 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 55068d26e65..21551a7330c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -1782,8 +1782,8 @@ export MYSQL_PWD ` pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port())) pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName.String()) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName.String()) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning)) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete)) @@ -2142,7 +2142,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin if err := e.lagThrottler.CheckIsReady(); err != nil { return nil, err } - _ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName, time.Now().Add(duration), ratio) + _ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio) return emptyResult, nil } @@ -2162,7 +2162,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype return nil, err } defer e.triggerNextCheckInterval() - _ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName) + _ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName.String()) return emptyResult, nil } @@ -3470,7 +3470,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err := e.lagThrottler.CheckIsReady(); err == nil { // No point in reviewing throttler info if it's not enabled&open for _, app := range e.lagThrottler.ThrottledApps() { - if app.AppName == throttlerapp.OnlineDDLName { + if throttlerapp.OnlineDDLName.Equals(app.AppName) { currentUserThrottleRatio = app.Ratio break } @@ -3604,7 +3604,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // - it's a deadlock. // And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick // on-demand heartbeats, unlocking the deadlock. - e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) + e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) }) } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 1f94f0fe578..0f2bf9c109a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -466,7 +466,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma return nil } // verify throttler is happy, otherwise keep looping - if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vc.throttlerAppName) { + if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vc.throttlerAppName)) { break // out of 'for' loop } else { // we're throttled _ = vc.vr.updateTimeThrottled(throttlerapp.VCopierName) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index aacb5bb9376..084079506d9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -336,7 +336,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { return ctx.Err() } // check throttler. - if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vp.throttlerAppName) { + if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) { _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName) continue } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 4df4cbb3c39..fd8117a6b5f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -554,17 +554,17 @@ func (vr *vreplicator) setSQLMode(ctx context.Context, dbClient *vdbClient) (fun // This is useful when we want to throttle all migrations. We throttle "online-ddl" and that applies to both vreplication // migrations as well as gh-ost migrations. func (vr *vreplicator) throttlerAppName() string { - names := []string{vr.WorkflowName, throttlerapp.VReplicationName} + names := []string{vr.WorkflowName, throttlerapp.VReplicationName.String()} if vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_OnlineDDL) { - names = append(names, throttlerapp.OnlineDDLName) + names = append(names, throttlerapp.OnlineDDLName.String()) } return strings.Join(names, ":") } -func (vr *vreplicator) updateTimeThrottled(appThrottled string) error { +func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error { err := vr.throttleUpdatesRateLimiter.Do(func() error { tm := time.Now().Unix() - update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled) + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String()) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index db8039d403c..aaa3f2b1725 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -32,7 +32,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 8cff2e70b9a..e0262b93492 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -44,7 +45,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index c0cefc67666..59286403885 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -888,7 +889,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index d0341fe231b..ec9050b5c7e 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -41,7 +41,7 @@ import ( // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index ed0fc917524..2029235b2e3 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) func TestTracker(t *testing.T) { @@ -137,7 +138,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 07125c7a639..f9ee238e7d6 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1800,7 +1800,7 @@ func (tsv *TabletServer) registerThrottlerCheckHandlers() { } appName := r.URL.Query().Get("app") if appName == "" { - appName = throttlerapp.DefaultName + appName = throttlerapp.DefaultName.String() } flags := &throttle.CheckFlags{ LowPriority: (r.URL.Query().Get("p") == "low"), diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index 877e3668185..dd209a0c423 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -87,7 +87,7 @@ func (check *ThrottlerCheck) checkAppMetricResult(ctx context.Context, appName s statusCode = http.StatusTooManyRequests // 429 err = base.ErrThresholdExceeded - if !flags.LowPriority && !flags.ReadCheck && appName != throttlerapp.VitessName { + if !flags.LowPriority && !flags.ReadCheck && throttlerapp.VitessName.Equals(appName) { // low priority requests will henceforth be denied go check.throttler.nonLowPriorityAppRequestsThrottled.SetDefault(metricName, true) } @@ -148,7 +148,7 @@ func (check *ThrottlerCheck) localCheck(ctx context.Context, metricName string) if err != nil { return NoSuchMetricCheckResult } - checkResult = check.Check(ctx, throttlerapp.VitessName, storeType, storeName, "local", StandardCheckFlags) + checkResult = check.Check(ctx, throttlerapp.VitessName.String(), storeType, storeName, "local", StandardCheckFlags) if checkResult.StatusCode == http.StatusOK { check.throttler.markMetricHealthy(metricName) diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 9a3f699b2da..30d6c79afdf 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -22,6 +22,8 @@ import ( "sync" "sync/atomic" "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -46,7 +48,7 @@ func initThrottleTicker() { // Client construct is used by apps who wish to consult with a throttler. It encapsulates the check/throttling/backoff logic type Client struct { throttler *Throttler - appName string + appName throttlerapp.Name checkType ThrottleCheckType flags CheckFlags @@ -54,7 +56,7 @@ type Client struct { } // NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority. -func NewProductionClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { +func NewProductionClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client { initThrottleTicker() return &Client{ throttler: throttler, @@ -68,7 +70,7 @@ func NewProductionClient(throttler *Throttler, appName string, checkType Throttl // NewBackgroundClient creates a client suitable for background jobs, which have low priority over productio ntraffic, // e.g. migration, table pruning, vreplication -func NewBackgroundClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { +func NewBackgroundClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client { initThrottleTicker() return &Client{ throttler: throttler, @@ -85,7 +87,7 @@ func NewBackgroundClient(throttler *Throttler, appName string, checkType Throttl // The function caches results for a brief amount of time, hence it's safe and efficient to // be called very frequenty. // The function is not thread safe. -func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (throttleCheckOK bool) { +func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlerapp.Name) (throttleCheckOK bool) { if c == nil { // no client return true @@ -103,7 +105,7 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t if overrideAppName != "" { checkApp = overrideAppName } - checkResult := c.throttler.CheckByType(ctx, checkApp, "", &c.flags, c.checkType) + checkResult := c.throttler.CheckByType(ctx, checkApp.String(), "", &c.flags, c.checkType) if checkResult.StatusCode != http.StatusOK { return false } @@ -116,7 +118,7 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t // otherwise it briefly sleeps and returns 'false'. // Non-empty appName overrides the default appName. // The function is not thread safe. -func (c *Client) ThrottleCheckOKOrWaitAppName(ctx context.Context, appName string) bool { +func (c *Client) ThrottleCheckOKOrWaitAppName(ctx context.Context, appName throttlerapp.Name) bool { ok := c.ThrottleCheckOK(ctx, appName) if !ok { time.Sleep(throttleCheckDuration) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index a1f0bd7e0b1..83cb28ae849 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -1036,7 +1036,7 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor // continuous and do not generate a substantial load. return okMetricCheckResult } - if !flags.SkipRequestHeartbeats && appName != throttlerapp.VitessName { + if !flags.SkipRequestHeartbeats && !throttlerapp.VitessName.Equals(appName) { go throttler.heartbeatWriter.RequestHeartbeats() // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 369c8515c4c..41ac3d5a671 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -16,35 +16,45 @@ limitations under the License. package throttlerapp +type Name string + +func (n Name) String() string { + return string(n) +} + +func (n Name) Equals(s string) bool { + return string(n) == s +} + const ( // DefaultName is the app name used by vitess when app doesn't indicate its name - DefaultName = "default" - VitessName = "vitess" - - TableGCName = "tablegc" - OnlineDDLName = "online-ddl" - GhostName = "gh-ost" - PTOSCName = "pt-osc" - - VStreamerName = "vstreamer" - VReplicationName = "vreplication" - VPlayerName = "vplayer" - VCopierName = "vcopier" - ResultStreamerName = "resultstreamer" - RowStreamerName = "rowstreamer" - ExternalConnectorName = "external-connector" - ReplicaConnectorName = "replica-connector" - - BinlogWatcherName = "binlog-watcher" - MessagerName = "messager" - SchemaTrackerName = "schema-tracker" + DefaultName Name = "default" + VitessName Name = "vitess" + + TableGCName Name = "tablegc" + OnlineDDLName Name = "online-ddl" + GhostName Name = "gh-ost" + PTOSCName Name = "pt-osc" + + VStreamerName Name = "vstreamer" + VReplicationName Name = "vreplication" + VPlayerName Name = "vplayer" + VCopierName Name = "vcopier" + ResultStreamerName Name = "resultstreamer" + RowStreamerName Name = "rowstreamer" + ExternalConnectorName Name = "external-connector" + ReplicaConnectorName Name = "replica-connector" + + BinlogWatcherName Name = "binlog-watcher" + MessagerName Name = "messager" + SchemaTrackerName Name = "schema-tracker" ) var ( exemptFromChecks = map[string]bool{ - BinlogWatcherName: true, - MessagerName: true, - SchemaTrackerName: true, + BinlogWatcherName.String(): true, + MessagerName.String(): true, + SchemaTrackerName.String(): true, } ) diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 870eaceebbd..c55c312f442 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -223,7 +223,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp string, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 958ee4b43e8..0eec8d93db3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var uvstreamerTestMode = false // Only used for testing @@ -58,7 +59,7 @@ type uvstreamer struct { startPos string filter *binlogdatapb.Filter inTablePKs []*binlogdatapb.TableLastPK - throttlerApp string + throttlerApp throttlerapp.Name vschema *localVSchema @@ -91,7 +92,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp string, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 80ba82df2f4..aefa239d3e8 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -43,6 +43,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -66,7 +67,7 @@ type vstreamer struct { startPos string filter *binlogdatapb.Filter send func([]*binlogdatapb.VEvent) error - throttlerApp string + throttlerApp throttlerapp.Name vevents chan *localVSchema vschema *localVSchema @@ -113,7 +114,7 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp string, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ ctx: ctx, From 3aab1517743800398004a9b96e8502256df62a56 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 31 May 2023 09:29:34 +0300 Subject: [PATCH 8/8] Refactor names in endtoend tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go | 12 ++++++------ go/test/endtoend/onlineddl/vtgate_util.go | 7 ++++--- go/test/endtoend/vreplication/helper_test.go | 7 ++++--- go/test/endtoend/vreplication/vreplication_test.go | 12 ++++++------ 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index d1f38059e79..be88fc4618b 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -232,13 +232,13 @@ func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, e } // direct per-tablet throttler API instruction -func throttleApp(tablet *cluster.Vttablet, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) +func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp)) } // direct per-tablet throttler API instruction -func unthrottleApp(tablet *cluster.Vttablet, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) +func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp)) } func TestSchemaChange(t *testing.T) { @@ -398,7 +398,7 @@ func TestSchemaChange(t *testing.T) { // to be strictly higher than started_timestamp assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{throttlerapp.VCopierName, throttlerapp.VPlayerName}, component) + assert.Contains(t, []string{throttlerapp.VCopierName.String(), throttlerapp.VPlayerName.String()}, component) // unthrottle onlineddl.UnthrottleAllMigrations(t, &vtParams) @@ -447,7 +447,7 @@ func TestSchemaChange(t *testing.T) { // clock irregularities assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{throttlerapp.VStreamerName, throttlerapp.RowStreamerName}, component) + assert.Contains(t, []string{throttlerapp.VStreamerName.String(), throttlerapp.RowStreamerName.String()}, component) }() // now unthrottled status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 75c35061da9..e59e5759a75 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/test/endtoend/cluster" @@ -311,17 +312,17 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) { } // CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list -func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string, expectFind bool) { +func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) { query := "show vitess_throttled_apps" r := VtgateExecQuery(t, vtParams, query, "") found := false for _, row := range r.Named().Rows { - if row.AsString("app", "") == appName { + if throttlerApp.Equals(row.AsString("app", "")) { found = true } } - assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found) + assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found) } // WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 82a1281c6b5..49f35d49b06 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -45,6 +45,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -125,12 +126,12 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s // waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for // the provided app name in its self check. -func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, appName string, wantCode int64) { +func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) { var gotCode int64 timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - output, err := throttlerCheckSelf(tablet, appName) + output, err := throttlerCheckSelf(tablet, throttlerApp) require.NoError(t, err) gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode") @@ -144,7 +145,7 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess select { case <-timer.C: require.FailNow(t, fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d", - tablet.Name, wantCode, appName, defaultTimeout, gotCode)) + tablet.Name, wantCode, throttlerApp, defaultTimeout, gotCode)) default: time.Sleep(defaultTick) } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 69c364197c5..38f8e250d3d 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -95,16 +95,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody st return respBody, err } -func throttleApp(tablet *cluster.VttabletProcess, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) +func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String())) } -func unthrottleApp(tablet *cluster.VttabletProcess, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) +func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String())) } -func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (respBody string, err error) { - apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app) +func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) { + apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String()) resp, err := httpClient.Get(apiURL) if err != nil { return "", err