Skip to content

Commit

Permalink
Minor changes after self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Mar 31, 2022
1 parent 4ba3209 commit e60a3ce
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 57 deletions.
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,9 @@ func verifySourceTabletThrottling(t *testing.T, targetKS, workflow string) {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
state := attributeValue.Get("State").String()
if state != "Copying" {
require.FailNowf(t, "Unexpected running workflow stream", "Initial copy phase for the MoveTables workflow %s started in less than %d seconds when it should have been waiting. Show output: %s", ksWorkflow, int(tDuration.Seconds()), output)
require.FailNowf(t, "Unexpected running workflow stream",
"Initial copy phase for the MoveTables workflow %s started in less than %d seconds when it should have been waiting. Show output: %s",
ksWorkflow, int(tDuration.Seconds()), output)
}
return true // end attribute loop
})
Expand Down
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ func TestVReplicationStats(t *testing.T) {
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["catchup"])
record("copy")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["copy"])
record("waitForMySQL")
require.Greater(t, want, testStats.status().Controllers[0].PhaseTimings["waitForMySQL"])

blpStats.QueryCount.Add("replicate", 11)
blpStats.QueryCount.Add("fastforward", 23)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletserver/debugenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
case "WarnResultSize":
setIntVal(tsv.SetWarnResultSize)
case "RowStreamerMaxInnoDBTrxHistLen":
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxTrxHistLen = val })
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxInnoDBTrxHistLen = val })
case "RowStreamerMaxMySQLReplLagSecs":
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxReplLagSecs = val })
setInt64Val(func(val int64) { tsv.Config().RowStreamer.MaxMySQLReplLagSecs = val })
case "UnhealthyThreshold":
setDurationVal(tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Set)
setDurationVal(tsv.hs.SetUnhealthyThreshold)
Expand Down Expand Up @@ -160,8 +160,8 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request)
addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap)
addIntVar("MaxResultSize", tsv.MaxResultSize)
addIntVar("WarnResultSize", tsv.WarnResultSize)
addInt64Var("VReplication: max InnoDB history list length on source for streaming rows", func() int64 { return tsv.Config().RowStreamer.MaxTrxHistLen })
addInt64Var("VReplication: max MySQL replication lag on source for streaming rows", func() int64 { return tsv.Config().RowStreamer.MaxReplLagSecs })
addInt64Var("RowStreamerMaxInnoDBTrxHistLen", func() int64 { return tsv.Config().RowStreamer.MaxInnoDBTrxHistLen })
addInt64Var("RowStreamerMaxMySQLReplLagSecs", func() int64 { return tsv.Config().RowStreamer.MaxMySQLReplLagSecs })
addDurationVar("UnhealthyThreshold", tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Get)
addFloat64Var("ThrottleMetricThreshold", tsv.ThrottleMetricThreshold)
vars = append(vars, envValue{
Expand Down
14 changes: 7 additions & 7 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func init() {
flag.BoolVar(&enableReplicationReporter, "enable_replication_reporter", false, "Use polling to track replication lag.")
flag.BoolVar(&currentConfig.EnableOnlineDDL, "queryserver_enable_online_ddl", true, "Enable online DDL.")

flag.Int64Var(&currentConfig.RowStreamer.MaxTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
flag.Int64Var(&currentConfig.RowStreamer.MaxReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
flag.Int64Var(&currentConfig.RowStreamer.MaxInnoDBTrxHistLen, "vreplication_copy_phase_max_innodb_history_list_length", 1000000, "The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
flag.Int64Var(&currentConfig.RowStreamer.MaxMySQLReplLagSecs, "vreplication_copy_phase_max_mysql_replication_lag", 43200, "The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet.")
}

// Init must be called after flag.Parse, and before doing any other operations.
Expand Down Expand Up @@ -290,7 +290,7 @@ type TabletConfig struct {
EnforceStrictTransTables bool `json:"-"`
EnableOnlineDDL bool `json:"-"`

RowStreamer RowStreamerConfig `json:"rowstreamer,omitempty"`
RowStreamer RowStreamerConfig `json:"rowStreamer,omitempty"`
}

// ConnPoolConfig contains the config for a conn pool.
Expand Down Expand Up @@ -355,8 +355,8 @@ type TransactionLimitConfig struct {
// RowStreamerConfig contains configuration parameters for a vstreamer (source) that is
// copying the contents of a table to a target
type RowStreamerConfig struct {
MaxTrxHistLen int64 `json:"maxTrxHistLen,omitempty"`
MaxReplLagSecs int64 `json:"maxReplLagSecs,omitempty"`
MaxInnoDBTrxHistLen int64 `json:"maxInnoDBTrxHistLen,omitempty"`
MaxMySQLReplLagSecs int64 `json:"maxMySQLReplLagSecs,omitempty"`
}

// NewCurrentConfig returns a copy of the current config.
Expand Down Expand Up @@ -500,8 +500,8 @@ var defaultConfig = TabletConfig{
EnableOnlineDDL: true,

RowStreamer: RowStreamerConfig{
MaxTrxHistLen: 1000000,
MaxReplLagSecs: 43200,
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}

Expand Down
24 changes: 12 additions & 12 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestConfigParse(t *testing.T) {
MaxWaiters: 40,
},
RowStreamer: RowStreamerConfig{
MaxTrxHistLen: 1000,
MaxReplLagSecs: 400,
MaxInnoDBTrxHistLen: 1000,
MaxMySQLReplLagSecs: 400,
},
}
gotBytes, err := yaml2.Marshal(&cfg)
Expand Down Expand Up @@ -83,9 +83,9 @@ oltpReadPool:
size: 16
timeoutSeconds: 10
replicationTracker: {}
rowstreamer:
maxReplLagSecs: 400
maxTrxHistLen: 1000
rowStreamer:
maxInnoDBTrxHistLen: 1000
maxMySQLReplLagSecs: 400
txPool: {}
`
assert.Equal(t, wantBytes, string(gotBytes))
Expand Down Expand Up @@ -148,9 +148,9 @@ queryCacheSize: 5000
replicationTracker:
heartbeatIntervalSeconds: 0.25
mode: disable
rowstreamer:
maxReplLagSecs: 43200
maxTrxHistLen: 1000000
rowStreamer:
maxInnoDBTrxHistLen: 1000000
maxMySQLReplLagSecs: 43200
schemaReloadIntervalSeconds: 1800
signalSchemaChangeReloadIntervalSeconds: 5
streamBufferSize: 32768
Expand All @@ -176,8 +176,8 @@ func TestClone(t *testing.T) {
MaxWaiters: 40,
},
RowStreamer: RowStreamerConfig{
MaxTrxHistLen: 1000000,
MaxReplLagSecs: 43200,
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}
cfg2 := cfg1.Clone()
Expand Down Expand Up @@ -231,8 +231,8 @@ func TestFlags(t *testing.T) {
EnableOnlineDDL: true,
DB: &dbconfigs.DBConfigs{},
RowStreamer: RowStreamerConfig{
MaxTrxHistLen: 1000000,
MaxReplLagSecs: 43200,
MaxInnoDBTrxHistLen: 1000000,
MaxMySQLReplLagSecs: 43200,
},
}
assert.Equal(t, want.DB, currentConfig.DB)
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrot
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
rowStreamerWaits: env.Exporter().NewTimings("RowStreamerWaits", "Count and total time we've waited for the source when starting a new vstream copy cycle", "copy-phase-waits"),
rowStreamerWaits: env.Exporter().NewTimings("RowStreamerWaits", "Total counts and time we've waited when streaming rows in the vstream copy phase", "copy-phase-waits"),
vstreamersCreated: env.Exporter().NewCounter("VStreamersCreated", "Count of vstreamers created"),
vstreamersEndedWithErrors: env.Exporter().NewCounter("VStreamersEndedWithErrors", "Count of vstreamers that ended with errors"),
errorCounts: env.Exporter().NewCountersWithSingleLabel("VStreamerErrors", "Tracks errors in vstreamer", "type", "Catchup", "Copy", "Send", "TablePlan"),
}
env.Exporter().NewGaugeFunc("RowStreamerMaxInnoDBTrxHistLen", "", func() int64 { return env.Config().RowStreamer.MaxTrxHistLen })
env.Exporter().NewGaugeFunc("RowStreamerMaxMySQLReplLagSecs", "", func() int64 { return env.Config().RowStreamer.MaxReplLagSecs })
env.Exporter().NewGaugeFunc("RowStreamerMaxInnoDBTrxHistLen", "", func() int64 { return env.Config().RowStreamer.MaxInnoDBTrxHistLen })
env.Exporter().NewGaugeFunc("RowStreamerMaxMySQLReplLagSecs", "", func() int64 { return env.Config().RowStreamer.MaxMySQLReplLagSecs })
env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP)
return vse
}
Expand Down Expand Up @@ -398,15 +398,18 @@ func (vse *Engine) waitForMySQL(ctx context.Context, db dbconfigs.Connector, tab
backoffLimit := backoff * 30
ready := false
recording := false
mhll := vse.env.Config().RowStreamer.MaxTrxHistLen
mrls := vse.env.Config().RowStreamer.MaxReplLagSecs

loopFunc := func() error {
// Exit if the context has been cancelled
if ctx.Err() != nil {
return ctx.Err()
}
hll := vse.getMySQLTrxHistoryLen(ctx, db)
// Check the config values each time as they can be updated in the running process via the /debug/env endpoint.
// This allows the user to break out of a wait w/o incurring any downtime or restarting the workflow if they
// need to.
mhll := vse.env.Config().RowStreamer.MaxInnoDBTrxHistLen
mrls := vse.env.Config().RowStreamer.MaxMySQLReplLagSecs
hll := vse.getInnoDBTrxHistoryLen(ctx, db)
rpl := vse.getMySQLReplicationLag(ctx, db)
if hll <= mhll && rpl <= mrls {
ready = true
Expand Down Expand Up @@ -454,10 +457,10 @@ func (vse *Engine) waitForMySQL(ctx context.Context, db dbconfigs.Connector, tab
return nil
}

// getMySQLTrxHistoryLen attempts to query InnoDB's current transaction rollback segment's history
// getInnoDBTrxHistoryLen attempts to query InnoDB's current transaction rollback segment's history
// list length. If the value cannot be determined for any reason then -1 is returned, which means
// "unknown".
func (vse *Engine) getMySQLTrxHistoryLen(ctx context.Context, db dbconfigs.Connector) int64 {
func (vse *Engine) getInnoDBTrxHistoryLen(ctx context.Context, db dbconfigs.Connector) int64 {
histLen := int64(-1)
conn, err := db.Connect(ctx)
if err != nil {
Expand Down Expand Up @@ -502,7 +505,7 @@ func (vse *Engine) getMySQLEndpoint(ctx context.Context, db dbconfigs.Connector)

res, err := conn.ExecuteFetch(hostQuery, 1, false)
if err != nil || len(res.Rows) != 1 || res.Rows[0] == nil {
return "", vterrors.Wrap(err, "could not get vstreamer endpoint")
return "", vterrors.Wrap(err, "could not get vstreamer MySQL endpoint")
}
host := res.Rows[0][0].ToString()
port, _ := res.Rows[0][1].ToInt64()
Expand Down
42 changes: 22 additions & 20 deletions go/vt/vttablet/tabletserver/vstreamer/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,46 +170,47 @@ func expectUpdateCount(t *testing.T, wantCount int64) int64 {
}

func TestVStreamerWaitForMySQL(t *testing.T) {
tableName := "test"
type fields struct {
vse *Engine
cp dbconfigs.Connector
se *schema.Engine
ReplicationLagSeconds int64
maxTrxHistLen int64
maxReplLagSecs int64
maxInnoDBTrxHistLen int64
maxMySQLReplLagSecs int64
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "Small mvcc impact limit",
name: "Small InnoDB MVCC impact limit",
fields: fields{
vse: engine,
se: engine.se,
maxTrxHistLen: 100,
maxReplLagSecs: 5000,
vse: engine,
se: engine.se,
maxInnoDBTrxHistLen: 100,
maxMySQLReplLagSecs: 5000,
},
wantErr: true,
},
{
name: "Small lag impact limit",
name: "Small Repl Lag impact limit",
fields: fields{
vse: engine,
se: engine.se,
maxTrxHistLen: 10000,
maxReplLagSecs: 5,
vse: engine,
se: engine.se,
maxInnoDBTrxHistLen: 10000,
maxMySQLReplLagSecs: 5,
},
wantErr: true,
},
{
name: "Large impact limit",
name: "Large impact limits",
fields: fields{
vse: engine,
se: engine.se,
maxTrxHistLen: 10000,
maxReplLagSecs: 200,
vse: engine,
se: engine.se,
maxInnoDBTrxHistLen: 10000,
maxMySQLReplLagSecs: 200,
},
wantErr: false,
},
Expand Down Expand Up @@ -246,13 +247,14 @@ func TestVStreamerWaitForMySQL(t *testing.T) {
se: tt.fields.se,
ReplicationLagSeconds: tt.fields.ReplicationLagSeconds,
}
env.TabletEnv.Config().RowStreamer.MaxTrxHistLen = tt.fields.maxTrxHistLen
env.TabletEnv.Config().RowStreamer.MaxReplLagSecs = tt.fields.maxReplLagSecs
if err := uvs.vse.waitForMySQL(ctx, uvs.cp, "test"); (err != nil) != tt.wantErr {
env.TabletEnv.Config().RowStreamer.MaxInnoDBTrxHistLen = tt.fields.maxInnoDBTrxHistLen
env.TabletEnv.Config().RowStreamer.MaxMySQLReplLagSecs = tt.fields.maxMySQLReplLagSecs
if err := uvs.vse.waitForMySQL(ctx, uvs.cp, tableName); (err != nil) != tt.wantErr {
t.Errorf("vstreamer.waitForMySQL() error = %v, wantErr %v", err, tt.wantErr)
}
})
}

require.Equal(t, engine.rowStreamerWaits.Counts()["VStreamerTest.waitForMySQL"], int64(2))
require.Equal(t, engine.vstreamerPhaseTimings.Counts()["VStreamerTest."+tableName+":waitForMySQL"], int64(2))
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (rs *rowStreamer) Cancel() {
}

func (rs *rowStreamer) Stream() error {
// Ensure se is Open. If vttablet came up in a non_serving role,
// Ensure sh is Open. If vttablet came up in a non_serving role,
// the schema engine may not have been initialized.
if err := rs.se.Open(); err != nil {
return err
Expand Down Expand Up @@ -259,7 +259,7 @@ func (rs *rowStreamer) buildSelect() (string, error) {
}

func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.VStreamRowsResponse) error) error {
// Let's be sure MySQL is in good shape to stream rows
// Let's wait until MySQL is in good shape to stream rows
if err := rs.vse.waitForMySQL(rs.ctx, rs.cp, rs.plan.Table.Name); err != nil {
return err
}
Expand Down

0 comments on commit e60a3ce

Please sign in to comment.