Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: be explicit about client app name, exempt some apps from checks and heartbeat renewals #13195

56 changes: 27 additions & 29 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)
onlineDDLThrottlerAppName = "online-ddl"
vstreamerThrottlerAppName = "vstreamer"
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)

normalMigrationWait = 45 * time.Second
extendedMigrationWait = 60 * time.Second
Expand Down Expand Up @@ -385,7 +383,7 @@ func TestSchemaChange(t *testing.T) {
// begin throttling:
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
Expand All @@ -400,11 +398,11 @@ func TestSchemaChange(t *testing.T) {
// to be strictly higher than started_timestamp
assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VCopierName, throttlerapp.VPlayerName}, component)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it might be worth making a new type that's an alias for string so that what the string represents is even more explicit. This make it clear to future code writers that we're not using arbitrary strings.

Copy link
Contributor Author

@shlomi-noach shlomi-noach May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I initially did that but then that affected so many lines of code, casting into and out of string, that I decided to hold off. I can still do that for this PR if you think it's worhtwhile.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it up to you. I think it's probably worth it, but we can always do it later too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 37a7c2f. I'm not sure how I feel about it, what do you think?


// unthrottle
onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
Expand All @@ -421,11 +419,11 @@ func TestSchemaChange(t *testing.T) {
// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
// change the essence of this test.
for _, tablet := range shard.Vttablets {
body, err := throttleApp(tablet, vstreamerThrottlerAppName)
defer unthrottleApp(tablet, vstreamerThrottlerAppName)
body, err := throttleApp(tablet, throttlerapp.VStreamerName)
defer unthrottleApp(tablet, throttlerapp.VStreamerName)

assert.NoError(t, err)
assert.Contains(t, body, vstreamerThrottlerAppName)
assert.Contains(t, body, throttlerapp.VStreamerName)
}
}

Expand All @@ -449,7 +447,7 @@ func TestSchemaChange(t *testing.T) {
// clock irregularities
assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VStreamerName, throttlerapp.RowStreamerName}, component)
}()
// now unthrottled
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand Down Expand Up @@ -478,7 +476,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand All @@ -497,7 +495,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand Down Expand Up @@ -531,16 +529,16 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

Expand Down Expand Up @@ -596,14 +594,14 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
})
t.Run("expect completion", func(t *testing.T) {
Expand Down Expand Up @@ -823,11 +821,11 @@ func TestSchemaChange(t *testing.T) {
// - tablet throttling
t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
// shard 0 will run normally, shard 1 will be throttled
defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
t.Run("throttle shard 1", func(t *testing.T) {
body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})

var uuid string
Expand All @@ -849,9 +847,9 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
})
t.Run("unthrottle shard 1", func(t *testing.T) {
body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})
var revertUUID string
t.Run("issue revert migration", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

Expand All @@ -58,8 +59,8 @@ var (
sourceKsOpts = make(map[string]string)
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = "vstreamer"
targetThrottlerAppName = "vreplication"
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VReplicationName
)

const (
Expand Down
17 changes: 9 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

Expand Down Expand Up @@ -124,7 +125,6 @@ var (
migrationFailureFileName = "migration-failure.log"
onlineDDLUser = "vt-online-ddl-internal"
onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%")
throttlerOnlineDDLApp = "online-ddl"
throttleCheckFlags = &throttle.CheckFlags{}
)

Expand Down Expand Up @@ -1621,7 +1621,7 @@ exit $exit_code
fmt.Sprintf("--serve-socket-file=%s", serveSocketFile),
fmt.Sprintf("--hooks-path=%s", tempDir),
fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:gh-ost:%s&p=low`, servenv.Port(), throttlerOnlineDDLApp, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID),
fmt.Sprintf(`--database=%s`, e.dbName),
fmt.Sprintf(`--table=%s`, onlineDDL.Table),
fmt.Sprintf(`--alter=%s`, alterOptions),
Expand Down Expand Up @@ -1768,7 +1768,7 @@ export MYSQL_PWD
my ($self, %args) = @_;

return sub {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:pt-osc:{{MIGRATION_UUID}}&p=low")) {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we didn't support pt-osc? Might be a good opportunity to clean up the code if so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's labeled as "experimental". I'd rather not do everything in this PR. We can deprecate pt-osc later.

# Got HTTP 200 OK, means throttler is happy
return 0;
} else {
Expand All @@ -1782,7 +1782,8 @@ export MYSQL_PWD
`
pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port()))
pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerOnlineDDLApp)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName)

pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning))
pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete))
Expand Down Expand Up @@ -2141,7 +2142,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin
if err := e.lagThrottler.CheckIsReady(); err != nil {
return nil, err
}
_ = e.lagThrottler.ThrottleApp(throttlerOnlineDDLApp, time.Now().Add(duration), ratio)
_ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName, time.Now().Add(duration), ratio)
return emptyResult, nil
}

Expand All @@ -2161,7 +2162,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype
return nil, err
}
defer e.triggerNextCheckInterval()
_ = e.lagThrottler.UnthrottleApp(throttlerOnlineDDLApp)
_ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName)
return emptyResult, nil
}

Expand Down Expand Up @@ -3469,7 +3470,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if err := e.lagThrottler.CheckIsReady(); err == nil {
// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if app.AppName == throttlerOnlineDDLApp {
if app.AppName == throttlerapp.OnlineDDLName {
currentUserThrottleRatio = app.Ratio
break
}
Expand Down Expand Up @@ -3603,7 +3604,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// - it's a deadlock.
// And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick
// on-demand heartbeats, unlocking the deadlock.
e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
7 changes: 3 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

const (
Expand All @@ -51,9 +52,7 @@ const (
copyStateTableName = "copy_state"
postCopyActionTableName = "post_copy_action"

maxRows = 10000
throttlerVReplicationAppName = "vreplication"
throttlerOnlineDDLAppName = "online-ddl"
maxRows = 10000
)

const (
Expand Down Expand Up @@ -136,7 +135,7 @@ func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mys
mysqld: mysqld,
journaler: make(map[string]*journalEvent),
ec: newExternalConnector(config.ExternalConnections),
throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerVReplicationAppName, throttle.ThrottleCheckPrimaryWrite),
throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerapp.VReplicationName, throttle.ThrottleCheckPrimaryWrite),
}

return vre
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

Expand Down Expand Up @@ -126,7 +127,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error {
}

func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send)
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, throttlerapp.ExternalConnectorName, send)
}

func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv"

Expand Down Expand Up @@ -307,7 +308,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"context"

Expand Down Expand Up @@ -84,7 +85,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error {
}

func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, nil, filter, send)
return c.vstreamer.Stream(ctx, startPos, nil, filter, throttlerapp.ReplicaConnectorName, send)
}

// VStreamRows streams rows from query result
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

type vcopier struct {
Expand Down Expand Up @@ -457,7 +458,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
default:
}
if rows.Throttled {
_ = vc.vr.updateTimeThrottled(RowStreamerComponentName)
_ = vc.vr.updateTimeThrottled(throttlerapp.RowStreamerName)
return nil
}
if rows.Heartbeat {
Expand All @@ -468,7 +469,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vc.throttlerAppName) {
break // out of 'for' loop
} else { // we're throttled
_ = vc.vr.updateTimeThrottled(VCopierComponentName)
_ = vc.vr.updateTimeThrottled(throttlerapp.VCopierName)
}
}
if !copyWorkQueue.isOpen {
Expand Down
Loading