diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go index f3bf5869ab1..ecceaa4b41c 100644 --- a/go/vt/vtctl/workflow/framework_test.go +++ b/go/vt/vtctl/workflow/framework_test.go @@ -267,7 +267,7 @@ type testTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest primaryPositions map[uint32]string vdiffRequests map[uint32]*vdiffRequestResponse @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient { return &testTMClient{ schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse), primaryPositions: make(map[uint32]string), @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, req) { + if expect.req != nil && !proto.Equal(expect.req, req) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil @@ -418,20 +421,29 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q } } -func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() tmc.createVReplicationWorkflowRequests[tabletID] = req } +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] { + tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req + } +} + func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { tmc.mu.Lock() defer tmc.mu.Unlock() qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { @@ -479,6 +491,12 @@ type vdiffRequestResponse struct { err error } +type createVReplicationWorkflowRequestResponse struct { + req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest + res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse + err error +} + func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 3d0ca674e02..9b8251257a8 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -113,13 +113,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil { return err } + err := mz.buildMaterializer() if err != nil { return err } - if err := mz.deploySchema(); err != nil { - return err - } var workflowSubType binlogdatapb.VReplicationWorkflowSubType workflowSubType, err = mz.getWorkflowSubType() @@ -133,6 +131,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe } req.Options = optionsJSON + if err := mz.deploySchema(); err != nil { + return err + } + return mz.forAllTargets(func(target *topo.ShardInfo) error { targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias) if err != nil { @@ -304,7 +306,7 @@ func (mz *materializer) deploySchema() error { continue } if ts.CreateDdl == "" { - return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable) + return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable) } var err error diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index aada59c244d..8b407dbcb0c 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topotools" @@ -120,16 +121,39 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M if err == nil { tableName = table.Name.String() } + var ( + cols []string + fields []*querypb.Field + ) + if ts.CreateDdl != "" { + stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl) + require.NoError(t, err) + ddl, ok := stmt.(*sqlparser.CreateTable) + require.True(t, ok) + cols = make([]string, len(ddl.TableSpec.Columns)) + fields = make([]*querypb.Field, len(ddl.TableSpec.Columns)) + for i, col := range ddl.TableSpec.Columns { + cols[i] = col.Name.String() + fields[i] = &querypb.Field{ + Name: col.Name.String(), + Type: col.Type.SQLType(), + } + } + } env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: tableName, - Schema: fmt.Sprintf("%s_schema", tableName), + Name: tableName, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ - Name: ts.TargetTable, - Schema: fmt.Sprintf("%s_schema", ts.TargetTable), + Name: ts.TargetTable, + Schema: ts.CreateDdl, + Columns: cols, + Fields: fields, }}, } } @@ -199,7 +223,7 @@ type testMaterializerTMClient struct { mu sync.Mutex vrQueries map[int][]*queryResult - createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse // Used to confirm the number of times WorkflowDelete was called. workflowDeleteCalls int @@ -215,21 +239,29 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe sourceShards: sourceShards, tableSettings: tableSettings, vrQueries: make(map[int][]*queryResult), - createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse), } } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { - if !proto.Equal(expect, request) { + if expect.req != nil && !proto.Equal(expect.req, request) { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) } + if expect.res != nil { + return expect.res, expect.err + } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + if tmc.readVReplicationWorkflow != nil { return tmc.readVReplicationWorkflow(ctx, tablet, request) } @@ -283,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont } func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} for _, table := range request.Tables { if table == "/.*/" { @@ -315,7 +350,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } -func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) { tmc.mu.Lock() defer tmc.mu.Unlock() @@ -344,7 +379,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table qrs := tmc.vrQueries[int(tablet.Alias.Uid)] if len(qrs) == 0 { - return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query) } matched := false if qrs[0].query[0] == '/' { @@ -403,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex } func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables if len(req.IncludeWorkflows) > 0 { for _, wf := range req.IncludeWorkflows { diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 763dd7c04d3..8d3335041a3 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -18,6 +18,7 @@ package workflow import ( "context" + "errors" "fmt" "slices" "strings" @@ -1174,13 +1175,16 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { setStartingVschema() }() } - outms, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) + outms, _, _, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { require.Error(t, err) require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err) return } require.NoError(t, err) + // All of these test cases create a table and thus change the target + // vschema. + require.NotNil(t, cancelFunc) want := strings.Split(tcase.out, "\n") got := strings.Split(outms.TableSettings[0].CreateDdl, "\n") require.Equal(t, want, got, tcase.description) @@ -1419,7 +1423,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -1654,7 +1658,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) + _, _, got, cancelFunc, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1662,6 +1666,9 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { continue } require.NoError(t, err) + // withTable is a vschema that already contains the table and thus + // we don't make any vschema changes and there's nothing to cancel. + require.True(t, (cancelFunc != nil) == (tcase.targetVSchema != withTable)) utils.MustMatch(t, tcase.out, got, tcase.description) } } @@ -1772,7 +1779,7 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) @@ -1898,7 +1905,7 @@ func TestCreateCustomizedVindex(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + _, got, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want) @@ -2016,7 +2023,7 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) { t.Fatal(err) } - ms, ks, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms, ks, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(wantKs, ks) { t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs) @@ -2096,21 +2103,34 @@ func TestStopAfterCopyFlag(t *testing.T) { t.Fatal(err) } - ms1, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) + ms1, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false) require.NoError(t, err) require.Equal(t, ms1.StopAfterCopy, true) - ms2, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) + ms2, _, _, _, err := env.ws.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true) require.NoError(t, err) require.Equal(t, ms2.StopAfterCopy, false) } func TestCreateLookupVindexFailures(t *testing.T) { ms := &vtctldatapb.MaterializeSettings{ - // Keyspace where the vindex is created. - SourceKeyspace: "sourceks", - // Keyspace where the lookup table and VReplication workflow is created. + SourceKeyspace: "sourceks", // Not used + // Keyspace where the lookup table, vindex, and VReplication workflow is created. TargetKeyspace: "targetks", + TableSettings: []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: "t1", + CreateDdl: "CREATE TABLE `t1` (\n`c1` INT,\n PRIMARY KEY(`c1`)\n)", + }, + { + TargetTable: "t2", + CreateDdl: "CREATE TABLE `t2` (\n`c2` INT,\n PRIMARY KEY(`c2`)\n)", + }, + { + TargetTable: "t3", + CreateDdl: "CREATE TABLE `t3` (\n`c3` INT,\n PRIMARY KEY(`c3`)\n)", + }, + }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2122,7 +2142,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2135,10 +2155,10 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "xxhash", }, - "v": { + "v1": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", "write_only": "true", @@ -2148,19 +2168,28 @@ func TestCreateLookupVindexFailures(t *testing.T) { Tables: map[string]*vschemapb.Table{ "t1": { ColumnVindexes: []*vschemapb.ColumnVindex{{ - Name: "v", + Name: "v1", Column: "c1", }}, }, + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, }, } err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs) require.NoError(t, err) testcases := []struct { - description string - input *vschemapb.Keyspace - err string + description string + input *vschemapb.Keyspace + createRequest *createVReplicationWorkflowRequestResponse + vrepExecQueries []string + schemaAdditions []*tabletmanagerdatapb.TableDefinition + err string }{ { description: "dup vindex", @@ -2208,7 +2237,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c3", }, @@ -2224,7 +2253,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2240,7 +2269,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_noexist", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1,c2", "to": "c2", }, @@ -2264,7 +2293,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2324,7 +2353,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "v": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2377,7 +2406,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { "xxhash": { Type: "lookup_unique", Params: map[string]string{ - "table": "targetks.t", + "table": fmt.Sprintf("%s.t", ms.TargetKeyspace), "from": "c1", "to": "c2", }, @@ -2393,7 +2422,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "a conflicting vindex named xxhash already exists in the targetks keyspace", + err: fmt.Sprintf("a conflicting vindex named xxhash already exists in the %s keyspace", ms.TargetKeyspace), }, { description: "source table not in vschema", @@ -2408,7 +2437,37 @@ func TestCreateLookupVindexFailures(t *testing.T) { }, }, }, - err: "table other not found in the targetks keyspace", + err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace), + }, + { + description: "workflow creation error", + input: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v2": { + Type: "consistent_lookup_unique", + Params: map[string]string{ + "table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace), + "from": "c1", + "to": "keyspace_id", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v2", + Column: "c2", + }}, + }, + }, + }, + vrepExecQueries: []string{"CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)"}, + createRequest: &createVReplicationWorkflowRequestResponse{ + req: nil, // We don't care about defining it in this case + res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{}, + err: errors.New("we gots us an error"), + }, + err: "we gots us an error", }, } for _, tcase := range testcases { @@ -2418,10 +2477,40 @@ func TestCreateLookupVindexFailures(t *testing.T) { Keyspace: ms.TargetKeyspace, Vindex: tcase.input, } + if len(tcase.schemaAdditions) > 0 { + ogs := env.tmc.schema + defer func() { + env.tmc.schema = ogs + }() + // The tables are created in the target keyspace. + for _, tbl := range tcase.schemaAdditions { + env.tmc.schema[ms.TargetKeyspace+"."+tbl.Name] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{tbl}, + } + } + } + for _, tablet := range env.tablets { + if tablet.Keyspace == ms.TargetKeyspace { + for _, vrq := range tcase.vrepExecQueries { + env.tmc.expectVRQuery(int(tablet.Alias.Uid), vrq, &sqltypes.Result{}) + } + if tcase.createRequest != nil { + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest) + } + } + } _, err := env.ws.LookupVindexCreate(ctx, req) if !strings.Contains(err.Error(), tcase.err) { t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err) } + // Confirm that the original vschema where the vindex would + // be created is still in place -- since the workflow + // creation failed in each test case. That vindex is created + // in the target keyspace based on the MaterializeSettings + // definition. + cvs, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace) + require.NoError(t, err) + require.True(t, proto.Equal(vs, cvs), "expected: %+v, got: %+v", vs, cvs) }) } } @@ -2696,7 +2785,10 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { continue } - env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + reqRes := &createVReplicationWorkflowRequestResponse{ + req: tc.wantReqs[tablet.Alias.Uid], + } + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, reqRes) } mz := &materializer{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f4fb4a354fb..5d7edfdc82b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1133,22 +1133,33 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup span.Annotate("cells", req.Cells) span.Annotate("tablet_types", req.TabletTypes) - ms, sourceVSchema, targetVSchema, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) + ms, sourceVSchema, targetVSchema, cancelFunc, err := s.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner) if err != nil { return nil, err } + if err := s.ts.SaveVSchema(ctx, ms.TargetKeyspace, targetVSchema); err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + targetVSchema, ms.TargetKeyspace) } - ms.TabletTypes = topoproto.MakeStringTypeCSV(req.TabletTypes) ms.TabletSelectionPreference = req.TabletSelectionPreference if err := s.Materialize(ctx, ms); err != nil { + if cancelFunc != nil { + if cerr := cancelFunc(); cerr != nil { + err = vterrors.Wrapf(err, "failed to restore original vschema '%v' in the %s keyspace: %v", + targetVSchema, ms.TargetKeyspace, cerr) + } + } return nil, err } - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { - return nil, err + if ms.SourceKeyspace != ms.TargetKeyspace { + if err := s.ts.SaveVSchema(ctx, ms.SourceKeyspace, sourceVSchema); err != nil { + return nil, vterrors.Wrapf(err, "failed to save updated vschema '%v' in the %s keyspace", + sourceVSchema, ms.SourceKeyspace) + } } + if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -3845,7 +3856,8 @@ func fillStringTemplate(tmpl string, vars any) (string, error) { // prepareCreateLookup performs the preparatory steps for creating a // Lookup Vindex. -func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) ( + ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) { // Important variables are pulled out here. var ( vindexName string @@ -3871,19 +3883,19 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // Validate input vindex. if specs == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided") } if len(specs.Vindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "only one vindex must be specified") } vindexName = maps.Keys(specs.Vindexes)[0] vindex = maps.Values(specs.Vindexes)[0] if !strings.Contains(vindex.Type, "lookup") { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type) } targetKeyspace, targetTableName, err = s.env.Parser().ParseTable(vindex.Params["table"]) if err != nil || targetKeyspace == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .", vindex.Params["table"]) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex table name (%s) must be in the form .
", vindex.Params["table"]) } vindexFromCols = strings.Split(vindex.Params["from"], ",") for i, col := range vindexFromCols { @@ -3891,11 +3903,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if strings.Contains(vindex.Type, "unique") { if len(vindexFromCols) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unique vindex 'from' should have only one column") } } else { if len(vindexFromCols) < 2 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "non-unique vindex 'from' should have more than one column") } } vindexToCol = vindex.Params["to"] @@ -3904,7 +3916,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindex.Params["write_only"] = "true" // See if we can create the vindex without errors. if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if ignoreNullsStr, ok := vindex.Params["ignore_nulls"]; ok { // This mirrors the behavior of vindexes.boolFromMap(). @@ -3914,19 +3926,22 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str case "false": vindexIgnoreNulls = false default: - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", - ignoreNullsStr) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ignore_nulls (%s) value must be 'true' or 'false'", + ignoreNullsStr) } } // Validate input table. if len(specs.Tables) < 1 || len(specs.Tables) > 2 { - return nil, nil, nil, fmt.Errorf("one or two tables must be specified") + return nil, nil, nil, nil, fmt.Errorf("one or two tables must be specified") } // Loop executes once or twice. for tableName, table := range specs.Tables { if len(table.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "exactly one ColumnVindex must be specified for the %s table", + tableName) } if tableName != targetTableName { // This is the source table. sourceTableName = tableName @@ -3940,42 +3955,55 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str vindexCols = table.ColumnVindexes[0].Columns } else { if table.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", tableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + tableName) } vindexCols = []string{table.ColumnVindexes[0].Column} } if !slices.Equal(vindexCols, vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", - tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "columns in the lookup table %s primary vindex (%s) don't match the 'from' columns specified (%s)", + tableName, strings.Join(vindexCols, ","), strings.Join(vindexFromCols, ",")) } } // Validate input table and vindex consistency. if sourceTable == nil || len(sourceTable.ColumnVindexes) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table in the %s keyspace", keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "No ColumnVindex found for the owner table (%s) in the %s keyspace", + sourceTable, keyspace) } if sourceTable.ColumnVindexes[0].Name != vindexName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", sourceTable.ColumnVindexes[0].Name, vindexName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "ColumnVindex name (%s) must match vindex name (%s)", + sourceTable.ColumnVindexes[0].Name, vindexName) } if vindex.Owner != "" && vindex.Owner != sourceTableName { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", vindex.Owner, sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex owner (%s) must match table name (%s)", + vindex.Owner, sourceTableName) } if len(sourceTable.ColumnVindexes[0].Columns) != 0 { sourceVindexColumns = sourceTable.ColumnVindexes[0].Columns } else { if sourceTable.ColumnVindexes[0].Column == "" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", sourceTableName) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "at least one column must be specified in ColumnVindexes for the %s table", + sourceTableName) } sourceVindexColumns = []string{sourceTable.ColumnVindexes[0].Column} } if len(sourceVindexColumns) != len(vindexFromCols) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", len(sourceVindexColumns), len(vindexFromCols)) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "length of table columns (%d) differs from length of vindex columns (%d)", + len(sourceVindexColumns), len(vindexFromCols)) } // Validate against source vschema. sourceVSchema, err = s.ts.GetVSchema(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if sourceVSchema.Vindexes == nil { sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex) @@ -3987,7 +4015,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } else { targetVSchema, err = s.ts.GetVSchema(ctx, targetKeyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } if targetVSchema.Vindexes == nil { @@ -3998,12 +4026,15 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := sourceVSchema.Vindexes[vindexName]; ok { if !proto.Equal(existing, vindex) { // If the exact same vindex already exists then we can re-use it - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", vindexName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "a conflicting vindex named %s already exists in the %s keyspace", + vindexName, keyspace) } } sourceVSchemaTable = sourceVSchema.Tables[sourceTableName] if sourceVSchemaTable == nil && !schema.IsInternalOperationTableName(sourceTableName) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table %s not found in the %s keyspace", sourceTableName, keyspace) } for _, colVindex := range sourceVSchemaTable.ColumnVindexes { // For a conflict, the vindex name and column should match. @@ -4020,41 +4051,47 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str // are not the same then they are two distinct conflicting vindexes and we should // not proceed. if !slices.Equal(colNames, sourceVindexColumns) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", - strings.Join(colNames, ","), sourceTableName, keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting ColumnVindex on column(s) %s in table %s already exists in the %s keyspace", + strings.Join(colNames, ","), sourceTableName, keyspace) } } // Validate against source schema. sourceShards, err := s.ts.GetServingShards(ctx, keyspace) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } onesource := sourceShards[0] if onesource.PrimaryAlias == nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "source shard %s has no primary", onesource.ShardName()) } req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{sourceTableName}} tableSchema, err := schematools.GetSchema(ctx, s.ts, s.tmc, onesource.PrimaryAlias, req) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } if len(tableSchema.TableDefinitions) != 1 { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", len(tableSchema.TableDefinitions), keyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected number of tables (%d) returned from %s schema", + len(tableSchema.TableDefinitions), keyspace) } // Generate "create table" statement. lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n") if len(lines) < 3 { // Should never happen. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "schema looks incorrect: %s, expecting at least four lines", + tableSchema.TableDefinitions[0].Schema) } var modified []string modified = append(modified, strings.Replace(lines[0], sourceTableName, targetTableName, 1)) for i := range sourceVindexColumns { line, err := generateColDef(lines, sourceVindexColumns[i], vindexFromCols[i]) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } modified = append(modified, line) } @@ -4077,7 +4114,8 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str createDDL = strings.Join(modified, "\n") // Confirm that our DDL is valid before we create anything. if _, err = s.env.Parser().ParseStrictDDL(createDDL); err != nil { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", err, createDDL) + return nil, nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s", + err, createDDL) } // Generate vreplication query. @@ -4112,6 +4150,11 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } materializeQuery = buf.String() + // Save a copy of the original vschema if we modify it and need to provide + // a cancelFunc. + ogTargetVSchema := targetVSchema.CloneVT() + targetChanged := false + // Update targetVSchema. targetTable := specs.Tables[targetTableName] if targetVSchema.Sharded { @@ -4127,7 +4170,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str if targetVindexType == "" { targetVindexType, err = vindexes.ChooseVindexForType(field.Type) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } } targetVindex = &vschemapb.Vindex{ @@ -4138,14 +4181,19 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if targetVindex == nil { // Unreachable. We validated column names when generating the DDL. - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INTERNAL, "column %s not found in target schema %s", + sourceVindexColumns[0], tableSchema.TableDefinitions[0].Schema) } if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok { if !proto.Equal(existing, targetVindex) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", targetVindexType, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting vindex named %v already exists in the %s keyspace", + targetVindexType, targetKeyspace) } } else { targetVSchema.Vindexes[targetVindexType] = targetVindex + targetChanged = true } targetTable = &vschemapb.Table{ @@ -4159,10 +4207,20 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str } if existing, ok := targetVSchema.Tables[targetTableName]; ok { if !proto.Equal(existing, targetTable) { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", targetTableName, targetKeyspace) + return nil, nil, nil, nil, + vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "a conflicting table named %s already exists in the %s vschema", + targetTableName, targetKeyspace) } } else { targetVSchema.Tables[targetTableName] = targetTable + targetChanged = true + } + + if targetChanged { + cancelFunc = func() error { + // Restore the original target vschema. + return s.ts.SaveVSchema(ctx, targetKeyspace, ogTargetVSchema) + } } ms = &vtctldatapb.MaterializeSettings{ @@ -4182,7 +4240,7 @@ func (s *Server) prepareCreateLookup(ctx context.Context, workflow, keyspace str sourceVSchema.Vindexes[vindexName] = vindex sourceVSchemaTable.ColumnVindexes = append(sourceVSchemaTable.ColumnVindexes, sourceTable.ColumnVindexes[0]) - return ms, sourceVSchema, targetVSchema, nil + return ms, sourceVSchema, targetVSchema, cancelFunc, nil } func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) {