Skip to content

Commit

Permalink
apply vcopy patch 11103
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 12, 2023
1 parent 68cf290 commit 1276069
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 20 deletions.
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;
create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -133,6 +139,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
142 changes: 142 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"testing"

Expand Down Expand Up @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}

func TestVStreamCopyResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}

// Any subsequent GTIDs will be part of the stream
mpos, err := mconn.PrimaryPosition()
require.NoError(t, err)

// lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9)
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
TableName: "t1_copy_resume",
Lastpk: sqltypes.ResultToProto3(&lastPK),
}}

catchupQueries := []string{
"insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy
"update t1_copy_resume set id2 = 10 where id1 = 1",
"insert into t1(id1, id2) values(100,100)",
"delete from t1_copy_resume where id1 = 1",
"update t1_copy_resume set id2 = 90 where id1 = 9",
}
for _, query := range catchupQueries {
_, err = conn.ExecuteFetch(query, 1, false)
require.NoError(t, err)
}

var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1_copy_resume",
Filter: "select * from t1_copy_resume",
}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)

expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9)
expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach
rowCopyEvents, replCatchupEvents := 0, 0
expectedEvents := []string{
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
}
var evs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_ROW {
evs = append(evs, ev)
if ev.Timestamp == 0 {
rowCopyEvents++
} else {
replCatchupEvents++
}
printEvents(evs) // for debugging ci failures
}
}
if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
sort.Sort(VEventSorter(evs))
for i, ev := range evs {
require.Regexp(t, expectedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyResume was successful")
return
}
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}

// Sort the VEvents by the first row change's after value bytes primarily, with
// secondary ordering by timestamp (ASC). Note that row copy events do not have
// a timestamp and the value will be 0.
type VEventSorter []*binlogdatapb.VEvent

func (v VEventSorter) Len() int {
return len(v)
}
func (v VEventSorter) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
func (v VEventSorter) Less(i, j int) bool {
valsI := v[i].GetRowEvent().RowChanges[0].After
if valsI == nil {
valsI = v[i].GetRowEvent().RowChanges[0].Before
}
valsJ := v[j].GetRowEvent().RowChanges[0].After
if valsJ == nil {
valsJ = v[j].GetRowEvent().RowChanges[0].Before
}
valI := string(valsI.Values)
valJ := string(valsJ.Values)
if valI == valJ {
return v[i].Timestamp < v[j].Timestamp
}
return valI < valJ
}
77 changes: 58 additions & 19 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ func getQuery(tableName string, filter string) string {
query = buf.String()
case key.IsKeyRange(filter):
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter))
// note: sqlparser.NewTableIdent is renamed to NewIdentifierCS in v15
buf.Myprintf("select * from %v where in_keyrange(%v)",
sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter))
query = buf.String()
}
return query
Expand All @@ -229,7 +231,40 @@ func (uvs *uvstreamer) Cancel() {
uvs.cancel()
}

// during copy phase only send streaming events (during catchup/fastforward) for pks already seen
// We have not yet implemented the logic to check if an event is for a row that is already copied,
// so we always return true so that we send all events for this table and so we don't miss events.
func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool {
return true
}

// Only send catchup/fastforward events for tables whose copy phase is complete or in progress.
// This ensures we fulfill the at-least-once delivery semantics for events.
// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort
// for comparable PKs.
func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool {
table, ok := uvs.plans[tableName]
// Event is for a table which is not in its copy phase.
if !ok {
return true
}

// if table copy was not started and no tablePK was specified we can ignore catchup/fastforward events for it
if table.tablePK == nil || table.tablePK.Lastpk == nil {
return false
}

// Table is currently in its copy phase. We have not yet implemented the logic to
// check if an event is for a row that is already copied, so we always return true
// there so that we don't miss events.
// We may send duplicate insert events or update/delete events for rows not yet seen
// to the client for the table being copied. This is ok as the client is expected to be
// idempotent: we only promise at-least-once semantics for VStream API (not exactly-once).
// Aside: vreplication workflows handle at-least-once by adding where clauses that render
// DML queries, related to events for rows not yet copied, as no-ops.
return uvs.isRowCopied(tableName, ev)
}

// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started.
func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent {
if len(uvs.plans) == 0 {
return evs
Expand All @@ -239,25 +274,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
var shouldSend bool

for _, ev := range evs {
shouldSend = false
tableName = ""
switch ev.Type {
case binlogdatapb.VEventType_ROW:
tableName = ev.RowEvent.TableName
case binlogdatapb.VEventType_FIELD:
tableName = ev.FieldEvent.TableName
default:
tableName = ""
}
switch ev.Type {
case binlogdatapb.VEventType_HEARTBEAT:
shouldSend = false
default:
shouldSend = true
}
if !shouldSend && tableName != "" {
shouldSend = true
_, ok := uvs.plans[tableName]
if ok {
shouldSend = false
}
shouldSend = uvs.shouldSendEventForTable(tableName, ev)
}

if shouldSend {
evs2 = append(evs2, ev)
}
Expand Down Expand Up @@ -331,7 +362,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"GTIDSet Mismatch: requested source position:%v, current target vrep position: %v",
mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil
Expand All @@ -346,17 +379,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) {
return conn.PrimaryPosition()
}

// Possible states:
// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos
// 2. TablePKs nil, startPos empty => full table copy of tables matching filter
// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK)
// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK)
func (uvs *uvstreamer) init() error {
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ {
if err := uvs.buildTablePlan(); err != nil {
return err
}
} else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
if err := uvs.buildTablePlan(); err != nil {
}
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
return err
}
}

if uvs.pos.IsZero() && (len(uvs.plans) == 0) {
return fmt.Errorf("stream needs a position or a table to copy")
}
Expand All @@ -378,7 +416,8 @@ func (uvs *uvstreamer) Stream() error {
}
uvs.sendTestEvent("Copy Done")
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)

uvs.setVs(vs)
return vs.Stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
uvstreamerTestMode = true
defer func() { uvstreamerTestMode = false }()
initialize(t)

if err := engine.se.Reload(context.Background()); err != nil {
t.Fatal("Error reloading schema")
}
Expand All @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
var tablePKs []*binlogdatapb.TableLastPK
for i, table := range testState.tables {
rules = append(rules, getRule(table))

// for table t2, let tablepk be nil, so that we don't send events for the insert in initTables()
if table == "t2" {
continue
}

tablePKs = append(tablePKs, getTablePK(table, i+1))
}
filter := &binlogdatapb.Filter{
Expand Down Expand Up @@ -246,7 +253,7 @@ commit;"

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */
Expand Down

0 comments on commit 1276069

Please sign in to comment.