Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: be explicit about client app name, exempt some apps from checks and heartbeat renewals #13195

64 changes: 31 additions & 33 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,18 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)
onlineDDLThrottlerAppName = "online-ddl"
vstreamerThrottlerAppName = "vstreamer"
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)

normalMigrationWait = 45 * time.Second
extendedMigrationWait = 60 * time.Second
Expand Down Expand Up @@ -234,13 +232,13 @@ func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, e
}

// direct per-tablet throttler API instruction
func throttleApp(tablet *cluster.Vttablet, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp))
}

// direct per-tablet throttler API instruction
func unthrottleApp(tablet *cluster.Vttablet, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp))
}

func TestSchemaChange(t *testing.T) {
Expand Down Expand Up @@ -385,7 +383,7 @@ func TestSchemaChange(t *testing.T) {
// begin throttling:
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
Expand All @@ -400,11 +398,11 @@ func TestSchemaChange(t *testing.T) {
// to be strictly higher than started_timestamp
assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VCopierName.String(), throttlerapp.VPlayerName.String()}, component)

// unthrottle
onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
Expand All @@ -421,11 +419,11 @@ func TestSchemaChange(t *testing.T) {
// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
// change the essence of this test.
for _, tablet := range shard.Vttablets {
body, err := throttleApp(tablet, vstreamerThrottlerAppName)
defer unthrottleApp(tablet, vstreamerThrottlerAppName)
body, err := throttleApp(tablet, throttlerapp.VStreamerName)
defer unthrottleApp(tablet, throttlerapp.VStreamerName)

assert.NoError(t, err)
assert.Contains(t, body, vstreamerThrottlerAppName)
assert.Contains(t, body, throttlerapp.VStreamerName)
}
}

Expand All @@ -449,7 +447,7 @@ func TestSchemaChange(t *testing.T) {
// clock irregularities
assert.GreaterOrEqual(t, lastThrottledTime.Add(time.Second), startedTime)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component)
assert.Contains(t, []string{throttlerapp.VStreamerName.String(), throttlerapp.RowStreamerName.String()}, component)
}()
// now unthrottled
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand Down Expand Up @@ -478,7 +476,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand All @@ -497,7 +495,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand Down Expand Up @@ -531,16 +529,16 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

Expand Down Expand Up @@ -596,14 +594,14 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
})
t.Run("expect completion", func(t *testing.T) {
Expand Down Expand Up @@ -823,11 +821,11 @@ func TestSchemaChange(t *testing.T) {
// - tablet throttling
t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
// shard 0 will run normally, shard 1 will be throttled
defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
t.Run("throttle shard 1", func(t *testing.T) {
body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})

var uuid string
Expand All @@ -849,9 +847,9 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
})
t.Run("unthrottle shard 1", func(t *testing.T) {
body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})
var revertUUID string
t.Run("issue revert migration", func(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

"vitess.io/vitess/go/test/endtoend/cluster"

Expand Down Expand Up @@ -311,17 +312,17 @@ func UnthrottleAllMigrations(t *testing.T, vtParams *mysql.ConnParams) {
}

// CheckThrottledApps checks for existence or non-existence of an app in the throttled apps list
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string, expectFind bool) {
func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, throttlerApp throttlerapp.Name, expectFind bool) {
query := "show vitess_throttled_apps"
r := VtgateExecQuery(t, vtParams, query, "")

found := false
for _, row := range r.Named().Rows {
if row.AsString("app", "") == appName {
if throttlerApp.Equals(row.AsString("app", "")) {
found = true
}
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found)
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", throttlerApp, found)
}

// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

const (
Expand Down Expand Up @@ -125,12 +126,12 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s

// waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for
// the provided app name in its self check.
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, appName string, wantCode int64) {
func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) {
var gotCode int64
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
output, err := throttlerCheckSelf(tablet, appName)
output, err := throttlerCheckSelf(tablet, throttlerApp)
require.NoError(t, err)

gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode")
Expand All @@ -144,7 +145,7 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess
select {
case <-timer.C:
require.FailNow(t, fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d",
tablet.Name, wantCode, appName, defaultTimeout, gotCode))
tablet.Name, wantCode, throttlerApp, defaultTimeout, gotCode))
default:
time.Sleep(defaultTick)
}
Expand Down
17 changes: 9 additions & 8 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

Expand All @@ -58,8 +59,8 @@ var (
sourceKsOpts = make(map[string]string)
targetKsOpts = make(map[string]string)
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = "vstreamer"
targetThrottlerAppName = "vreplication"
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VReplicationName
)

const (
Expand Down Expand Up @@ -94,16 +95,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody st
return respBody, err
}

func throttleApp(tablet *cluster.VttabletProcess, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String()))
}

