From 6f11785d074b26318c6f70c6f19136cb3e7f580a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 22 Aug 2024 15:33:46 -0400 Subject: [PATCH 01/12] Add journal event check to test: which fails on main Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 201 +++++++++++++++++- 1 file changed, 195 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index e13c3e24e80..1b647caec79 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -28,13 +28,13 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" _ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient" _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" - "vitess.io/vitess/go/vt/vtgate/vtgateconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) // Validates that we have a working VStream API @@ -694,8 +694,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { }() // We should have a mix of events across the old and new shards. - require.NotZero(t, oldShardRowEvents) - require.NotZero(t, newShardRowEvents) + require.Greater(t, oldShardRowEvents, 0) + require.Greater(t, newShardRowEvents, 0) // The number of row events streamed by the VStream API should match the number of rows inserted. customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer") @@ -704,6 +704,195 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents)) } +func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { + ctx := context.Background() + ks := "testks" + wf := "multiVStreamsKeyspaceReshard" + baseTabletID := 100 + tabletType := topodatapb.TabletType_PRIMARY.String() + oldShards := "-80,80-" + newShards := "-40,40-80,80-c0,c0-" + oldShardRowEvents, newShardRowEvents, journalEvents := 0, 0, 0 + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + defaultCell := vc.Cells[vc.CellNames[0]] + ogdr := defaultReplicas + defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + // For our sequences etc. + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil) + require.NoError(t, err) + + // Setup the keyspace with our old/original shards. + keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil) + require.NoError(t, err) + + // Add the new shards. + err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts) + require.NoError(t, err) + + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + require.NoError(t, err) + defer vstreamConn.Close() + + // Ensure that we're starting with a clean slate. + _, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false) + require.NoError(t, err) + + // Coordinate go-routines. + streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer streamCancel() + done := make(chan struct{}) + + // First goroutine that keeps inserting rows into the table being streamed until the + // stream context is cancelled. + go func() { + id := 1 + for { + select { + case <-streamCtx.Done(): + // Give the VStream a little catch-up time before telling it to stop + // via the done channel. + time.Sleep(10 * time.Second) + close(done) + return + default: + insertRow(ks, "customer", id) + time.Sleep(250 * time.Millisecond) + id++ + } + } + }() + + // Create the Reshard workflow and wait for it to finish the copy phase. + reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType) + waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String()) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", // Match all keyspaces just to be more realistic. + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // Only stream the customer table and its sequence backing table. + Match: "/customer.*", + }}, + } + flags := &vtgatepb.VStreamFlags{ + StopOnReshard: true, + } + + // Stream events but stop once we have a VGTID with positions for the old/original shards. + var newVGTID *binlogdatapb.VGtid + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.GetRowEvent().GetShard() + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "0": + // We expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_VGTID: + newVGTID = ev.GetVgtid() + if len(newVGTID.GetShardGtids()) == 3 { + // We want a VGTID with a position for the global shard and the old shards. + canStop := true + for _, sg := range newVGTID.GetShardGtids() { + if sg.GetGtid() == "" { + canStop = false + } + } + if canStop { + return + } + } + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-streamCtx.Done(): + return + default: + } + } + }() + + // Confirm that we have shard GTIDs for the global shard and the old/original shards. + require.Len(t, newVGTID.GetShardGtids(), 3) + t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids()) + + // Switch the traffic to the new shards. + reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) + + // Now start a new VStream from our previous VGTID which only has the old/original shards. + func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "-80", "80-": + oldShardRowEvents++ + case "-40", "40-80", "80-c0", "c0-": + newShardRowEvents++ + case "0": + // Again, we expect some for the sequence backing table, but don't care. + default: + require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + } + case binlogdatapb.VEventType_JOURNAL: + t.Logf("Journal event: %+v", ev) + journalEvents++ + } + } + default: + require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) + } + select { + case <-done: + return + default: + } + } + }() + + // We should have stopped on the reshard event and thus only have events for the old shards. + require.Greater(t, oldShardRowEvents, 0) + require.Equal(t, 0, newShardRowEvents) + + // We should have seen the journal event in the stream due to using StopOnReshard. + require.Equal(t, 2, journalEvents) +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } From 98bb5f76ce31a013a7b990efc6071bb1dc001ee2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 22 Aug 2024 17:41:30 -0400 Subject: [PATCH 02/12] Perform 2 vstreams, each should get the reshard journal Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 46 +++++++++---------- go/vt/vtctl/workflow/traffic_switcher.go | 1 - go/vt/vtgate/vstream_manager.go | 3 +- .../tabletserver/vstreamer/vstreamer.go | 1 - 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 1b647caec79..1450a51eefb 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -603,8 +603,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { // Stream events but stop once we have a VGTID with positions for the old/original shards. var newVGTID *binlogdatapb.VGtid func() { - var reader vtgateconn.VStreamReader - reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) require.NoError(t, err) for { evs, err := reader.Recv() @@ -658,8 +657,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { // Now start a new VStream from our previous VGTID which only has the old/original shards. func() { - var reader vtgateconn.VStreamReader - reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) require.NoError(t, err) for { evs, err := reader.Recv() @@ -712,7 +710,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { tabletType := topodatapb.TabletType_PRIMARY.String() oldShards := "-80,80-" newShards := "-40,40-80,80-c0,c0-" - oldShardRowEvents, newShardRowEvents, journalEvents := 0, 0, 0 + oldShardRowEvents, journalEvents := 0, 0 vc = NewVitessCluster(t, nil) defer vc.TearDown() defaultCell := vc.Cells[vc.CellNames[0]] @@ -791,8 +789,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // Stream events but stop once we have a VGTID with positions for the old/original shards. var newVGTID *binlogdatapb.VGtid func() { - var reader vtgateconn.VStreamReader - reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) require.NoError(t, err) for { evs, err := reader.Recv() @@ -846,9 +843,11 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) // Now start a new VStream from our previous VGTID which only has the old/original shards. - func() { - var reader vtgateconn.VStreamReader - reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) + expectedJournalEvents := 2 // One for each old shard + runResumeStream := func() { + journalEvents = 0 + t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids()) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) require.NoError(t, err) for { evs, err := reader.Recv() @@ -860,20 +859,20 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { case binlogdatapb.VEventType_ROW: shard := ev.RowEvent.Shard switch shard { - case "-80", "80-": - oldShardRowEvents++ - case "-40", "40-80", "80-c0", "c0-": - newShardRowEvents++ - case "0": - // Again, we expect some for the sequence backing table, but don't care. + case "0", "-80", "80-": default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } case binlogdatapb.VEventType_JOURNAL: t.Logf("Journal event: %+v", ev) journalEvents++ + if journalEvents == expectedJournalEvents { + return + } } } + case io.EOF: + return default: require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) } @@ -883,14 +882,15 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { default: } } - }() - - // We should have stopped on the reshard event and thus only have events for the old shards. - require.Greater(t, oldShardRowEvents, 0) - require.Equal(t, 0, newShardRowEvents) + } - // We should have seen the journal event in the stream due to using StopOnReshard. - require.Equal(t, 2, journalEvents) + // Multiple VStream clients should be able to resume from where they left off and + // get the reshard journal event. + for i := 1; i <= 2; i++ { + runResumeStream() + // We should have seen the journal event for each shard in the stream due to using StopOnReshard. + require.Equal(t, 2, journalEvents, "did not get expected journal events on resume vstream #%d", i) + } } func TestVStreamFailover(t *testing.T) { diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 34e1e4e4329..3ac6861dce5 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -690,7 +690,6 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [ }) } - ts.Logger().Infof("Creating journal %v", journal) ts.Logger().Infof("Creating journal: %v", journal) statement := fmt.Sprintf("insert into _vt.resharding_journal "+ "(id, db_name, val) "+ diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index e0d195853cf..a8827f5c29d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -661,7 +661,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case binlogdatapb.VEventType_JOURNAL: journal := event.Journal - // Journal events are not sent to clients by default, but only when StopOnReshard is set + // Journal events are not sent to clients by default, but only when + // StopOnReshard is set. if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS { sendevents = append(sendevents, event) eventss = append(eventss, sendevents) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 824f79e20f1..f323bbbe07d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -629,7 +629,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Type: binlogdatapb.VEventType_VERSION, } vevents = append(vevents, vevent) - } else { vevents, err = vs.processRowEvent(vevents, plan, rows) } From c9a072861585add047a38df2938993e9d5329a13 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 22 Aug 2024 21:29:42 -0400 Subject: [PATCH 03/12] Send journal events right away in vstreamer Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 6 +++--- go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 9 +++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 1450a51eefb..bcaefc7d0da 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -843,7 +843,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType) // Now start a new VStream from our previous VGTID which only has the old/original shards. - expectedJournalEvents := 2 // One for each old shard + expectedJournalEvents := 2 // One for each old shard: -80,80- runResumeStream := func() { journalEvents = 0 t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids()) @@ -886,10 +886,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // Multiple VStream clients should be able to resume from where they left off and // get the reshard journal event. - for i := 1; i <= 2; i++ { + for i := 1; i <= expectedJournalEvents; i++ { runResumeStream() // We should have seen the journal event for each shard in the stream due to using StopOnReshard. - require.Equal(t, 2, journalEvents, "did not get expected journal events on resume vstream #%d", i) + require.Equal(t, expectedJournalEvents, journalEvents, "did not get expected journal events on resume vstream #%d", i) } } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index f323bbbe07d..55a1d4e2e76 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -226,15 +226,12 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog vevent.Shard = vs.vse.shard switch vevent.Type { - case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, - binlogdatapb.VEventType_JOURNAL: + case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: // We never have to send GTID, BEGIN, FIELD events on their own. - // A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT. - // So, we don't have to send it right away. bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, - binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION: - // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. + binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION, binlogdatapb.VEventType_JOURNAL: + // COMMIT, DDL, JOURNAL, OTHER and HEARTBEAT must be immediately sent. // Although unlikely, it's possible to get a HEARTBEAT in the middle // of a transaction. If so, we still send the partial transaction along // with the heartbeat. From 81a4d856816064828fe92cedce018e05bbe53957 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 22 Aug 2024 23:56:13 -0400 Subject: [PATCH 04/12] Adjust unit test Signed-off-by: Matt Lord --- .../vttablet/tabletserver/vstreamer/vstreamer.go | 3 ++- .../tabletserver/vstreamer/vstreamer_test.go | 16 ++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 55a1d4e2e76..9f94d3d1fed 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -210,7 +210,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // Only the following patterns are possible: // BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks. - // BEGIN->JOURNAL->GTID->COMMIT + // BEGIN->JOURNAL + // ->GTID->COMMIT. This is a special case where the journal is sent immediately as some consumers stop on reshard events. // GTID->DDL // GTID->OTHER // HEARTBEAT is issued if there's inactivity, which is likely diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index df565b8f18b..d53c3a8af49 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1731,12 +1731,16 @@ func TestJournal(t *testing.T) { "commit", }, // External table events don't get sent. - output: [][]string{{ - `begin`, - `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, - `gtid`, - `commit`, - }}, + output: [][]string{ + { + `begin`, + `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, + }, + { + `gtid`, + `commit`, + }, + }, }} runCases(t, nil, testcases, "", nil) } From 0cd4e186626c9a6dfd4e091700703ac64bc4f12a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 23 Aug 2024 00:10:10 -0400 Subject: [PATCH 05/12] Add commit event Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index a8827f5c29d..946ed3790c1 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -665,6 +665,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // StopOnReshard is set. if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS { sendevents = append(sendevents, event) + // Include our own commit event to complete the BEGIN->JOURNAL-COMMIT + // sequence in the stream. + sendevents := append(sendevents, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT}) eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err From 9dc41330ba660ee97f1fac701e2862d9e8c15f45 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 23 Aug 2024 17:09:28 -0400 Subject: [PATCH 06/12] Handle this entirely in vstream manager Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 24 +++++++++++++++---- .../tabletserver/vstreamer/vstreamer.go | 13 ++++++---- .../tabletserver/vstreamer/vstreamer_test.go | 16 +++++-------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 946ed3790c1..75b939115eb 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -608,7 +608,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) - for _, event := range events { + for i, event := range events { switch event.Type { case binlogdatapb.VEventType_FIELD: // Update table names and send. @@ -658,22 +658,36 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { return err } - case binlogdatapb.VEventType_JOURNAL: journal := event.Journal // Journal events are not sent to clients by default, but only when // StopOnReshard is set. if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS { sendevents = append(sendevents, event) - // Include our own commit event to complete the BEGIN->JOURNAL-COMMIT - // sequence in the stream. - sendevents := append(sendevents, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT}) + // Read any subsequent events until we get the VGTID->COMMIT events that + // always follow the JOURNAL event which is generated as a result of + // an autocommit insert into the _vt.resharding_journal table on the + // tablet. This batch of events we're currently processing may not + // contain these events. + for j := i + 1; j < len(events); j++ { + sendevents = append(sendevents, events[j]) + if events[j].Type == binlogdatapb.VEventType_COMMIT { + break + } + } eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err } eventss = nil sendevents = nil + // We're going to be stopping the stream anyway, so we pause to give clients + // time to recv the journal event before the stream's context is cancelled + // (which causes the grpc SendMsg to fail). + // If the client doesn't (grpc) Recv the journal event before the stream + // ends then they'll have to resume from the last ShardGtid they received + // before the journal event. + time.Sleep(2 * time.Second) } je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9f94d3d1fed..824f79e20f1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -210,8 +210,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // Only the following patterns are possible: // BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks. - // BEGIN->JOURNAL - // ->GTID->COMMIT. This is a special case where the journal is sent immediately as some consumers stop on reshard events. + // BEGIN->JOURNAL->GTID->COMMIT // GTID->DDL // GTID->OTHER // HEARTBEAT is issued if there's inactivity, which is likely @@ -227,12 +226,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog vevent.Shard = vs.vse.shard switch vevent.Type { - case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: + case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, + binlogdatapb.VEventType_JOURNAL: // We never have to send GTID, BEGIN, FIELD events on their own. + // A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT. + // So, we don't have to send it right away. bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, - binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION, binlogdatapb.VEventType_JOURNAL: - // COMMIT, DDL, JOURNAL, OTHER and HEARTBEAT must be immediately sent. + binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION: + // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. // Although unlikely, it's possible to get a HEARTBEAT in the middle // of a transaction. If so, we still send the partial transaction along // with the heartbeat. @@ -627,6 +629,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Type: binlogdatapb.VEventType_VERSION, } vevents = append(vevents, vevent) + } else { vevents, err = vs.processRowEvent(vevents, plan, rows) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d53c3a8af49..df565b8f18b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1731,16 +1731,12 @@ func TestJournal(t *testing.T) { "commit", }, // External table events don't get sent. - output: [][]string{ - { - `begin`, - `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, - }, - { - `gtid`, - `commit`, - }, - }, + output: [][]string{{ + `begin`, + `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, + `gtid`, + `commit`, + }}, }} runCases(t, nil, testcases, "", nil) } From 54900c586269e0ac5720de06dd4822a9c1a1ed6f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 23 Aug 2024 22:55:28 -0400 Subject: [PATCH 07/12] Minor improvements Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 75b939115eb..a300a3dcbc8 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -61,6 +61,10 @@ const maxSkewTimeoutSeconds = 10 * 60 // for a vstream const tabletPickerContextTimeout = 90 * time.Second +// stopOnReshardDelay is how long we wait after sending a reshard journal event before ending the stream +// from the tablet. +const stopOnReshardDelay = 2 * time.Second + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -681,13 +685,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } eventss = nil sendevents = nil - // We're going to be stopping the stream anyway, so we pause to give clients - // time to recv the journal event before the stream's context is cancelled - // (which causes the grpc SendMsg to fail). - // If the client doesn't (grpc) Recv the journal event before the stream - // ends then they'll have to resume from the last ShardGtid they received - // before the journal event. - time.Sleep(2 * time.Second) } je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { @@ -700,6 +697,13 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha case <-ctx.Done(): return ctx.Err() case <-journalDone: + // We're going to be ending the tablet stream anyway, so we pause to give + // clients time to recv the journal event before the stream's context is + // cancelled (which causes the grpc SendMsg to fail). + // If the client doesn't (grpc) Recv the journal event before the stream + // ends then they'll have to resume from the last ShardGtid they received + // before the journal event. + time.Sleep(stopOnReshardDelay) return io.EOF } } From fe792188e8aa7aca4be194c2f75a19f24452ab5b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 23 Aug 2024 23:20:37 -0400 Subject: [PATCH 08/12] Further minor improvements Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index a300a3dcbc8..1afc0a7cf14 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -61,8 +61,8 @@ const maxSkewTimeoutSeconds = 10 * 60 // for a vstream const tabletPickerContextTimeout = 90 * time.Second -// stopOnReshardDelay is how long we wait after sending a reshard journal event before ending the stream -// from the tablet. +// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before +// ending the stream from the tablet. const stopOnReshardDelay = 2 * time.Second // vstream contains the metadata for one VStream request. @@ -671,8 +671,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Read any subsequent events until we get the VGTID->COMMIT events that // always follow the JOURNAL event which is generated as a result of // an autocommit insert into the _vt.resharding_journal table on the - // tablet. This batch of events we're currently processing may not - // contain these events. + // tablet. for j := i + 1; j < len(events); j++ { sendevents = append(sendevents, events[j]) if events[j].Type == binlogdatapb.VEventType_COMMIT { @@ -691,19 +690,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } if je != nil { - // Wait till all other participants converge and return EOF. + // We're going to be ending the tablet stream, so we ensure a reasonable + // minimum amount of time is alloted for clients to Recv the journal event + // before the stream's context is cancelled (which would cause the grpc + // Send or Recv to fail). If the client doesn't (grpc) Recv the journal + // event before the stream ends then they'll have to resume from the last + // ShardGtid they received before the journal event. + endTimer := time.NewTimer(stopOnReshardDelay) + defer endTimer.Stop() + // Wait until all other participants converge and then return EOF after + // the minimum delay has passed. journalDone = je.done select { case <-ctx.Done(): return ctx.Err() case <-journalDone: - // We're going to be ending the tablet stream anyway, so we pause to give - // clients time to recv the journal event before the stream's context is - // cancelled (which causes the grpc SendMsg to fail). - // If the client doesn't (grpc) Recv the journal event before the stream - // ends then they'll have to resume from the last ShardGtid they received - // before the journal event. - time.Sleep(stopOnReshardDelay) + <-endTimer.C return io.EOF } } From 6b2706e254e7f47a6f97062ae3dfd3d7c16fab63 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Aug 2024 00:01:27 -0400 Subject: [PATCH 09/12] Self review Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 1afc0a7cf14..f56e1f963f8 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -978,6 +978,9 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string return false, err } + vs.mu.Lock() + defer vs.mu.Unlock() + // First check the typical case, where the VGTID shards match the serving shards. // In that case it's NOT possible that an applicable reshard has happened because // the VGTID contains shards that are all serving. From a43ba6a0c02b6414c3885939bc0313927208f758 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Aug 2024 11:11:25 -0400 Subject: [PATCH 10/12] More tweaks Signed-off-by: Matt Lord --- examples/local/vstream_client.go | 17 ++++++++++------- go/test/endtoend/vreplication/vstream_test.go | 13 ++++++++++--- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go index 98d2129f898..e1d672eac5e 100644 --- a/examples/local/vstream_client.go +++ b/examples/local/vstream_client.go @@ -23,13 +23,13 @@ import ( "log" "time" - vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" _ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient" _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" "vitess.io/vitess/go/vt/vtgate/vtgateconn" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" ) /* @@ -73,15 +73,18 @@ func main() { } defer conn.Close() flags := &vtgatepb.VStreamFlags{ - //MinimizeSkew: false, - //HeartbeatInterval: 60, //seconds + // MinimizeSkew: false, + // HeartbeatInterval: 60, //seconds + // StopOnReshard: true, } reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + if err != nil { + log.Fatal(err) + } for { e, err := reader.Recv() switch err { case nil: - _ = e fmt.Printf("%v\n", e) case io.EOF: fmt.Printf("stream ended\n") diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index bcaefc7d0da..95fdb383a46 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -702,6 +702,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents)) } +// TestMultiVStreamsKeyspaceStopOnReshard confirms that journal events are received +// when resuming a VStream after a reshard. func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { ctx := context.Background() ks := "testks" @@ -854,7 +856,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { switch err { case nil: - for _, ev := range evs { + for i, ev := range evs { switch ev.Type { case binlogdatapb.VEventType_ROW: shard := ev.RowEvent.Shard @@ -866,6 +868,9 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { case binlogdatapb.VEventType_JOURNAL: t.Logf("Journal event: %+v", ev) journalEvents++ + require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event") + require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event") + require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event") if journalEvents == expectedJournalEvents { return } @@ -888,8 +893,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // get the reshard journal event. for i := 1; i <= expectedJournalEvents; i++ { runResumeStream() - // We should have seen the journal event for each shard in the stream due to using StopOnReshard. - require.Equal(t, expectedJournalEvents, journalEvents, "did not get expected journal events on resume vstream #%d", i) + // We should have seen the journal event for each shard in the stream due to + // using StopOnReshard. + require.Equal(t, expectedJournalEvents, journalEvents, + "did not get expected journal events on resume vstream #%d", i) } } From 9159b4f67b942b55ff53872b14b4251a16b724fb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 24 Aug 2024 11:27:57 -0400 Subject: [PATCH 11/12] Improve e2e test Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vstream_test.go | 28 ++++++++++--------- go/vt/vtgate/vstream_manager.go | 2 +- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 95fdb383a46..62bb645f78e 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -775,13 +775,15 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "/.*", // Match all keyspaces just to be more realistic. + // Only stream the keyspace that we're resharding. Otherwise the client stream + // will continue to run with only the tablet stream from the global keyspace. + Keyspace: ks, }}} filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - // Only stream the customer table and its sequence backing table. - Match: "/customer.*", + // Stream all tables. + Match: "/.*", }}, } flags := &vtgatepb.VStreamFlags{ @@ -805,15 +807,13 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { switch shard { case "-80", "80-": oldShardRowEvents++ - case "0": - // We expect some for the sequence backing table, but don't care. default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } case binlogdatapb.VEventType_VGTID: newVGTID = ev.GetVgtid() - if len(newVGTID.GetShardGtids()) == 3 { - // We want a VGTID with a position for the global shard and the old shards. + // We want a VGTID with a ShardGtid for both of the old shards. + if len(newVGTID.GetShardGtids()) == 2 { canStop := true for _, sg := range newVGTID.GetShardGtids() { if sg.GetGtid() == "" { @@ -837,8 +837,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { } }() - // Confirm that we have shard GTIDs for the global shard and the old/original shards. - require.Len(t, newVGTID.GetShardGtids(), 3) + // Confirm that we have shard GTIDs for the old/original shards. + require.Len(t, newVGTID.GetShardGtids(), 2) t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids()) // Switch the traffic to the new shards. @@ -846,8 +846,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // Now start a new VStream from our previous VGTID which only has the old/original shards. expectedJournalEvents := 2 // One for each old shard: -80,80- + var streamStopped bool // We expect the stream to end with io.EOF from the reshard runResumeStream := func() { journalEvents = 0 + streamStopped = false t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids()) reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) require.NoError(t, err) @@ -861,7 +863,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { case binlogdatapb.VEventType_ROW: shard := ev.RowEvent.Shard switch shard { - case "0", "-80", "80-": + case "-80", "80-": default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } @@ -871,12 +873,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event") require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event") require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event") - if journalEvents == expectedJournalEvents { - return - } } } case io.EOF: + streamStopped = true return default: require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) @@ -897,6 +897,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // using StopOnReshard. require.Equal(t, expectedJournalEvents, journalEvents, "did not get expected journal events on resume vstream #%d", i) + // Confirm that the stream stopped on the reshard. + require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected") } } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index f56e1f963f8..8913e98b1c4 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -693,7 +693,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // We're going to be ending the tablet stream, so we ensure a reasonable // minimum amount of time is alloted for clients to Recv the journal event // before the stream's context is cancelled (which would cause the grpc - // Send or Recv to fail). If the client doesn't (grpc) Recv the journal + // SendMsg or RecvMsg to fail). If the client doesn't Recv the journal // event before the stream ends then they'll have to resume from the last // ShardGtid they received before the journal event. endTimer := time.NewTimer(stopOnReshardDelay) From 5fdbb17cc7f6c540652238f866b092829fe0283f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 26 Aug 2024 08:38:52 -0400 Subject: [PATCH 12/12] Trim delay Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 8913e98b1c4..935a437c869 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -63,7 +63,7 @@ const tabletPickerContextTimeout = 90 * time.Second // stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before // ending the stream from the tablet. -const stopOnReshardDelay = 2 * time.Second +const stopOnReshardDelay = 500 * time.Millisecond // vstream contains the metadata for one VStream request. type vstream struct {