VReplication: Defer Secondary Index Creation #11700

Merged (48 commits, Jan 20, 2023)

Commits:
af620cb  Strip secondary keys during schema copy (mattlord, Nov 12, 2022)
f573106  Save ALTER to run when copy finishes (mattlord, Nov 12, 2022)
4570dd6  Move work to vcopier (mattlord, Nov 12, 2022)
31978a7  go mod tidy (mattlord, Nov 13, 2022)
82c78b6  Re-apply stashed secondary keys (mattlord, Nov 13, 2022)
31b8a89  Minor fixes and cleanup (mattlord, Nov 13, 2022)
e587cc0  Add Reshard support (mattlord, Nov 13, 2022)
69a58fb  Support multiple SQL actions (mattlord, Nov 13, 2022)
6d9fde5  Merge remote-tracking branch 'origin/main' into vrepl_sec_idxs (mattlord, Dec 7, 2022)
8a5893e  Rename action->>"$.action" to action->>"$.task" (mattlord, Dec 7, 2022)
6d1ace8  Merge remote-tracking branch 'origin/main' into vrepl_sec_idxs (mattlord, Dec 18, 2022)
9837718  Fix unit tests (mattlord, Dec 18, 2022)
1061004  Add new unit test (mattlord, Dec 19, 2022)
85b85cf  Improve the unit test (mattlord, Dec 20, 2022)
3ec5a4a  Add vtctl --defer-secondary-keys client flag (mattlord, Dec 20, 2022)
8286659  Fix binlog unit tests (mattlord, Dec 20, 2022)
f98165d  Fix wrangler unit tests (mattlord, Dec 20, 2022)
cc30174  Move workflow type check to a function (mattlord, Dec 20, 2022)
46cd94c  Update error in unit test and add OnlineDDL (mattlord, Dec 20, 2022)
3aa31eb  Minor unit test improvements (mattlord, Dec 20, 2022)
4874cf2  Improve name of table used (mattlord, Dec 22, 2022)
95758ba  Improvements after self review (mattlord, Dec 22, 2022)
963715d  Ensure ALTERs are killed when engine shuts down (mattlord, Dec 23, 2022)
4b2a173  Fixup & deflake the unit test (mattlord, Dec 23, 2022)
e705f9e  Minor improvements after self review (mattlord, Dec 23, 2022)
3a75def  Auto handle more edge cases (mattlord, Dec 24, 2022)
8f1f2fd  More unit test improvements (mattlord, Dec 24, 2022)
922a350  Address review comments (mattlord, Dec 26, 2022)
65330ea  Correct new error handling in killAction (mattlord, Dec 26, 2022)
09b1c12  Improve getTableSecondaryKeys function (mattlord, Dec 26, 2022)
716520e  Nitty comment improvements (mattlord, Dec 26, 2022)
5879ccc  Address review comments (mattlord, Jan 15, 2023)
1263c49  Minor improvements to log message (mattlord, Jan 15, 2023)
ce18c40  Merge remote-tracking branch 'origin/main' into vrepl_sec_idxs (mattlord, Jan 15, 2023)
31056e8  Fix unit tests (mattlord, Jan 15, 2023)
800ddad  Add defer_secondary_keys to new unit tests added via main merge (mattlord, Jan 16, 2023)
facc86b  More unit test fixes (mattlord, Jan 16, 2023)
822980b  Minor improvements to the log messages (mattlord, Jan 16, 2023)
e140cb8  Minor changes and a fix after self review (mattlord, Jan 16, 2023)
4fca6df  Ignore --defer-secondary-keys for Reshard merges (mattlord, Jan 16, 2023)
51f6b64  Nitty improvement to comment (mattlord, Jan 16, 2023)
f9cb3d2  Add full support for shard merges (mattlord, Jan 18, 2023)
5537455  Move newClientConnection() to vreplicator (mattlord, Jan 18, 2023)
72cc647  Add unit test for new behavior (mattlord, Jan 19, 2023)
07be197  Use the engine context rather than vreplicator context (mattlord, Jan 19, 2023)
540fa01  Improve testing and other minor tweaks (mattlord, Jan 19, 2023)
255fdfb  Add post copy action cleanup to workflow cancel (mattlord, Jan 19, 2023)
02d0bc6  More unit test fixes for post copy action cleanup (mattlord, Jan 19, 2023)
1 change: 1 addition & 0 deletions go/mysql/constants.go
@@ -424,6 +424,7 @@ const (
ErSPNotVarArg = 1414
ERRowIsReferenced2 = 1451
ErNoReferencedRow2 = 1452
ERDupIndex = 1831
ERInnodbReadOnly = 1874

// already exists
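
The new ERDupIndex constant corresponds to MySQL error 1831, a duplicate index definition. One plausible use, stated here as an assumption since this hunk only adds the constant, is to recognize a duplicate-index error when re-applying previously stripped secondary keys. The helper below is hypothetical and not part of this PR.

package main

import (
	"fmt"

	"vitess.io/vitess/go/mysql"
)

// isDupIndexError reports whether err is MySQL error 1831 (duplicate index).
func isDupIndexError(err error) bool {
	sqlErr, ok := err.(*mysql.SQLError)
	return ok && sqlErr.Number() == mysql.ERDupIndex
}

func main() {
	err := mysql.NewSQLError(mysql.ERDupIndex, mysql.SSUnknownSQLState,
		"Duplicate index 'val_idx' defined on the table 'test.t1'")
	fmt.Println(isDupIndexError(err)) // true
}
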
21 changes: 11 additions & 10 deletions go/test/endtoend/vreplication/config_test.go
@@ -32,22 +32,23 @@ package vreplication
// The internal table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431 should be ignored by vreplication
// The db_order_test table is used to ensure vreplication and vdiff work well with complex non-integer PKs, even across DB versions.
var (
// All standard user tables should have a primary key and at least one secondary key.
initialProductSchema = `
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4;
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4;
create table customer(cid int, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), primary key(cid,typ)) CHARSET=utf8mb4;
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), primary key(cid,typ), key(name)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table merchant(mname varchar(128), category varchar(128), primary key(mname)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid)) CHARSET=utf8;
create table merchant(mname varchar(128), category varchar(128), primary key(mname), key(category)) CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
create table orders(oid int, cid int, pid int, mname varchar(128), price int, qty int, total int as (qty * price), total2 int as (qty * price) stored, primary key(oid), key(pid), key(cid)) CHARSET=utf8;
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varchar(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid)) CHARSET=utf8;
create table customer2(cid int, name varchar(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid), key(ts)) CHARSET=utf8;
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead-id`" + `));
create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead`" + `));
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at)) CHARSET=utf8mb4;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id));
create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead-id`" + `), key (date1));
create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead`" + `), key (date2));
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id), key(val));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
`

// These should always be ignored in vreplication
64 changes: 57 additions & 7 deletions go/test/endtoend/vreplication/helper_test.go
@@ -29,20 +29,19 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/PuerkitoBio/goquery"
"github.com/buger/jsonparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/schema"

"github.com/PuerkitoBio/goquery"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
)

const (
@@ -298,6 +297,57 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
}

// confirmTablesHaveSecondaryKeys confirms that the tables provided
// as a CSV have secondary keys. This is useful when testing the
// --defer-secondary-keys flag to confirm that the secondary keys
// were re-added by the time the workflow hits the running phase.
// For a Reshard workflow, where no tables are specified, pass
// an empty string for the tables and all tables in the target
// keyspace will be checked.
func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) {
require.NotNil(t, tablets)
require.NotNil(t, tablets[0])
var tableArr []string
if strings.TrimSpace(tables) != "" {
tableArr = strings.Split(tables, ",")
}
if len(tableArr) == 0 { // We don't specify any for Reshard.
// In this case we check all of them.
res, err := tablets[0].QueryTablet("show tables", ksName, true)
require.NoError(t, err)
require.NotNil(t, res)
for _, row := range res.Rows {
tableArr = append(tableArr, row[0].ToString())
}
}
for _, tablet := range tablets {
for _, table := range tableArr {
if schema.IsInternalOperationTableName(table) {
continue
}
table := strings.TrimSpace(table)
secondaryKeys := 0
res, err := tablet.QueryTablet(fmt.Sprintf("show create table %s", sqlescape.EscapeID(table)), ksName, true)
require.NoError(t, err)
require.NotNil(t, res)
row := res.Named().Row()
tableSchema := row["Create Table"].ToString()
parsedDDL, err := sqlparser.ParseStrictDDL(tableSchema)
require.NoError(t, err)
createTable, ok := parsedDDL.(*sqlparser.CreateTable)
require.True(t, ok)
require.NotNil(t, createTable)
require.NotNil(t, createTable.GetTableSpec())
for _, index := range createTable.GetTableSpec().Indexes {
if !index.Info.Primary {
secondaryKeys++
}
}
require.Greater(t, secondaryKeys, 0, "Table %s does not have any secondary keys", table)
}
}
}

func getHTTPBody(url string) string {
resp, err := http.Get(url)
if err != nil {
@@ -63,6 +63,7 @@ func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) erro
"", workflowActionCreate, "", sourceShards, targetShards)
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, workflowStateRunning)
confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, "")
catchup(t, targetTab1, workflowName, "Reshard")
catchup(t, targetTab2, workflowName, "Reshard")
vdiff1(t, ksWorkflow, "")
@@ -77,6 +78,7 @@ func createMoveTablesWorkflow(t *testing.T, tables string) {
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowState(t, vc, ksWorkflow, workflowStateRunning)
confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, tables)
catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
vdiff1(t, ksWorkflow, "")
@@ -110,6 +112,11 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
} else {
args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards)
}
// Test new experimental --defer-secondary-keys flag
switch currentWorkflowType {
case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow:
args = append(args, "--defer-secondary-keys")
}
}
if cells != "" {
args = append(args, "--cells", cells)
1 change: 0 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
@@ -220,7 +220,6 @@ func TestVreplicationCopyThrottling(t *testing.T) {
// to avoid flakiness when the CI is very slow.
fmt.Sprintf("--queryserver-config-transaction-timeout=%d", int64(defaultTimeout.Seconds())*3),
fmt.Sprintf("--vreplication_copy_phase_max_innodb_history_list_length=%d", maxSourceTrxHistory),

parallelInsertWorkers,
}

49 changes: 28 additions & 21 deletions go/vt/binlog/binlogplayer/binlog_player.go
@@ -577,6 +577,7 @@ var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication ADD COLUMN time_throttled BIGINT NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN component_throttled VARCHAR(255) NOT NULL DEFAULT ''",
"ALTER TABLE _vt.vreplication ADD COLUMN workflow_sub_type int NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN defer_secondary_keys bool NOT NULL DEFAULT false",
}

// WithDDLInitialQueries contains the queries that:
@@ -596,20 +597,21 @@ var WithDDLInitialQueries = []string{

// VRSettings contains the settings of a vreplication table.
type VRSettings struct {
StartPos mysql.Position
StopPos mysql.Position
MaxTPS int64
MaxReplicationLag int64
State string
WorkflowType int32
WorkflowSubType int32
WorkflowName string
StartPos mysql.Position
StopPos mysql.Position
MaxTPS int64
MaxReplicationLag int64
State string
WorkflowType int32
WorkflowSubType int32
WorkflowName string
DeferSecondaryKeys bool
}

// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type from _vt.vreplication where id=%v", uid)
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return VRSettings{}, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
@@ -646,27 +648,32 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse workflow_sub_type column: %v", err)
}
deferSecondaryKeys, err := vrRow.ToBool("defer_secondary_keys")
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse defer_secondary_keys column: %v", err)
}
return VRSettings{
StartPos: startPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: vrRow.AsString("state", ""),
WorkflowType: workflowType,
WorkflowName: vrRow.AsString("workflow", ""),
WorkflowSubType: workflowSubType,
StartPos: startPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: vrRow.AsString("state", ""),
WorkflowType: workflowType,
WorkflowName: vrRow.AsString("workflow", ""),
WorkflowSubType: workflowSubType,
DeferSecondaryKeys: deferSecondaryKeys,
}, nil
}

// CreateVReplication returns a statement to populate the first value into
// the _vt.vreplication table.
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string {
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v, %v)",
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %v, %v, %v)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
timeUpdated, BlpRunning, encodeString(dbName), int64(workflowType), int64(workflowSubType))
timeUpdated, BlpRunning, encodeString(dbName), int64(workflowType), int64(workflowSubType), deferSecondaryKeys)
}

// CreateVReplicationState returns a statement to create a stopped vreplication.
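
For reference, a hypothetical caller of the updated CreateVReplication helper would pass the new deferSecondaryKeys argument last and execute the returned INSERT against the target. The keyspace, filter, and numeric values below are illustrative only and not taken from this PR.

package main

import (
	"fmt"
	"time"

	"vitess.io/vitess/go/vt/binlog/binlogplayer"
	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func main() {
	// Hypothetical source definition for a MoveTables-style stream.
	bls := &binlogdatapb.BinlogSource{
		Keyspace: "commerce",
		Shard:    "0",
		Filter: &binlogdatapb.Filter{
			Rules: []*binlogdatapb.Rule{{Match: "customer", Filter: "select * from customer"}},
		},
	}
	query := binlogplayer.CreateVReplication(
		"commerce2customer", // workflow name
		bls,                 // binlog source
		"",                  // start position (illustrative)
		1000,                // max TPS
		10,                  // max replication lag, in seconds
		time.Now().Unix(),   // time updated
		"vt_customer",       // target db name
		binlogdatapb.VReplicationWorkflowType_MoveTables,
		binlogdatapb.VReplicationWorkflowSubType_None,
		true, // the new deferSecondaryKeys argument
	)
	fmt.Println(query) // the _vt.vreplication INSERT, now including defer_secondary_keys
}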