Skip to content

Commit

Permalink
apply patch 13547
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 20, 2023
1 parent 178f4cf commit 04096a0
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 7 deletions.
119 changes: 119 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,125 @@ func TestVStreamSharded(t *testing.T) {

}

// TestVStreamCopyTransactions tests that we are properly wrapping
// ROW events in the stream with BEGIN and COMMIT events.
func TestVStreamCopyTransactions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keyspace := "ks"
shards := []string{"-80", "80-"}
table := "t1_copy_basic"
beginEventSeen, commitEventSeen := false, false
numResultInTrx := 0
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{
Keyspace: keyspace,
Shard: shards[0],
Gtid: "", // Start a vstream copy
},
{
Keyspace: keyspace,
Shard: shards[1],
Gtid: "", // Start a vstream copy
},
},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}},
}

gconn, conn, _, closeConnections := initialize(ctx, t)
defer closeConnections()

// Clear any existing data.
q := fmt.Sprintf("delete from %s", table)
_, err := conn.ExecuteFetch(q, -1, false)
require.NoError(t, err, "error clearing data: %v", err)

// Generate some test data. Enough to cross the default
// vstream_packet_size threshold.
for i := 1; i <= 100000; i++ {
values := fmt.Sprintf("(%d, %d)", i, i)
q := fmt.Sprintf("insert into %s (id1, id2) values %s", table, values)
_, err := conn.ExecuteFetch(q, 1, false)
require.NoError(t, err, "error inserting data: %v", err)
}

// Start a vstream.
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil)
require.NoError(t, err, "error starting vstream: %v", err)

recvLoop:
for {
vevents, err := reader.Recv()
numResultInTrx++
eventCount := len(vevents)
t.Logf("------------------ Received %d events in response #%d for the transaction ------------------\n",
eventCount, numResultInTrx)
switch err {
case nil:
for _, event := range vevents {
switch event.Type {
case binlogdatapb.VEventType_BEGIN:
require.False(t, beginEventSeen, "received a second BEGIN event within the transaction: numResultInTrx=%d\n",
numResultInTrx)
beginEventSeen = true
t.Logf("Found BEGIN event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx)
require.False(t, commitEventSeen, "received a BEGIN event when expecting a COMMIT event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_VGTID:
t.Logf("Found VGTID event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_FIELD:
t.Logf("Found FIELD event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_ROW:
// Uncomment if you need to do more debugging.
// t.Logf("Found ROW event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
// beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
case binlogdatapb.VEventType_COMMIT:
commitEventSeen = true
t.Logf("Found COMMIT event, beginEventSeen=%t, commitEventSeen=%t, eventType=%v, numResultInTrx=%d, event=%+v\n",
beginEventSeen, commitEventSeen, event.Type, numResultInTrx, event)
require.True(t, beginEventSeen, "received COMMIT event before receiving BEGIN event: numResultInTrx=%d\n",
numResultInTrx)
case binlogdatapb.VEventType_COPY_COMPLETED:
t.Logf("Finished vstream copy\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
break recvLoop
default:
t.Logf("Found extraneous event: %+v\n", event)
}
if beginEventSeen && commitEventSeen {
t.Logf("Received both BEGIN and COMMIT, so resetting transactional state\n")
beginEventSeen = false
commitEventSeen = false
numResultInTrx = 0
}
}
case io.EOF:
t.Logf("vstream ended\n")
t.Logf("-------------------------------------------------------------------\n\n")
cancel()
return
default:
require.FailNowf(t, "unexpected error", "encountered error in vstream: %v", err)
return
}
}
// The last response, when the vstream copy completes, does not
// typically contain ROW events.
if beginEventSeen || commitEventSeen {
require.True(t, (beginEventSeen && commitEventSeen), "did not receive both BEGIN and COMMIT events in the final ROW event set")
}
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,26 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendFieldEvent returned error %v", err)
return err
}
// sendFieldEvent() sends a BEGIN event first.
uvs.inTransaction = true
}

if len(rows.Rows) == 0 {
log.V(2).Infof("0 rows returned for table %s", tableName)
return nil
}

// We are about to send ROW events, so we need to ensure
// that we do so within a transaction. The COMMIT event
// will be sent in sendEventsForRows() below.
if !uvs.inTransaction {
evs := []*binlogdatapb.VEvent{{
Type: binlogdatapb.VEventType_BEGIN,
}}
uvs.send(evs)
uvs.inTransaction = true
}

newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{
Fields: uvs.pkfields,
Rows: []*querypb.Row{rows.Lastpk},
Expand All @@ -271,6 +285,8 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
log.Infof("sendEventsForRows returned error %v", err)
return err
}
// sendEventsForRows() sends a COMMIT event last.
uvs.inTransaction = false

uvs.setCopyState(tableName, qrLastPK)
log.V(2).Infof("NewLastPK: %v", qrLastPK)
Expand Down
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
// Are we currently in an explicit transaction?
// If we are not, and we're about to send ROW
// events, then we need to send a BEGIN event first.
inTransaction bool
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK

vschema *localVSchema

Expand Down

0 comments on commit 04096a0

Please sign in to comment.