Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply vcopy patch 11740 #130

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 35 additions & 28 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;

create table t1_copy_basic(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_copy_resume(
id1 bigint,
id2 bigint,
Expand Down Expand Up @@ -139,6 +145,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_basic": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
37 changes: 31 additions & 6 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestVStreamCopyBasic(t *testing.T) {
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
_, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -180,7 +180,7 @@ func TestVStreamCopyBasic(t *testing.T) {
}
qr := sqltypes.ResultToProto3(&lastPK)
tablePKs := []*binlogdatapb.TableLastPK{{
TableName: "t1",
TableName: "t1_copy_basic",
Lastpk: qr,
}}
var shardGtids []*binlogdatapb.ShardGtid
Expand All @@ -200,8 +200,8 @@ func TestVStreamCopyBasic(t *testing.T) {
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
Match: "t1_copy_basic",
Filter: "select * from t1_copy_basic",
}},
}
flags := &vtgatepb.VStreamFlags{}
Expand All @@ -210,19 +210,44 @@ func TestVStreamCopyBasic(t *testing.T) {
if err != nil {
t.Fatal(err)
}
numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */)
numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* copy operation completed */) + 1 /* fully copy operation completed */
expectedCompletedEvents := []string{
`type:COPY_COMPLETED keyspace:"ks" shard:"-80"`,
`type:COPY_COMPLETED keyspace:"ks" shard:"80-"`,
`type:COPY_COMPLETED`,
}
require.NotNil(t, reader)
var evs []*binlogdatapb.VEvent
var completedEvs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
evs = append(evs, e...)

for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED {
completedEvs = append(completedEvs, ev)
}
}

printEvents(evs) // for debugging ci failures

if len(evs) == numExpectedEvents {
// The arrival order of COPY_COMPLETED events with keyspace/shard is not constant.
// On the other hand, the last event should always be a fully COPY_COMPLETED event.
// That's why the sort.Slice doesn't have to handle the last element in completedEvs.
sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool {
return completedEvs[i].GetShard() < completedEvs[j].GetShard()
})
for i, ev := range completedEvs {
require.Regexp(t, expectedCompletedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyBasic was successful")
return
} else if numExpectedEvents < len(evs) {
t.Fatalf("len(events)=%v are not expected\n", len(evs))
}
printEvents(evs) // for debugging ci failures
case io.EOF:
log.Infof("stream ended\n")
cancel()
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar
vsm: vsm,
eventCh: make(chan []*binlogdatapb.VEvent),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
}
_ = vs.stream(ctx)
return nil
Expand Down
39 changes: 39 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ type vstream struct {
// the timestamp of the most recent event, keyed by streamId. streamId is of the form <keyspace>.<shard>
timestamps map[string]int64

// the shard map tracking the copy completion, keyed by streamId. streamId is of the form <keyspace>.<shard>
copyCompletedShard map[string]struct{}

vsm *vstreamManager

eventCh chan []*binlogdatapb.VEvent
Expand Down Expand Up @@ -171,6 +174,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
eventCh: make(chan []*binlogdatapb.VEvent),
heartbeatInterval: flags.GetHeartbeatInterval(),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
}
return vs.stream(ctx)
}
Expand Down Expand Up @@ -598,6 +602,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
eventss = nil
sendevents = nil
case binlogdatapb.VEventType_COPY_COMPLETED:
sendevents = append(sendevents, event)
if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied {
sendevents = append(sendevents, doneEvent)
}
eventss = append(eventss, sendevents)

if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
Expand Down Expand Up @@ -733,6 +753,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
return nil
}

// isCopyFullyCompleted returns true if all stream has received a copy_completed event.
// If true, it will also return a new copy_completed event that needs to be sent.
// This new event represents the completion of all the copy operations.
func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) {
vs.mu.Lock()
defer vs.mu.Unlock()

vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{}

for _, shard := range vs.vgtid.ShardGtids {
if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok {
return false, nil
}
}
return true, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COPY_COMPLETED,
}
}

func (vs *vstream) getError() error {
vs.errMu.Lock()
defer vs.errMu.Unlock()
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ func (uvs *uvstreamer) Stream() error {
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
uvs.sendTestEvent("Copy Done")
if err := uvs.allCopyComplete(); err != nil {
return err
}
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
Expand Down Expand Up @@ -457,6 +459,17 @@ func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) {
uvs.plans[tableName].tablePK.Lastpk = qr
}

func (uvs *uvstreamer) allCopyComplete() error {
ev := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COPY_COMPLETED,
}

if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil {
return err
}
return nil
}

// dummy event sent only in test mode
func (uvs *uvstreamer) sendTestEvent(msg string) {
if !uvstreamerTestMode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {

}

callbacks["OTHER.*Copy Done"] = func() {
callbacks["COPY_COMPLETED"] = func() {
log.Info("Copy done, inserting events to stream")
insertRow(t, "t1", 1, numInitialRows+4)
insertRow(t, "t2", 2, numInitialRows+3)
Expand All @@ -252,7 +252,7 @@ commit;"
}

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCopyEvents += 2 /* GTID + Event after all copy is done */
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
Expand Down Expand Up @@ -539,7 +539,7 @@ var expectedEvents = []string{
"type:BEGIN",
"type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}",
"type:COMMIT",
"type:OTHER gtid:\"Copy Done\"",
"type:COPY_COMPLETED",
"type:BEGIN",
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}",
"type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"14140\"}}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestVStreamCopySimpleFlow(t *testing.T) {
testcases := []testcase{
{
input: []string{},
output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}},
output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}},
},

{
Expand Down Expand Up @@ -2178,6 +2178,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog
if evs[i].Type != binlogdatapb.VEventType_DDL {
t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i])
}
case "copy_completed":
if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED {
t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i])
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
Expand Down
4 changes: 4 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ enum VEventType {
VERSION = 17;
LASTPK = 18;
SAVEPOINT = 19;
// COPY_COMPLETED is sent when VTGate's VStream copy operation is done.
// If a client experiences some disruptions before receiving the event,
// the client should restart the copy operation.
COPY_COMPLETED = 20;
}

// RowChange represents one row change.
Expand Down
3 changes: 2 additions & 1 deletion web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading