diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 0303e4fcec9..be88fc4618b 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -33,20 +33,18 @@ import ( "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/schema" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var ( - clusterInstance *cluster.LocalProcessCluster - shards []cluster.Shard - vtParams mysql.ConnParams - httpClient = throttlebase.SetupHTTPClient(time.Second) - onlineDDLThrottlerAppName = "online-ddl" - vstreamerThrottlerAppName = "vstreamer" + clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard + vtParams mysql.ConnParams + httpClient = throttlebase.SetupHTTPClient(time.Second) normalMigrationWait = 45 * time.Second extendedMigrationWait = 60 * time.Second @@ -234,13 +232,13 @@ func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, e } // direct per-tablet throttler API instruction -func throttleApp(tablet *cluster.Vttablet, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) +func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp)) } // direct per-tablet throttler API instruction -func unthrottleApp(tablet *cluster.Vttablet, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) +func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp)) } func TestSchemaChange(t *testing.T) { @@ -385,7 +383,7 @@ func TestSchemaChange(t *testing.T) { // begin throttling: onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning) @@ -400,11 +398,11 @@ func TestSchemaChange(t *testing.T) { // to be strictly higher than started_timestamp assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component) + assert.Contains(t, []string{throttlerapp.VCopierName.String(), throttlerapp.VPlayerName.String()}, component) // unthrottle onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false) status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) @@ -421,11 +419,11 @@ func TestSchemaChange(t *testing.T) { // vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't // change the essence of this test. for _, tablet := range shard.Vttablets { - body, err := throttleApp(tablet, vstreamerThrottlerAppName) - defer unthrottleApp(tablet, vstreamerThrottlerAppName) + body, err := throttleApp(tablet, throttlerapp.VStreamerName) + defer unthrottleApp(tablet, throttlerapp.VStreamerName) assert.NoError(t, err) - assert.Contains(t, body, vstreamerThrottlerAppName) + assert.Contains(t, body, throttlerapp.VStreamerName) } } @@ -449,7 +447,7 @@ func TestSchemaChange(t *testing.T) { // clock irregularities assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime) component := row.AsString("component_throttled", "") - assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component) + assert.Contains(t, []string{throttlerapp.VStreamerName.String(), throttlerapp.RowStreamerName.String()}, component) }() // now unthrottled status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) @@ -478,7 +476,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -497,7 +495,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -531,16 +529,16 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) - defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) + body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) + defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) case 1: // no PRS on this shard // Use per-tablet throttling API - body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) - defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) + body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) + defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) } assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) } uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) @@ -596,14 +594,14 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName) + body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName) case 1: // no PRS on this shard // Use per-tablet throttling API - body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) + body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName) } assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) } }) t.Run("expect completion", func(t *testing.T) { @@ -823,11 +821,11 @@ func TestSchemaChange(t *testing.T) { // - tablet throttling t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) { // shard 0 will run normally, shard 1 will be throttled - defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) t.Run("throttle shard 1", func(t *testing.T) { - body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) }) var uuid string @@ -849,9 +847,9 @@ func TestSchemaChange(t *testing.T) { onlineddl.CheckCancelAllMigrations(t, &vtParams, 1) }) t.Run("unthrottle shard 1", func(t *testing.T) { - body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) + body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName) assert.NoError(t, err) - assert.Contains(t, body, onlineDDLThrottlerAppName) + assert.Contains(t, body, throttlerapp.OnlineDDLName) }) var revertUUID string t.Run("issue revert migration", func(t *testing.T) { diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 75c35061da9..e59e5759a75 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/test/endtoend/cluster" @@ -311,17 +312,17 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) { } // CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list -func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string, expectFind bool) { +func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) { query := "show vitess_throttled_apps" r := VtgateExecQuery(t, vtParams, query, "") found := false for _, row := range r.Named().Rows { - if row.AsString("app", "") == appName { + if throttlerApp.Equals(row.AsString("app", "")) { found = true } } - assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found) + assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found) } // WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index fe87262a21f..a3204e0be2f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri } func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) { - resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) return resp, err } func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { - return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) + return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath)) } func warmUpHeartbeat(t *testing.T) (respStatus int) { diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 82a1281c6b5..49f35d49b06 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -45,6 +45,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -125,12 +126,12 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s // waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for // the provided app name in its self check. -func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, appName string, wantCode int64) { +func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) { var gotCode int64 timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - output, err := throttlerCheckSelf(tablet, appName) + output, err := throttlerCheckSelf(tablet, throttlerApp) require.NoError(t, err) gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode") @@ -144,7 +145,7 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess select { case <-timer.C: require.FailNow(t, fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d", - tablet.Name, wantCode, appName, defaultTimeout, gotCode)) + tablet.Name, wantCode, throttlerApp, defaultTimeout, gotCode)) default: time.Sleep(defaultTick) } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index ca9e316c97b..38f8e250d3d 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -44,6 +44,7 @@ import ( "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) @@ -58,8 +59,8 @@ var ( sourceKsOpts = make(map[string]string) targetKsOpts = make(map[string]string) httpClient = throttlebase.SetupHTTPClient(time.Second) - sourceThrottlerAppName = "vstreamer" - targetThrottlerAppName = "vreplication" + sourceThrottlerAppName = throttlerapp.VStreamerName + targetThrottlerAppName = throttlerapp.VReplicationName ) const ( @@ -94,16 +95,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody st return respBody, err } -func throttleApp(tablet *cluster.VttabletProcess, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) +func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String())) } -func unthrottleApp(tablet *cluster.VttabletProcess, app string) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) +func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String())) } -func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (respBody string, err error) { - apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app) +func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) { + apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String()) resp, err := httpClient.Get(apiURL) if err != nil { return "", err diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index f53b0ef44e3..21551a7330c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -63,6 +63,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" ) @@ -124,7 +125,6 @@ var ( migrationFailureFileName = "migration-failure.log" onlineDDLUser = "vt-online-ddl-internal" onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%") - throttlerOnlineDDLApp = "online-ddl" throttleCheckFlags = &throttle.CheckFlags{} ) @@ -1621,7 +1621,7 @@ exit $exit_code fmt.Sprintf("--serve-socket-file=%s", serveSocketFile), fmt.Sprintf("--hooks-path=%s", tempDir), fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID), - fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:gh-ost:%s&p=low`, servenv.Port(), throttlerOnlineDDLApp, onlineDDL.UUID), + fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID), fmt.Sprintf(`--database=%s`, e.dbName), fmt.Sprintf(`--table=%s`, onlineDDL.Table), fmt.Sprintf(`--alter=%s`, alterOptions), @@ -1768,7 +1768,7 @@ export MYSQL_PWD my ($self, %args) = @_; return sub { - if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:pt-osc:{{MIGRATION_UUID}}&p=low")) { + if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) { # Got HTTP 200 OK, means throttler is happy return 0; } else { @@ -1782,7 +1782,8 @@ export MYSQL_PWD ` pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port())) pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerOnlineDDLApp) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName.String()) + pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName.String()) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning)) pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete)) @@ -2141,7 +2142,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin if err := e.lagThrottler.CheckIsReady(); err != nil { return nil, err } - _ = e.lagThrottler.ThrottleApp(throttlerOnlineDDLApp, time.Now().Add(duration), ratio) + _ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio) return emptyResult, nil } @@ -2161,7 +2162,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype return nil, err } defer e.triggerNextCheckInterval() - _ = e.lagThrottler.UnthrottleApp(throttlerOnlineDDLApp) + _ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName.String()) return emptyResult, nil } @@ -3469,7 +3470,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err := e.lagThrottler.CheckIsReady(); err == nil { // No point in reviewing throttler info if it's not enabled&open for _, app := range e.lagThrottler.ThrottledApps() { - if app.AppName == throttlerOnlineDDLApp { + if throttlerapp.OnlineDDLName.Equals(app.AppName) { currentUserThrottleRatio = app.Ratio break } @@ -3603,7 +3604,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // - it's a deadlock. // And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick // on-demand heartbeats, unlocking the deadlock. - e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) + e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) }) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index bb173c41abc..bbda63340de 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconntest" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -235,7 +236,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 00ac2824d3a..52f9c072d49 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -43,6 +43,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -51,9 +52,7 @@ const ( copyStateTableName = "copy_state" postCopyActionTableName = "post_copy_action" - maxRows = 10000 - throttlerVReplicationAppName = "vreplication" - throttlerOnlineDDLAppName = "online-ddl" + maxRows = 10000 ) const ( @@ -136,7 +135,7 @@ func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mys mysqld: mysqld, journaler: make(map[string]*journalEvent), ec: newExternalConnector(config.ExternalConnections), - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerVReplicationAppName, throttle.ThrottleCheckPrimaryWrite), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.VReplicationName, throttle.ThrottleCheckPrimaryWrite), } return vre diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index ac411de7ce6..5bec2b4f78a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) @@ -126,7 +127,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { } func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) + return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, throttlerapp.ExternalConnectorName, send) } func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 5ed944125af..8260d95b462 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -51,6 +51,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/vttablet/tabletconntest" qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" @@ -307,7 +308,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 3954f4d0546..9c6f427b418 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -20,6 +20,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "context" @@ -84,7 +85,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error { } func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, nil, filter, send) + return c.vstreamer.Stream(ctx, startPos, nil, filter, throttlerapp.ReplicaConnectorName, send) } // VStreamRows streams rows from query result diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 2bff8db85ef..0f2bf9c109a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -39,6 +39,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) type vcopier struct { @@ -457,7 +458,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma default: } if rows.Throttled { - _ = vc.vr.updateTimeThrottled(RowStreamerComponentName) + _ = vc.vr.updateTimeThrottled(throttlerapp.RowStreamerName) return nil } if rows.Heartbeat { @@ -465,10 +466,10 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma return nil } // verify throttler is happy, otherwise keep looping - if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vc.throttlerAppName) { + if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vc.throttlerAppName)) { break // out of 'for' loop } else { // we're throttled - _ = vc.vr.updateTimeThrottled(VCopierComponentName) + _ = vc.vr.updateTimeThrottled(throttlerapp.VCopierName) } } if !copyWorkQueue.isOpen { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 0e33eed5f6a..084079506d9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -31,6 +31,7 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -335,8 +336,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { return ctx.Err() } // check throttler. - if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vp.throttlerAppName) { - _ = vp.vr.updateTimeThrottled(VPlayerComponentName) + if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)) { + _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName) continue } @@ -628,7 +629,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF case binlogdatapb.VEventType_HEARTBEAT: if event.Throttled { - if err := vp.vr.updateTimeThrottled(VStreamerComponentName); err != nil { + if err := vp.vr.updateTimeThrottled(throttlerapp.VStreamerName); err != nil { return err } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index dc30e25419d..fd8117a6b5f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -91,15 +92,6 @@ const ( table_name=%a and id=%a` ) -type ComponentName string - -const ( - VPlayerComponentName ComponentName = "vplayer" - VCopierComponentName ComponentName = "vcopier" - VStreamerComponentName ComponentName = "vstreamer" - RowStreamerComponentName ComponentName = "rowstreamer" -) - // vreplicator provides the core logic to start vreplication streams type vreplicator struct { vre *Engine @@ -562,17 +554,17 @@ func (vr *vreplicator) setSQLMode(ctx context.Context, dbClient *vdbClient) (fun // This is useful when we want to throttle all migrations. We throttle "online-ddl" and that applies to both vreplication // migrations as well as gh-ost migrations. func (vr *vreplicator) throttlerAppName() string { - names := []string{vr.WorkflowName, throttlerVReplicationAppName} + names := []string{vr.WorkflowName, throttlerapp.VReplicationName.String()} if vr.WorkflowType == int32(binlogdatapb.VReplicationWorkflowType_OnlineDDL) { - names = append(names, throttlerOnlineDDLAppName) + names = append(names, throttlerapp.OnlineDDLName.String()) } return strings.Join(names, ":") } -func (vr *vreplicator) updateTimeThrottled(componentThrottled ComponentName) error { +func (vr *vreplicator) updateTimeThrottled(appThrottled throttlerapp.Name) error { err := vr.throttleUpdatesRateLimiter.Do(func() error { tm := time.Now().Unix() - update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, string(componentThrottled)) + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, appThrottled.String()) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index 6c713791e6f..aaa3f2b1725 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -31,7 +32,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the @@ -91,7 +92,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) { for { // VStreamer will reload the schema when it encounters a DDL. - err := blw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := blw.vs.Stream(ctx, "current", nil, filter, throttlerapp.BinlogWatcherName, func(events []*binlogdatapb.VEvent) error { return nil }) log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err) diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 8022e5aee76..829d97039a1 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -39,12 +39,12 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( // evacHours is a hard coded, reasonable time for a table to spend in EVAC state - evacHours = 72 - throttlerAppName = "tablegc" + evacHours = 72 ) var ( @@ -131,7 +131,7 @@ type Status struct { // NewTableGC creates a table collector func NewTableGC(env tabletenv.Env, ts *topo.Server, lagThrottler *throttle.Throttler) *TableGC { collector := &TableGC{ - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckPrimaryWrite), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.TableGCName, throttle.ThrottleCheckPrimaryWrite), isOpen: 0, env: env, diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 2d7fdf2bb82..e0262b93492 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -44,7 +45,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 39598169baa..5c925150322 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -684,7 +685,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var curPos string var fields []*querypb.Field - err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { + err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, throttlerapp.MessagerName, func(events []*binlogdatapb.VEvent) error { // We need to get the flow control lock mm.cacheManagementMu.Lock() defer mm.cacheManagementMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index d6da59db065..59286403885 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -888,7 +889,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 490ad59eadf..ec9050b5c7e 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -35,12 +35,13 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. @@ -129,7 +130,7 @@ func (tr *Tracker) process(ctx context.Context) { var gtid string for { - err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := tr.vs.Stream(ctx, "current", nil, filter, throttlerapp.SchemaTrackerName, func(events []*binlogdatapb.VEvent) error { for _, event := range events { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 9822b6bfe5a..2029235b2e3 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) func TestTracker(t *testing.T) { @@ -137,7 +138,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ff641fe7198..f9ee238e7d6 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -64,6 +64,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer" "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" @@ -1144,7 +1145,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send) } // VStreamRows streams rows from the specified starting point. @@ -1799,7 +1800,7 @@ func (tsv *TabletServer) registerThrottlerCheckHandlers() { } appName := r.URL.Query().Get("app") if appName == "" { - appName = throttle.DefaultAppName + appName = throttlerapp.DefaultName.String() } flags := &throttle.CheckFlags{ LowPriority: (r.URL.Query().Get("p") == "low"), diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index e4d9e96204f..dd209a0c423 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -17,13 +17,10 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( - // DefaultAppName is the app name used by vitess when app doesn't indicate its name - DefaultAppName = "default" - vitessAppName = "vitess" - selfCheckInterval = 250 * time.Millisecond ) @@ -90,7 +87,7 @@ func (check *ThrottlerCheck) checkAppMetricResult(ctx context.Context, appName s statusCode = http.StatusTooManyRequests // 429 err = base.ErrThresholdExceeded - if !flags.LowPriority && !flags.ReadCheck && appName != vitessAppName { + if !flags.LowPriority && !flags.ReadCheck && throttlerapp.VitessName.Equals(appName) { // low priority requests will henceforth be denied go check.throttler.nonLowPriorityAppRequestsThrottled.SetDefault(metricName, true) } @@ -151,7 +148,7 @@ func (check *ThrottlerCheck) localCheck(ctx context.Context, metricName string) if err != nil { return NoSuchMetricCheckResult } - checkResult = check.Check(ctx, vitessAppName, storeType, storeName, "local", StandardCheckFlags) + checkResult = check.Check(ctx, throttlerapp.VitessName.String(), storeType, storeName, "local", StandardCheckFlags) if checkResult.StatusCode == http.StatusOK { check.throttler.markMetricHealthy(metricName) diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go index 52d52b78468..3bc162b623a 100644 --- a/go/vt/vttablet/tabletserver/throttle/check_result.go +++ b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -14,11 +14,12 @@ import ( // CheckResult is the result for an app inquiring on a metric. It also exports as JSON via the API type CheckResult struct { - StatusCode int `json:"StatusCode"` - Value float64 `json:"Value"` - Threshold float64 `json:"Threshold"` - Error error `json:"-"` - Message string `json:"Message"` + StatusCode int `json:"StatusCode"` + Value float64 `json:"Value"` + Threshold float64 `json:"Threshold"` + Error error `json:"-"` + Message string `json:"Message"` + RecentlyChecked bool `json:"RecentlyChecked"` } // NewCheckResult returns a CheckResult diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 10fe910d264..30d6c79afdf 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -22,6 +22,8 @@ import ( "sync" "sync/atomic" "time" + + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -46,7 +48,7 @@ func initThrottleTicker() { // Client construct is used by apps who wish to consult with a throttler. It encapsulates the check/throttling/backoff logic type Client struct { throttler *Throttler - appName string + appName throttlerapp.Name checkType ThrottleCheckType flags CheckFlags @@ -54,7 +56,7 @@ type Client struct { } // NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority. -func NewProductionClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { +func NewProductionClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client { initThrottleTicker() return &Client{ throttler: throttler, @@ -68,7 +70,7 @@ func NewProductionClient(throttler *Throttler, appName string, checkType Throttl // NewBackgroundClient creates a client suitable for background jobs, which have low priority over productio ntraffic, // e.g. migration, table pruning, vreplication -func NewBackgroundClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { +func NewBackgroundClient(throttler *Throttler, appName throttlerapp.Name, checkType ThrottleCheckType) *Client { initThrottleTicker() return &Client{ throttler: throttler, @@ -85,7 +87,7 @@ func NewBackgroundClient(throttler *Throttler, appName string, checkType Throttl // The function caches results for a brief amount of time, hence it's safe and efficient to // be called very frequenty. // The function is not thread safe. -func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (throttleCheckOK bool) { +func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName throttlerapp.Name) (throttleCheckOK bool) { if c == nil { // no client return true @@ -103,7 +105,7 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t if overrideAppName != "" { checkApp = overrideAppName } - checkResult := c.throttler.CheckByType(ctx, checkApp, "", &c.flags, c.checkType) + checkResult := c.throttler.CheckByType(ctx, checkApp.String(), "", &c.flags, c.checkType) if checkResult.StatusCode != http.StatusOK { return false } @@ -116,7 +118,7 @@ func (c *Client) ThrottleCheckOK(ctx context.Context, overrideAppName string) (t // otherwise it briefly sleeps and returns 'false'. // Non-empty appName overrides the default appName. // The function is not thread safe. -func (c *Client) ThrottleCheckOKOrWaitAppName(ctx context.Context, appName string) bool { +func (c *Client) ThrottleCheckOKOrWaitAppName(ctx context.Context, appName throttlerapp.Name) bool { ok := c.ThrottleCheckOK(ctx, appName) if !ok { time.Sleep(throttleCheckDuration) @@ -140,7 +142,7 @@ func (c *Client) Throttle(ctx context.Context) { return } if c.ThrottleCheckOKOrWait(ctx) { - break + return } } } diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 3c2fe005828..8c8a5cc4b32 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -95,7 +95,7 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) { } // ReadThrottleMetric returns a metric for the given probe. Either by explicit query -// or via SHOW SLAVE STATUS +// or via SHOW REPLICA STATUS func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) { if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { return mySQLThrottleMetric diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 56e913bb119..83cb28ae849 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -39,6 +39,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -126,6 +127,12 @@ type Throttler struct { srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter + // recentCheckTickerValue is an ever increasing number, incrementing once per second. + recentCheckTickerValue int64 + // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). + // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. + recentCheckValue int64 + throttleTabletTypesMap map[topodatapb.TabletType]bool mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric @@ -576,6 +583,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { mysqlRefreshTicker := addTicker(mysqlRefreshInterval) mysqlAggregateTicker := addTicker(mysqlAggregateInterval) throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) + recentCheckTicker := addTicker(time.Second) go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") @@ -670,6 +678,9 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) + case <-recentCheckTicker.C: + // Increment recentCheckTickerValue by one. + atomic.AddInt64(&throttler.recentCheckTickerValue, 1) } } }() @@ -682,7 +693,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.ClusterName = clusterName mySQLThrottleMetric.Key = probe.Key - tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort) + tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, throttlerapp.VitessName) resp, err := throttler.httpClient.Get(tabletCheckSelfURL) if err != nil { mySQLThrottleMetric.Err = err @@ -704,6 +715,11 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, if checkResult.StatusCode == http.StatusInternalServerError { mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode) } + if checkResult.RecentlyChecked { + // We have just probed a tablet, and it reported back that someone just recently "check"ed it. + // We therefore renew the heartbeats lease. + go throttler.heartbeatWriter.RequestHeartbeats() + } return mySQLThrottleMetric } } @@ -1015,7 +1031,29 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor if !throttler.IsEnabled() { return okMetricCheckResult } - return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + if throttlerapp.ExemptFromChecks(appName) { + // Some apps are exempt from checks. They are always responded with OK. This is because those apps are + // continuous and do not generate a substantial load. + return okMetricCheckResult + } + if !flags.SkipRequestHeartbeats && !throttlerapp.VitessName.Equals(appName) { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) + } + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + + if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) { + // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. + // This could be online-ddl, or vreplication or whoever else. + // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, + // so that the PRIMARY knows it must renew the heartbeat lease. + checkResult.RecentlyChecked = true + } + + return checkResult } // checkShard checks the health of the shard, and runs on the primary tablet only @@ -1030,9 +1068,6 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { - if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { - go throttler.heartbeatWriter.RequestHeartbeats() - } switch checkType { case ThrottleCheckSelf: return throttler.checkSelf(ctx, appName, remoteAddr, flags) diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go new file mode 100644 index 00000000000..41ac3d5a671 --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -0,0 +1,65 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttlerapp + +type Name string + +func (n Name) String() string { + return string(n) +} + +func (n Name) Equals(s string) bool { + return string(n) == s +} + +const ( + // DefaultName is the app name used by vitess when app doesn't indicate its name + DefaultName Name = "default" + VitessName Name = "vitess" + + TableGCName Name = "tablegc" + OnlineDDLName Name = "online-ddl" + GhostName Name = "gh-ost" + PTOSCName Name = "pt-osc" + + VStreamerName Name = "vstreamer" + VReplicationName Name = "vreplication" + VPlayerName Name = "vplayer" + VCopierName Name = "vcopier" + ResultStreamerName Name = "resultstreamer" + RowStreamerName Name = "rowstreamer" + ExternalConnectorName Name = "external-connector" + ReplicaConnectorName Name = "replica-connector" + + BinlogWatcherName Name = "binlog-watcher" + MessagerName Name = "messager" + SchemaTrackerName Name = "schema-tracker" +) + +var ( + exemptFromChecks = map[string]bool{ + BinlogWatcherName.String(): true, + MessagerName.String(): true, + SchemaTrackerName.String(): true, + } +) + +// ExemptFromChecks returns 'true' for apps that should skip the throttler checks. The throttler should +// always repsond with automated "OK" to those apps, without delay. These apps also do not cause a heartbeat renewal. +func ExemptFromChecks(appName string) bool { + return exemptFromChecks[appName] +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..bc84b1e57ed 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { startPos := mysql.EncodePosition(uvs.pos) - vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2, "catchup", uvs.vse) + vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "catchup", uvs.vse) uvs.setVs(vs) errch <- vs.Stream() uvs.setVs(nil) @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error { }() log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos) uvs.stopPos, _ = mysql.DecodePosition(stopPos) - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2, "fastforward", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "fastforward", uvs.vse) uvs.setVs(vs) return vs.Stream() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 829d0b88a5f..c55c312f442 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -43,16 +43,13 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -const ( - throttlerAppName = "vstreamer" -) - // Engine is the engine for handling vreplication streaming requests. type Engine struct { env tabletenv.Env @@ -115,7 +112,7 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot ts: ts, se: se, cell: cell, - throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.VStreamerName, throttle.ThrottleCheckSelf), streamers: make(map[int]*uvstreamer), rowStreamers: make(map[int]*rowStreamer), @@ -226,7 +223,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err @@ -244,7 +241,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } vse.mu.Lock() defer vse.mu.Unlock() - streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send) + streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, throttlerApp, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index 16729e8fd24..e73912b6088 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -93,7 +94,7 @@ func TestUpdateVSchema(t *testing.T) { }}, } // Stream should terminate immediately due to invalid pos. - _ = engine.Stream(ctx, "invalid", nil, filter, func(_ []*binlogdatapb.VEvent) error { + _ = engine.Stream(ctx, "invalid", nil, filter, throttlerapp.VStreamerName, func(_ []*binlogdatapb.VEvent) error { return nil }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index f7ad6aa45cf..91f319fa2c5 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/vt/dbconfigs" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) // resultStreamer streams the results of the requested query @@ -104,7 +105,7 @@ func (rs *resultStreamer) Stream() error { } // check throttler. - if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { + if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.ResultStreamerName) { continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 15c8fa54621..b8a794e02b4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var ( @@ -348,7 +349,7 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V } // check throttler. - if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { + if !rs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(rs.ctx, throttlerapp.RowStreamerName) { throttleResponseRateLimiter.Do(func() error { return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true}) }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index e26b6f2939f..0eec8d93db3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -37,6 +37,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) var uvstreamerTestMode = false // Only used for testing @@ -51,13 +52,14 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK + throttlerApp throttlerapp.Name vschema *localVSchema @@ -90,7 +92,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -105,17 +107,18 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se return send(evs) } uvs := &uvstreamer{ - ctx: ctx, - cancel: cancel, - vse: vse, - send: send2, - cp: cp, - se: se, - startPos: startPos, - filter: filter, - vschema: vschema, - config: config, - inTablePKs: tablePKs, + ctx: ctx, + cancel: cancel, + vse: vse, + send: send2, + cp: cp, + se: se, + startPos: startPos, + filter: filter, + vschema: vschema, + config: config, + inTablePKs: tablePKs, + throttlerApp: throttlerApp, } return uvs @@ -418,7 +421,7 @@ func (uvs *uvstreamer) Stream() error { } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), - uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 7d22f9c0034..1d23cf5a98b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -52,6 +52,7 @@ import ( "time" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/proto/query" @@ -440,7 +441,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() { func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { - err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + err := engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { //t.Logf("Received events: %v", evs) muAllEvents.Lock() defer muAllEvents.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d9ac730e43e..aefa239d3e8 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -43,6 +43,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" ) const ( @@ -61,11 +62,12 @@ type vstreamer struct { ctx context.Context cancel func() - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + send func([]*binlogdatapb.VEvent) error + throttlerApp throttlerapp.Name vevents chan *localVSchema vschema *localVSchema @@ -112,22 +114,23 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, throttlerApp throttlerapp.Name, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ - ctx: ctx, - cancel: cancel, - cp: cp, - se: se, - startPos: startPos, - stopPos: stopPos, - filter: filter, - send: send, - vevents: make(chan *localVSchema, 1), - vschema: vschema, - plans: make(map[uint64]*streamerPlan), - phase: phase, - vse: vse, + ctx: ctx, + cancel: cancel, + cp: cp, + se: se, + startPos: startPos, + stopPos: stopPos, + throttlerApp: throttlerApp, + filter: filter, + send: send, + vevents: make(chan *localVSchema, 1), + vschema: vschema, + plans: make(map[uint64]*streamerPlan), + phase: phase, + vse: vse, } } @@ -302,7 +305,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog defer throttledHeartbeatsRateLimiter.Stop() for { // check throttler. - if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { + if !vs.vse.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vs.throttlerApp) { // make sure to leave if context is cancelled select { case <-ctx.Done(): diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index a1d68b5eb81..1a28722680a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" "google.golang.org/protobuf/proto" @@ -607,7 +608,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var errGoroutine error go func() { defer wg.Done() - engine.Stream(ctx2, "", nil, filter, func(evs []*binlogdatapb.VEvent) error { + engine.Stream(ctx2, "", nil, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { for _, ev := range evs { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue @@ -1925,7 +1926,7 @@ func TestMinimalMode(t *testing.T) { newEngine(t, "minimal") defer newEngine(t, "full") - err := engine.Stream(context.Background(), "current", nil, nil, func(evs []*binlogdatapb.VEvent) error { return nil }) + err := engine.Stream(context.Background(), "current", nil, nil, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { return nil }) require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } @@ -2309,7 +2310,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda }}, } } - return engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + return engine.Stream(ctx, pos, tablePKs, filter, throttlerapp.VStreamerName, func(evs []*binlogdatapb.VEvent) error { timer := time.NewTimer(2 * time.Second) defer timer.Stop()