Skip to content

Commit

Permalink
Properly handle multiple streams in UpdateVReplicationWorkflow
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 3, 2023
1 parent 3037acd commit ecbe83d
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 85 deletions.
6 changes: 4 additions & 2 deletions go/textutil/strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"unicode"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/proto/binlogdata"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -94,7 +94,7 @@ func ValueIsSimulatedNull(val any) bool {
return cval == SimulatedNullString
case []string:
return len(cval) == 1 && cval[0] == sqltypes.NULL.String()
case binlogdata.OnDDLAction:
case binlogdatapb.OnDDLAction:
return int32(cval) == int32(SimulatedNullInt)
case int:
return cval == SimulatedNullInt
Expand All @@ -104,6 +104,8 @@ func ValueIsSimulatedNull(val any) bool {
return int64(cval) == int64(SimulatedNullInt)
case []topodatapb.TabletType:
return len(cval) == 1 && cval[0] == topodatapb.TabletType(SimulatedNullInt)
case binlogdatapb.VReplicationWorkflowState:
return int32(cval) == int32(SimulatedNullInt)
default:
return false
}
Expand Down
133 changes: 69 additions & 64 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ const (
sqlReadVReplicationWorkflow = "select id, source, pos, stop_pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, message, db_name, rows_copied, tags, time_heartbeat, workflow_type, time_throttled, component_throttled, workflow_sub_type, defer_secondary_keys from %s.vreplication where workflow = %a and db_name = %a"
// Delete VReplication records for the given workflow.
sqlDeleteVReplicationWorkflow = "delete from %s.vreplication where workflow = %a and db_name = %a"
// Retrieve the current configuration values for a workflow's vreplication stream.
// Retrieve the current configuration values for a workflow's vreplication stream(s).
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
)

func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
Expand Down Expand Up @@ -228,8 +228,8 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
}

// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
// record for this tablet's vreplication workflow stream(s). If there
// is no stream for the given workflow on the tablet then a nil result
// record(s) for this tablet's vreplication workflow stream(s). If there
// are no streams for the given workflow on the tablet then a nil result
// is returned as this is expected e.g. on source tablets of a
// Reshard workflow (source and target are the same keyspace). The
// caller can consider this case an error if they choose to.
Expand Down Expand Up @@ -257,68 +257,73 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, nil
}

row := res.Named().Row()
id := row.AsInt64("id", 0)
cells := strings.Split(row.AsString("cell", ""), ",")
tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", ""))
if err != nil {
return nil, err
}
bls := &binlogdatapb.BinlogSource{}
source := row.AsBytes("source", []byte{})
state := row.AsString("state", "")
message := row.AsString("message", "")
if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil},
vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen")
}
// For the string based values, we use NULL to differentiate
// from an empty string. The NULL value indicates that we
// should keep the existing value.
if !textutil.ValueIsSimulatedNull(req.Cells) {
cells = req.Cells
}
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN ||
req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
if err = prototext.Unmarshal(source, bls); err != nil {
return nil, err
}
// If we don't want to update the existing value then pass
// the simulated NULL value of -1.
if !textutil.ValueIsSimulatedNull(req.OnDdl) {
bls.OnDdl = req.OnDdl
}
source, err = prototext.Marshal(bls)
if err != nil {
return nil, err
}
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(strings.Join(cells, ",")),
"tt": sqltypes.StringBindVariable(tabletTypesStr),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
for _, row := range res.Named().Rows {
id := row.AsInt64("id", 0)
cells := strings.Split(row.AsString("cell", ""), ",")
tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", ""))
if err != nil {
return nil, err
}
bls := &binlogdatapb.BinlogSource{}
source := row.AsBytes("source", []byte{})
state := row.AsString("state", "")
message := row.AsString("message", "")
if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil},
vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen")
}
// For the string based values, we use NULL to differentiate
// from an empty string. The NULL value indicates that we
// should keep the existing value.
if !textutil.ValueIsSimulatedNull(req.Cells) {
cells = req.Cells
}
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN ||
req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
if err = prototext.Unmarshal(source, bls); err != nil {
return nil, err
}
// If we don't want to update the existing value then pass
// the simulated NULL value of -1.
if !textutil.ValueIsSimulatedNull(req.OnDdl) {
bls.OnDdl = req.OnDdl
}
source, err = prototext.Marshal(bls)
if err != nil {
return nil, err
}
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(strings.Join(cells, ",")),
"tt": sqltypes.StringBindVariable(tabletTypesStr),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowStreamConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
}
res, err = tm.VREngine.Exec(stmt)
if err != nil {
return nil, err
}
}
res, err = tm.VREngine.Exec(stmt)

if err != nil {
return nil, err
}
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{
Result: &querypb.QueryResult{
RowsAffected: uint64(len(res.Rows)),
},
}, nil
}

// VReplicationExec executes a vreplication command.
Expand Down
56 changes: 37 additions & 19 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,20 +479,22 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
keyspace, shard)
selectRes := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|source|cell|tablet_types",
"int64|varchar|varchar|varchar",
"id|source|cell|tablet_types|state|message",
"int64|varchar|varchar|varchar|varchar|varbinary",
),
fmt.Sprintf("%d|%s|%s|%s", vreplID, blsStr, cells[0], tabletTypes[0]),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID+1, blsStr, cells[0], tabletTypes[0]),
)
idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
sqltypes.Int64BindVariable(int64(vreplID)))
idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where workflow = %a",
sqltypes.StringBindVariable(workflow))
require.NoError(t, err)
idRes := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id",
"int64",
),
fmt.Sprintf("%d", vreplID),
fmt.Sprintf("%d", vreplID+1),
)

tests := []struct {
Expand All @@ -504,61 +506,79 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
name: "update cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone2"},
// TabletTypes is an empty value, so the current value should be cleared
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`,
keyspace, shard, "zone2", vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%s)`,
keyspace, shard, "zone2", fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update cells, NULL tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "zone3", tabletTypes[0], vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`,
keyspace, shard, "zone3", tabletTypes[0], fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update tablet_types",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`,
keyspace, shard, "in_order:rdonly,replica", vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%s)`,
keyspace, shard, "in_order:rdonly,replica", fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update tablet_types, NULL cells",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY},
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], "rdonly", vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`,
keyspace, shard, cells[0], "rdonly", fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%s)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update cell,tablet_types,on_ddl",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt),
Cells: []string{"zone1", "zone2", "zone3"},
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY},
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%s)`,
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
{
name: "update state",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Stopped,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], fmt.Sprintf("%d, %d", vreplID, vreplID+1)),
},
}

Expand All @@ -575,8 +595,6 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
require.NotNil(t, tt.request, "No request provided")
require.NotEqual(t, "", tt.query, "No expected query provided")

tt.request.State = binlogdatapb.VReplicationWorkflowState_Stopped

// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
Expand Down

0 comments on commit ecbe83d

Please sign in to comment.