func unthrottleApp(tablet *cluster.VttabletProcess, app string) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String()))
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app)
func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
Expand Down
17 changes: 9 additions & 8 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

Expand Down Expand Up @@ -124,7 +125,6 @@ var (
migrationFailureFileName = "migration-failure.log"
onlineDDLUser = "vt-online-ddl-internal"
onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%")
throttlerOnlineDDLApp = "online-ddl"
throttleCheckFlags = &throttle.CheckFlags{}
)

Expand Down Expand Up @@ -1621,7 +1621,7 @@ exit $exit_code
fmt.Sprintf("--serve-socket-file=%s", serveSocketFile),
fmt.Sprintf("--hooks-path=%s", tempDir),
fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:gh-ost:%s&p=low`, servenv.Port(), throttlerOnlineDDLApp, onlineDDL.UUID),
fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID),
fmt.Sprintf(`--database=%s`, e.dbName),
fmt.Sprintf(`--table=%s`, onlineDDL.Table),
fmt.Sprintf(`--alter=%s`, alterOptions),
Expand Down Expand Up @@ -1768,7 +1768,7 @@ export MYSQL_PWD
my ($self, %args) = @_;

return sub {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:pt-osc:{{MIGRATION_UUID}}&p=low")) {
if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we didn't support pt-osc? Might be a good opportunity to clean up the code if so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's labeled as "experimental". I'd rather not do everything in this PR. We can deprecate pt-osc later.

# Got HTTP 200 OK, means throttler is happy
return 0;
} else {
Expand All @@ -1782,7 +1782,8 @@ export MYSQL_PWD
`
pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port()))
pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerOnlineDDLApp)
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName.String())
pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName.String())

pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning))
pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete))
Expand Down Expand Up @@ -2141,7 +2142,7 @@ func (e *Executor) ThrottleAllMigrations(ctx context.Context, expireString strin
if err := e.lagThrottler.CheckIsReady(); err != nil {
return nil, err
}
_ = e.lagThrottler.ThrottleApp(throttlerOnlineDDLApp, time.Now().Add(duration), ratio)
_ = e.lagThrottler.ThrottleApp(throttlerapp.OnlineDDLName.String(), time.Now().Add(duration), ratio)
return emptyResult, nil
}

Expand All @@ -2161,7 +2162,7 @@ func (e *Executor) UnthrottleAllMigrations(ctx context.Context) (result *sqltype
return nil, err
}
defer e.triggerNextCheckInterval()
_ = e.lagThrottler.UnthrottleApp(throttlerOnlineDDLApp)
_ = e.lagThrottler.UnthrottleApp(throttlerapp.OnlineDDLName.String())
return emptyResult, nil
}

Expand Down Expand Up @@ -3469,7 +3470,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if err := e.lagThrottler.CheckIsReady(); err == nil {
// No point in reviewing throttler info if it's not enabled&open
for _, app := range e.lagThrottler.ThrottledApps() {
if app.AppName == throttlerOnlineDDLApp {
if throttlerapp.OnlineDDLName.Equals(app.AppName) {
currentUserThrottleRatio = app.Ratio
break
}
Expand Down Expand Up @@ -3603,7 +3604,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
// - it's a deadlock.
// And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick
// on-demand heartbeats, unlocking the deadlock.
e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
e.lagThrottler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite)
})
}
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv"
"vitess.io/vitess/go/vt/vttablet/tmclient"
Expand Down Expand Up @@ -235,7 +236,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, throttlerapp.VStreamerName, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
Loading