Skip to content

Commit

Permalink
apply vcopy patch 11740
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 13, 2023
1 parent 63e6952 commit 655477f
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 40 deletions.
63 changes: 35 additions & 28 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;
create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_copy_resume(
id1 bigint,
id2 bigint,
Expand Down Expand Up @@ -139,6 +145,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_basic": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
37 changes: 31 additions & 6 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestVStreamCopyBasic(t *testing.T) {
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
_, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -180,7 +180,7 @@ func TestVStreamCopyBasic(t *testing.T) {
}
qr := sqltypes.ResultToProto3(&lastPK)
tablePKs := []*binlogdatapb.TableLastPK{{
TableName: "t1",
TableName: "t1_copy_basic",
Lastpk: qr,
}}
var shardGtids []*binlogdatapb.ShardGtid
Expand All @@ -200,8 +200,8 @@ func TestVStreamCopyBasic(t *testing.T) {
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
Match: "t1_copy_basic",
Filter: "select * from t1_copy_basic",
}},
}
flags := &vtgatepb.VStreamFlags{}
Expand All @@ -210,19 +210,44 @@ func TestVStreamCopyBasic(t *testing.T) {
if err != nil {
t.Fatal(err)
}
numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */)
numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* copy operation completed */) + 1 /* fully copy operation completed */
expectedCompletedEvents := []string{
`type:COPY_COMPLETED keyspace:"ks" shard:"-80"`,
`type:COPY_COMPLETED keyspace:"ks" shard:"80-"`,
`type:COPY_COMPLETED`,
}
require.NotNil(t, reader)
var evs []*binlogdatapb.VEvent
var completedEvs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
evs = append(evs, e...)

for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED {
completedEvs = append(completedEvs, ev)
}
}

printEvents(evs) // for debugging ci failures

if len(evs) == numExpectedEvents {
// The arrival order of COPY_COMPLETED events with keyspace/shard is not constant.
// On the other hand, the last event should always be a fully COPY_COMPLETED event.
// That's why the sort.Slice doesn't have to handle the last element in completedEvs.
sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool {
return completedEvs[i].GetShard() < completedEvs[j].GetShard()
})
for i, ev := range completedEvs {
require.Regexp(t, expectedCompletedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyBasic was successful")
return
} else if numExpectedEvents < len(evs) {
t.Fatalf("len(events)=%v are not expected\n", len(evs))
}
printEvents(evs) // for debugging ci failures
case io.EOF:
log.Infof("stream ended\n")
cancel()
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
vsm: vsm,
eventCh: make(chan []*binlogdatapb.VEvent),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
}
_ = vs.stream(ctx)
return nil
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type vstream struct {
// the timestamp of the most recent event, keyed by streamId. streamId is of the form <keyspace>.<shard>
timestamps map[string]int64

// the shard map tracking the copy completion, keyed by streamId. streamId is of the form <keyspace>.<shard>
copyCompletedShard map[string]struct{}

vsm *vstreamManager

eventCh chan []*binlogdatapb.VEvent
Expand Down Expand Up @@ -171,6 +174,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
eventCh: make(chan []*binlogdatapb.VEvent),
heartbeatInterval: flags.GetHeartbeatInterval(),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
}
return vs.stream(ctx)
}
Expand Down Expand Up @@ -598,6 +602,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
eventss = nil
sendevents = nil
case binlogdatapb.VEventType_COPY_COMPLETED:
sendevents = append(sendevents, event)
if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied {
sendevents = append(sendevents, doneEvent)
}
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
Expand Down Expand Up @@ -733,6 +753,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
return nil
}

// isCopyFullyCompleted returns true if all stream has received a copy_completed event.
// If true, it will also return a new copy_completed event that needs to be sent.
// This new event represents the completion of all the copy operations.
func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) {
vs.mu.Lock()
defer vs.mu.Unlock()

vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{}

for _, shard := range vs.vgtid.ShardGtids {
if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok {
return false, nil
}
}
return true, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COPY_COMPLETED,
}
}

func (vs *vstream) getError() error {
vs.errMu.Lock()
defer vs.errMu.Unlock()
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ func (uvs *uvstreamer) Stream() error {
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
uvs.sendTestEvent("Copy Done")
if err := uvs.allCopyComplete(); err != nil {
return err
}
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
Expand Down Expand Up @@ -457,6 +459,17 @@ func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) {
uvs.plans[tableName].tablePK.Lastpk = qr
}

func (uvs *uvstreamer) allCopyComplete() error {
ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COPY_COMPLETED,
}

if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil {
return err
}
return nil
}

// dummy event sent only in test mode
func (uvs *uvstreamer) sendTestEvent(msg string) {
if !uvstreamerTestMode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {

}

callbacks["OTHER.*Copy Done"] = func() {
callbacks["COPY_COMPLETED"] = func() {
log.Info("Copy done, inserting events to stream")
insertRow(t, "t1", 1, numInitialRows+4)
insertRow(t, "t2", 2, numInitialRows+3)
Expand All @@ -252,7 +252,7 @@ commit;"
}

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCopyEvents += 2 /* GTID + Event after all copy is done */
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
Expand Down Expand Up @@ -539,7 +539,7 @@ var expectedEvents = []string{
"type:BEGIN",
"type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}",
"type:COMMIT",
"type:OTHER gtid:\"Copy Done\"",
"type:COPY_COMPLETED",
"type:BEGIN",
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}",
"type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"14140\"}}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestVStreamCopySimpleFlow(t *testing.T) {
testcases := []testcase{
{
input: []string{},
output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}},
output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}},
},

{
Expand Down Expand Up @@ -2178,6 +2178,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog
if evs[i].Type != binlogdatapb.VEventType_DDL {
t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i])
}
case "copy_completed":
if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED {
t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i])
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
Expand Down
4 changes: 4 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ enum VEventType {
VERSION = 17;
LASTPK = 18;
SAVEPOINT = 19;
// COPY_COMPLETED is sent when VTGate's VStream copy operation is done.
// If a client experiences some disruptions before receiving the event,
// the client should restart the copy operation.
COPY_COMPLETED = 20;
}

// RowChange represents one row change.
Expand Down
3 changes: 2 additions & 1 deletion web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 655477f

Please sign in to comment.