From 0906d05b13e13126bef3f22f798a3b52fcac2625 Mon Sep 17 00:00:00 2001 From: pbibra Date: Thu, 14 Sep 2023 10:56:26 -0700 Subject: [PATCH 1/2] apply vcopy patch 11740 (#130) Signed-off-by: Priya Bibra --- go/vt/proto/binlogdata/binlogdata.pb.go | 63 ++++++++++--------- go/vt/vtgate/endtoend/main_test.go | 12 ++++ go/vt/vtgate/endtoend/vstream_test.go | 37 +++++++++-- go/vt/vtgate/executor.go | 1 + go/vt/vtgate/vstream_manager.go | 39 ++++++++++++ .../tabletserver/vstreamer/uvstreamer.go | 15 ++++- .../vstreamer/uvstreamer_flaky_test.go | 6 +- .../vstreamer/vstreamer_flaky_test.go | 6 +- proto/binlogdata.proto | 4 ++ web/vtadmin/src/proto/vtadmin.d.ts | 3 +- web/vtadmin/src/proto/vtadmin.js | 7 +++ 11 files changed, 153 insertions(+), 40 deletions(-) diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 545e29ec8f2..7f115eb7ee7 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -187,6 +187,10 @@ const ( VEventType_VERSION VEventType = 17 VEventType_LASTPK VEventType = 18 VEventType_SAVEPOINT VEventType = 19 + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + VEventType_COPY_COMPLETED VEventType = 20 ) // Enum value maps for VEventType. 
@@ -212,28 +216,30 @@ var ( 17: "VERSION", 18: "LASTPK", 19: "SAVEPOINT", + 20: "COPY_COMPLETED", } VEventType_value = map[string]int32{ - "UNKNOWN": 0, - "GTID": 1, - "BEGIN": 2, - "COMMIT": 3, - "ROLLBACK": 4, - "DDL": 5, - "INSERT": 6, - "REPLACE": 7, - "UPDATE": 8, - "DELETE": 9, - "SET": 10, - "OTHER": 11, - "ROW": 12, - "FIELD": 13, - "HEARTBEAT": 14, - "VGTID": 15, - "JOURNAL": 16, - "VERSION": 17, - "LASTPK": 18, - "SAVEPOINT": 19, + "UNKNOWN": 0, + "GTID": 1, + "BEGIN": 2, + "COMMIT": 3, + "ROLLBACK": 4, + "DDL": 5, + "INSERT": 6, + "REPLACE": 7, + "UPDATE": 8, + "DELETE": 9, + "SET": 10, + "OTHER": 11, + "ROW": 12, + "FIELD": 13, + "HEARTBEAT": 14, + "VGTID": 15, + "JOURNAL": 16, + "VERSION": 17, + "LASTPK": 18, + "SAVEPOINT": 19, + "COPY_COMPLETED": 20, } ) @@ -2981,7 +2987,7 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x4b, 0x55, 0x50, 0x49, 0x4e, 0x44, 0x45, 0x58, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x4d, 0x49, 0x47, 0x52, 0x41, 0x54, 0x45, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x53, 0x48, 0x41, 0x52, 0x44, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09, 0x4f, 0x4e, 0x4c, 0x49, 0x4e, 0x45, 0x44, 0x44, - 0x4c, 0x10, 0x05, 0x2a, 0xf9, 0x01, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, + 0x4c, 0x10, 0x05, 0x2a, 0x8d, 0x02, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47, 0x54, 0x49, 0x44, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, @@ -2996,13 +3002,14 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x09, 0x0a, 0x05, 0x56, 0x47, 0x54, 0x49, 0x44, 0x10, 0x0f, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x55, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x10, 0x12, 0x0b, 0x0a, 0x07, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x11, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x53, 0x54, 0x50, 0x4b, 0x10, 0x12, - 0x12, 0x0d, 0x0a, 0x09, 
0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x10, 0x13, 0x2a, - 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, - 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, - 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x10, 0x13, 0x12, + 0x12, 0x0a, 0x0e, 0x43, 0x4f, 0x50, 0x59, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, + 0x44, 0x10, 0x14, 0x2a, 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, + 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, + 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, + 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, + 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index 17cf3e6dd01..c91c61ec2cd 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -45,6 +45,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_basic( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_copy_resume( id1 bigint, id2 bigint, @@ -139,6 +145,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_basic": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + 
Name: "hash", + }}, + }, "t1_copy_resume": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index a13aac8291d..832799366b1 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -169,7 +169,7 @@ func TestVStreamCopyBasic(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + _, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) if err != nil { t.Fatal(err) } @@ -180,7 +180,7 @@ func TestVStreamCopyBasic(t *testing.T) { } qr := sqltypes.ResultToProto3(&lastPK) tablePKs := []*binlogdatapb.TableLastPK{{ - TableName: "t1", + TableName: "t1_copy_basic", Lastpk: qr, }} var shardGtids []*binlogdatapb.ShardGtid @@ -200,8 +200,8 @@ func TestVStreamCopyBasic(t *testing.T) { vgtid.ShardGtids = shardGtids filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", + Match: "t1_copy_basic", + Filter: "select * from t1_copy_basic", }}, } flags := &vtgatepb.VStreamFlags{} @@ -210,19 +210,44 @@ func TestVStreamCopyBasic(t *testing.T) { if err != nil { t.Fatal(err) } - numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */) + numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* copy operation completed */) + 1 /* fully copy operation completed */ + expectedCompletedEvents := []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED`, + } 
require.NotNil(t, reader) var evs []*binlogdatapb.VEvent + var completedEvs []*binlogdatapb.VEvent for { e, err := reader.Recv() switch err { case nil: evs = append(evs, e...) + + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { + completedEvs = append(completedEvs, ev) + } + } + + printEvents(evs) // for debugging ci failures + if len(evs) == numExpectedEvents { + // The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. + // On the other hand, the last event should always be a fully COPY_COMPLETED event. + // That's why the sort.Slice doesn't have to handle the last element in completedEvs. + sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool { + return completedEvs[i].GetShard() < completedEvs[j].GetShard() + }) + for i, ev := range completedEvs { + require.Regexp(t, expectedCompletedEvents[i], ev.String()) + } t.Logf("TestVStreamCopyBasic was successful") return + } else if numExpectedEvents < len(evs) { + t.Fatalf("len(events)=%v are not expected\n", len(evs)) } - printEvents(evs) // for debugging ci failures case io.EOF: log.Infof("stream ended\n") cancel() diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index c1ae7e209bb..a960720dced 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1336,6 +1336,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar vsm: vsm, eventCh: make(chan []*binlogdatapb.VEvent), ts: ts, + copyCompletedShard: make(map[string]struct{}), } _ = vs.stream(ctx) return nil diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 8c6dd9f04f4..a815cdc4f31 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -110,6 +110,9 @@ type vstream struct { // the timestamp of the most recent event, keyed by streamId. streamId is of the form . timestamps map[string]int64 + // the shard map tracking the copy completion, keyed by streamId. 
streamId is of the form <keyspace>/<shard>. + copyCompletedShard map[string]struct{} + vsm *vstreamManager eventCh chan []*binlogdatapb.VEvent @@ -171,6 +174,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta eventCh: make(chan []*binlogdatapb.VEvent), heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, + copyCompletedShard: make(map[string]struct{}), } return vs.stream(ctx) } @@ -598,6 +602,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { + return err + } + eventss = nil + sendevents = nil + case binlogdatapb.VEventType_COPY_COMPLETED: + sendevents = append(sendevents, event) + if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied { + sendevents = append(sendevents, doneEvent) + } + eventss = append(eventss, sendevents) + + if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { + return err + } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err } @@ -733,6 +753,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e return nil } +// isCopyFullyCompleted returns true if every stream has received a copy_completed event. +// If true, it will also return a new copy_completed event that needs to be sent. +// This new event represents the completion of all the copy operations. 
+func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) { + vs.mu.Lock() + defer vs.mu.Unlock() + + vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{} + + for _, shard := range vs.vgtid.ShardGtids { + if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok { + return false, nil + } + } + return true, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } +} + func (vs *vstream) getError() error { vs.errMu.Lock() defer vs.errMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 056d5da1822..2508a625ea1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -414,7 +414,9 @@ func (uvs *uvstreamer) Stream() error { uvs.vse.errorCounts.Add("Copy", 1) return err } - uvs.sendTestEvent("Copy Done") + if err := uvs.allCopyComplete(); err != nil { + return err + } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) @@ -457,6 +459,17 @@ func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) { uvs.plans[tableName].tablePK.Lastpk = qr } +func (uvs *uvstreamer) allCopyComplete() error { + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } + + if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil { + return err + } + return nil +} + // dummy event sent only in test mode func (uvs *uvstreamer) sendTestEvent(msg string) { if !uvstreamerTestMode { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 1ed673ebf90..610b9012f7f 100644 --- 
a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -233,7 +233,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { } - callbacks["OTHER.*Copy Done"] = func() { + callbacks["COPY_COMPLETED"] = func() { log.Info("Copy done, inserting events to stream") insertRow(t, "t1", 1, numInitialRows+4) insertRow(t, "t2", 2, numInitialRows+3) @@ -252,7 +252,7 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) - numCopyEvents += 2 /* GTID + Test event after all copy is done */ + numCopyEvents += 2 /* GTID + Event after all copy is done */ numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ @@ -539,7 +539,7 @@ var expectedEvents = []string{ "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}", "type:COMMIT", - "type:OTHER gtid:\"Copy Done\"", + "type:COPY_COMPLETED", "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"14140\"}}}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 43abed35e23..c2e6f8cef55 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -447,7 +447,7 @@ func TestVStreamCopySimpleFlow(t 
*testing.T) { testcases := []testcase{ { input: []string{}, - output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}}, + output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}}, }, { @@ -2178,6 +2178,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog if evs[i].Type != binlogdatapb.VEventType_DDL { t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) } + case "copy_completed": + if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED { + t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i]) + } default: evs[i].Timestamp = 0 if evs[i].Type == binlogdatapb.VEventType_FIELD { diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 8433e105025..d697482a2f7 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -289,6 +289,10 @@ enum VEventType { VERSION = 17; LASTPK = 18; SAVEPOINT = 19; + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + COPY_COMPLETED = 20; } // RowChange represents one row change. diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 9c2155bfa86..d36051d73d7 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -49368,7 +49368,8 @@ export namespace binlogdata { JOURNAL = 16, VERSION = 17, LASTPK = 18, - SAVEPOINT = 19 + SAVEPOINT = 19, + COPY_COMPLETED = 20 } /** Properties of a RowChange. 
*/ diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index a439f77b73c..11040565de6 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -116986,6 +116986,7 @@ $root.binlogdata = (function() { * @property {number} VERSION=17 VERSION value * @property {number} LASTPK=18 LASTPK value * @property {number} SAVEPOINT=19 SAVEPOINT value + * @property {number} COPY_COMPLETED=20 COPY_COMPLETED value */ binlogdata.VEventType = (function() { var valuesById = {}, values = Object.create(valuesById); @@ -117009,6 +117010,7 @@ $root.binlogdata = (function() { values[valuesById[17] = "VERSION"] = 17; values[valuesById[18] = "LASTPK"] = 18; values[valuesById[19] = "SAVEPOINT"] = 19; + values[valuesById[20] = "COPY_COMPLETED"] = 20; return values; })(); @@ -119251,6 +119253,7 @@ $root.binlogdata = (function() { case 17: case 18: case 19: + case 20: break; } if (message.timestamp != null && message.hasOwnProperty("timestamp")) @@ -119398,6 +119401,10 @@ $root.binlogdata = (function() { case 19: message.type = 19; break; + case "COPY_COMPLETED": + case 20: + message.type = 20; + break; } if (object.timestamp != null) if ($util.Long) From c057af8188057871760f79e6aef019d66ad29e10 Mon Sep 17 00:00:00 2001 From: pbibra Date: Thu, 14 Sep 2023 14:38:07 -0700 Subject: [PATCH 2/2] apply patch 11909 (#131) * apply patch 11909 Signed-off-by: Priya Bibra * fix typo Signed-off-by: Priya Bibra --------- Signed-off-by: Priya Bibra --- go/test/endtoend/vreplication/vstream_test.go | 168 +++++++++++++++++- go/vt/vtgate/endtoend/main_test.go | 68 ++++++- go/vt/vtgate/endtoend/misc_test.go | 11 ++ go/vt/vtgate/endtoend/row_count_test.go | 2 + go/vt/vtgate/endtoend/vstream_test.go | 156 +++++++++++++++- go/vt/vtgate/vstream_manager.go | 61 ++++--- go/vt/vtgate/vstream_manager_test.go | 87 +++++++-- go/vt/vttest/local_cluster.go | 24 +-- 8 files changed, 518 insertions(+), 59 deletions(-) diff --git 
a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index c596ed084f7..7ce278318cc 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { const schemaUnsharded = ` create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; +insert into customer_seq(id, next_id, cache) values(0, 1, 3); ` const vschemaUnsharded = ` { @@ -218,14 +219,19 @@ const vschemaSharded = ` func insertRow(keyspace, table string, id int) { vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false) vtgateConn.ExecuteFetch("begin", 1000, false) + _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) + if err != nil { + log.Infof("error inserting row %d: %v", id, err) + } vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false) vtgateConn.ExecuteFetch("commit", 1000, false) } type numEvents struct { - numRowEvents, numJournalEvents int64 - numLessThan80Events, numGreaterThan80Events int64 - numLessThan40Events, numGreaterThan40Events int64 + numRowEvents, numJournalEvents int64 + numLessThan80Events, numGreaterThan80Events int64 + numLessThan40Events, numGreaterThan40Events int64 + numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64 } // tests the StopOnReshard flag @@ -375,6 +381,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID return &ne } +// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces +// Ensure that there are no missing row events during the resharding process. 
+func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents { + defaultCellName := "zone1" + allCellNames = defaultCellName + allCells := []string{allCellNames} + vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig) + + require.NotNil(t, vc) + ogdr := defaultReplicas + defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + defer vc.TearDown(t) + + defaultCell = vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + vtgate = defaultCell.Vtgates[0] + require.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + + ctx := context.Background() + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + if err != nil { + log.Fatal(err) + } + defer vstreamConn.Close() + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // We want to confirm that the following two tables are streamed. + // 1. the customer_seq in the unsharded keyspace + // 2. 
the customer table in the sharded keyspace + Match: "/customer.*/", + }}, + } + flags := &vtgatepb.VStreamFlags{} + done := false + + id := 1000 + // First goroutine that keeps inserting rows into the table being streamed until a minute after reshard + // We should keep getting events on the new shards + go func() { + for { + if done { + return + } + id++ + time.Sleep(1 * time.Second) + insertRow("sharded", "customer", id) + } + }() + // stream events from the VStream API + var ne numEvents + reshardDone := false + go func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "0": + if reshardDone { + ne.numShard0AfterReshardEvents++ + } else { + ne.numShard0BeforeReshardEvents++ + } + case "-80": + ne.numLessThan80Events++ + case "80-": + ne.numGreaterThan80Events++ + case "-40": + ne.numLessThan40Events++ + case "40-": + ne.numGreaterThan40Events++ + } + ne.numRowEvents++ + case binlogdatapb.VEventType_JOURNAL: + ne.numJournalEvents++ + } + } + case io.EOF: + log.Infof("Stream Ended") + done = true + default: + log.Errorf("Returned err %v", err) + done = true + } + if done { + return + } + } + }() + + ticker := time.NewTicker(1 * time.Second) + tickCount := 0 + for { + <-ticker.C + tickCount++ + switch tickCount { + case 1: + reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName) + reshardDone = true + case 60: + done = true + } + if done { + break + } + } + log.Infof("ne=%v", ne) + + // The number of row events streamed by the VStream API should match the number of rows inserted. 
+ // This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process. + // + // On the other hand, we don't verify the exact number of row events for the unsharded keyspace + // because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1. + // We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding, + // is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward. + customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer") + insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0]) + require.NoError(t, err) + require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events) + return ne +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } @@ -406,3 +556,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) { testVStreamWithFailover(t, false) } + +func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) { + ne := testVStreamCopyMultiKeyspaceReshard(t, 3000) + require.Equal(t, int64(0), ne.numJournalEvents) + require.NotZero(t, ne.numRowEvents) + require.NotZero(t, ne.numShard0BeforeReshardEvents) + require.NotZero(t, ne.numShard0AfterReshardEvents) + require.NotZero(t, ne.numLessThan80Events) + require.NotZero(t, ne.numGreaterThan80Events) + require.NotZero(t, ne.numLessThan40Events) + require.NotZero(t, ne.numGreaterThan40Events) +} diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index c91c61ec2cd..089a6a616ee 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -51,6 +51,12 @@ create table t1_copy_basic( primary key(id1) ) Engine=InnoDB; +create table t1_copy_all( + id1 bigint, + id2 
bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_copy_resume( id1 bigint, id2 bigint, @@ -151,6 +157,12 @@ create table t1_sharded( Name: "hash", }}, }, + "t1_copy_all": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_copy_resume": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", @@ -218,6 +230,31 @@ create table t1_sharded( }, }, } + + schema2 = ` +create table t1_copy_all_ks2( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; +` + + vschema2 = &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1_copy_all_ks2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, + }, + } ) func TestMain(m *testing.M) { @@ -226,14 +263,24 @@ func TestMain(m *testing.M) { exitCode := func() int { var cfg vttest.Config cfg.Topology = &vttestpb.VTTestTopology{ - Keyspaces: []*vttestpb.Keyspace{{ - Name: "ks", - Shards: []*vttestpb.Shard{{ - Name: "-80", - }, { - Name: "80-", - }}, - }}, + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "ks", + Shards: []*vttestpb.Shard{{ + Name: "-80", + }, { + Name: "80-", + }}, + }, + { + Name: "ks2", + Shards: []*vttestpb.Shard{{ + Name: "-80", + }, { + Name: "80-", + }}, + }, + }, } if err := cfg.InitSchemas("ks", schema, vschema); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) @@ -241,6 +288,11 @@ func TestMain(m *testing.M) { return 1 } defer os.RemoveAll(cfg.SchemaDir) + if err := cfg.InitSchemas("ks2", schema2, vschema2); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.RemoveAll(cfg.SchemaDir) + return 1 + } cfg.TabletHostName = *tabletHostName diff --git a/go/vt/vtgate/endtoend/misc_test.go b/go/vt/vtgate/endtoend/misc_test.go index 138b68d0aa3..aeeb1c122db 100644 --- a/go/vt/vtgate/endtoend/misc_test.go +++ b/go/vt/vtgate/endtoend/misc_test.go @@ -19,6 +19,7 @@ package endtoend import ( "context" 
"fmt" + osExec "os/exec" "testing" "github.com/stretchr/testify/assert" @@ -55,6 +56,16 @@ func TestCreateAndDropDatabase(t *testing.T) { require.NoError(t, err) defer conn.Close() + // cleanup the keyspace from the topology. + defer func() { + // the corresponding database needs to be created in advance. + // a subsequent DeleteKeyspace command returns the error of 'node doesn't exist' without it. + _ = exec(t, conn, "create database testitest") + + _, err := osExec.Command("vtctldclient", "--server", grpcAddress, "DeleteKeyspace", "--recursive", "--force", "testitest").CombinedOutput() + require.NoError(t, err) + }() + // run it 3 times. for count := 0; count < 3; count++ { t.Run(fmt.Sprintf("exec:%d", count), func(t *testing.T) { diff --git a/go/vt/vtgate/endtoend/row_count_test.go b/go/vt/vtgate/endtoend/row_count_test.go index 9ac200b33fa..5a29f6177a9 100644 --- a/go/vt/vtgate/endtoend/row_count_test.go +++ b/go/vt/vtgate/endtoend/row_count_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/utils" ) func TestRowCount(t *testing.T) { @@ -31,6 +32,7 @@ func TestRowCount(t *testing.T) { conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) defer conn.Close() + utils.Exec(t, conn, "use ks") type tc struct { query string expected int diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 832799366b1..f2ba9af992b 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -234,12 +234,7 @@ func TestVStreamCopyBasic(t *testing.T) { printEvents(evs) // for debugging ci failures if len(evs) == numExpectedEvents { - // The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. - // On the other hand, the last event should always be a fully COPY_COMPLETED event. - // That's why the sort.Slice doesn't have to handle the last element in completedEvs. 
- sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool { - return completedEvs[i].GetShard() < completedEvs[j].GetShard() - }) + sortCopyCompletedEvents(completedEvs) for i, ev := range completedEvs { require.Regexp(t, expectedCompletedEvents[i], ev.String()) } @@ -258,6 +253,139 @@ func TestVStreamCopyBasic(t *testing.T) { } } +// TestVStreamCopyUnspecifiedShardGtid tests the case where the keyspace contains wildcards and/or the shard is not specified in the request. +// Verify that the Vstream API resolves the unspecified ShardGtid input to a list of all the matching keyspaces and all the shards in the topology. +// - If the keyspace contains wildcards and the shard is not specified, the copy operation should be performed on all shards of all matching keyspaces. +// - If the keyspace is specified and the shard is not specified, the copy operation should be performed on all shards of the specified keyspace. +func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + require.NoError(t, err) + } + defer conn.Close() + + _, err = conn.ExecuteFetch("insert into t1_copy_all(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + require.NoError(t, err) + } + + _, err = conn.ExecuteFetch("insert into t1_copy_all_ks2(id1,id2) values(10,10), (20,20)", 1, false) + if err != nil { + require.NoError(t, err) + } + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/t1_copy_all.*/", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // We have 2 shards in each keyspace. We assume the rows are + // evenly split across each shard. 
For each INSERT statement, which + // is a transaction and gets a global transaction identifier or GTID, we + // have 1 each of the following events: + // begin, field, position, lastpk, commit (5) + // For each row created in the INSERT statement -- 8 on ks and + // 2 on ks2 -- we have 1 row event between the begin and commit. + // When we have copied all rows for a table in the shard, the shard + // also gets events marking the transition from the copy phase to + // the streaming phase for that table with 1 each of the following: + // begin, vgtid, commit (3) + // As the copy phase completes for all tables on the shard, the shard + // gets 1 copy phase completed event. + // Lastly the stream has 1 final event to mark the final end to all + // copy phase operations in the vstream. + expectedKs1EventNum := 2 /* num shards */ * (9 /* begin/field/vgtid:pos/4 rowevents avg/vgtid: lastpk/commit */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) + expectedKs2EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgtid: lastpk/commit */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) + expectedFullyCopyCompletedNum := 1 + + cases := []struct { + name string + shardGtid *binlogdatapb.ShardGtid + expectedEventNum int + expectedCompletedEvents []string + }{ + { + name: "copy from all keyspaces", + shardGtid: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + }, + expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedFullyCopyCompletedNum, + expectedCompletedEvents: []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED keyspace:"ks2" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks2" shard:"80-"`, + `type:COPY_COMPLETED`, + }, + }, + { + name: "copy from all shards in one keyspace", + shardGtid: &binlogdatapb.ShardGtid{ + Keyspace: "ks", + }, + expectedEventNum: expectedKs1EventNum + 
expectedFullyCopyCompletedNum, + expectedCompletedEvents: []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED`, + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + var vgtid = &binlogdatapb.VGtid{} + vgtid.ShardGtids = []*binlogdatapb.ShardGtid{c.shardGtid} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + _, _ = conn, mconn + if err != nil { + require.NoError(t, err) + } + require.NotNil(t, reader) + var evs []*binlogdatapb.VEvent + var completedEvs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + evs = append(evs, e...) + + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { + completedEvs = append(completedEvs, ev) + } + } + + if len(evs) == c.expectedEventNum { + sortCopyCompletedEvents(completedEvs) + for i, ev := range completedEvs { + require.Equal(t, c.expectedCompletedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyUnspecifiedShardGtid was successful") + return + } else if c.expectedEventNum < len(evs) { + printEvents(evs) // for debugging ci failures + require.FailNow(t, "len(events)=%v are not expected\n", len(evs)) + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + require.FailNow(t, "remote error: %v\n", err) + } + } + }) + } +} + func TestVStreamCopyResume(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -563,3 +691,19 @@ func (v VEventSorter) Less(i, j int) bool { } return valI < valJ } + +// The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. +// On the other hand, the last event should always be a fully COPY_COMPLETED event. +// That's why the sort.Slice doesn't have to handle the last element in completedEvs. 
+func sortCopyCompletedEvents(completedEvs []*binlogdatapb.VEvent) { + sortVEventByKeyspaceAndShard(completedEvs[:len(completedEvs)-1]) +} + +func sortVEventByKeyspaceAndShard(evs []*binlogdatapb.VEvent) { + sort.Slice(evs, func(i, j int) bool { + if evs[i].Keyspace == evs[j].Keyspace { + return evs[i].Shard < evs[j].Shard + } + return evs[i].Keyspace < evs[j].Keyspace + }) +} diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index a815cdc4f31..3fb5e0ac9b4 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "regexp" "strings" "sync" "time" @@ -198,31 +199,51 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") } // To fetch from all keyspaces, the input must contain a single ShardGtid - // that has an empty keyspace, and the Gtid must be "current". In the - // future, we'll allow the Gtid to be empty which will also support - // copying of existing data. - if len(vgtid.ShardGtids) == 1 && vgtid.ShardGtids[0].Keyspace == "" { - if vgtid.ShardGtids[0].Gtid != "current" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) - } - keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) - if err != nil { - return nil, nil, nil, err - } - newvgtid := &binlogdatapb.VGtid{} - for _, keyspace := range keyspaces { - newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ - Keyspace: keyspace, - Gtid: "current", - }) + // that has an empty keyspace, and the Gtid must be "current". + // Or the input must contain a single ShardGtid that has keyspace wildcards. 
+ if len(vgtid.ShardGtids) == 1 { + inputKeyspace := vgtid.ShardGtids[0].Keyspace + isEmpty := inputKeyspace == "" + isRegexp := strings.HasPrefix(inputKeyspace, "/") + if isEmpty || isRegexp { + newvgtid := &binlogdatapb.VGtid{} + keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) + if err != nil { + return nil, nil, nil, err + } + + if isEmpty { + if vgtid.ShardGtids[0].Gtid != "current" { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) + } + for _, keyspace := range keyspaces { + newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ + Keyspace: keyspace, + Gtid: "current", + }) + } + } else { + re, err := regexp.Compile(strings.Trim(inputKeyspace, "/")) + if err != nil { + return nil, nil, nil, err + } + for _, keyspace := range keyspaces { + if re.MatchString(keyspace) { + newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ + Keyspace: keyspace, + Gtid: vgtid.ShardGtids[0].Gtid, + }) + } + } + } + vgtid = newvgtid } - vgtid = newvgtid } newvgtid := &binlogdatapb.VGtid{} for _, sgtid := range vgtid.ShardGtids { if sgtid.Shard == "" { - if sgtid.Gtid != "current" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid) + if sgtid.Gtid != "current" && sgtid.Gtid != "" { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid) } // TODO(sougou): this should work with the new Migrate workflow _, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 56586cf6fef..c9fe153c8b8 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -1017,9 +1017,44 @@ func 
TestResolveVStreamParams(t *testing.T) { input: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: "TestVStream", + Gtid: "other", + }}, + }, + err: "if shards are unspecified, the Gtid value must be 'current' or empty", + }, { + // Verify that the function maps the input missing the shard to a list of all shards in the topology. + input: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "TestVStream", + }}, + }, + output: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "TestVStream", + Shard: "-20", + }, { + Keyspace: "TestVStream", + Shard: "20-40", + }, { + Keyspace: "TestVStream", + Shard: "40-60", + }, { + Keyspace: "TestVStream", + Shard: "60-80", + }, { + Keyspace: "TestVStream", + Shard: "80-a0", + }, { + Keyspace: "TestVStream", + Shard: "a0-c0", + }, { + Keyspace: "TestVStream", + Shard: "c0-e0", + }, { + Keyspace: "TestVStream", + Shard: "e0-", }}, }, - err: "if shards are unspecified, the Gtid value must be 'current'", }, { input: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ @@ -1111,17 +1146,49 @@ func TestResolveVStreamParams(t *testing.T) { assert.Equal(t, wantFilter, filter, tcase.input) require.False(t, flags.MinimizeSkew) } - // Special-case: empty keyspace because output is too big. - input := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Gtid: "current", - }}, + + // Special-case: empty keyspace or keyspace containing wildcards because output is too big. + // Verify that the function resolves input for multiple keyspaces into a list of all corresponding shards. + // Ensure that the number of shards returned is greater than the number of shards in a single keyspace named 'TestVStream.' 
+ specialCases := []struct { + input *binlogdatapb.ShardGtid + }{ + { + input: &binlogdatapb.ShardGtid{ + Gtid: "current", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + Gtid: "current", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/Test.*", + }, + }, } - vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) - require.NoError(t, err, input) - if got, want := len(vgtid.ShardGtids), 8; want >= got { - t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, want) + for _, tcase := range specialCases { + input := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{tcase.input}, + } + vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) + require.NoError(t, err, tcase.input) + if got, expectTestVStreamShardNumber := len(vgtid.ShardGtids), 8; expectTestVStreamShardNumber >= got { + t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, expectTestVStreamShardNumber) + } + for _, s := range vgtid.ShardGtids { + require.Equal(t, tcase.input.Gtid, s.Gtid) + } } + for _, minimizeSkew := range []bool{true, false} { t.Run(fmt.Sprintf("resolveParams MinimizeSkew %t", minimizeSkew), func(t *testing.T) { flags := &vtgatepb.VStreamFlags{MinimizeSkew: minimizeSkew} diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 484acc8c3e7..5a07fee1529 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -156,20 +156,20 @@ type Config struct { // It then sets the right value for cfg.SchemaDir. // At the end of the test, the caller should os.RemoveAll(cfg.SchemaDir). func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keyspace) error { - if cfg.SchemaDir != "" { - return fmt.Errorf("SchemaDir is already set to %v", cfg.SchemaDir) - } - - // Create a base temporary directory. 
- tempSchemaDir, err := os.MkdirTemp("", "vttest") - if err != nil { - return err + schemaDir := cfg.SchemaDir + if schemaDir == "" { + // Create a base temporary directory. + tempSchemaDir, err := os.MkdirTemp("", "vttest") + if err != nil { + return err + } + schemaDir = tempSchemaDir } // Write the schema if set. if schema != "" { - ksDir := path.Join(tempSchemaDir, keyspace) - err = os.Mkdir(ksDir, os.ModeDir|0775) + ksDir := path.Join(schemaDir, keyspace) + err := os.Mkdir(ksDir, os.ModeDir|0775) if err != nil { return err } @@ -182,7 +182,7 @@ func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keysp // Write in the vschema if set. if vschema != nil { - vschemaFilePath := path.Join(tempSchemaDir, keyspace, "vschema.json") + vschemaFilePath := path.Join(schemaDir, keyspace, "vschema.json") vschemaJSON, err := json.Marshal(vschema) if err != nil { return err @@ -191,7 +191,7 @@ func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keysp return err } } - cfg.SchemaDir = tempSchemaDir + cfg.SchemaDir = schemaDir return nil }