VTGate VStream: Ensure reasonable delivery time for reshard journal event #16639

Merged (14 commits) on Aug 29, 2024
17 changes: 10 additions & 7 deletions examples/local/vstream_client.go
@@ -23,13 +23,13 @@ import (
"log"
"time"

- vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
-
- binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
- topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
+
+ binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

/*
@@ -73,15 +73,18 @@ func main() {
}
defer conn.Close()
flags := &vtgatepb.VStreamFlags{
- //MinimizeSkew: false,
- //HeartbeatInterval: 60, //seconds
+ // MinimizeSkew: false,
+ // HeartbeatInterval: 60, //seconds
+ // StopOnReshard: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
log.Fatal(err)
}
for {
e, err := reader.Recv()
switch err {
case nil:
_ = e
fmt.Printf("%v\n", e)
case io.EOF:
fmt.Printf("stream ended\n")
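For context on how a client consumes this: below is a minimal sketch (not part of this PR) of a VStream client that sets StopOnReshard and resumes after the stream stops at a reshard boundary. It reuses the packages imported in the example above; the vtgate address, the keyspace name, and the resume-from-journal logic are illustrative assumptions.

// Sketch of a VStream client that survives a reshard with StopOnReshard set.
// Assumptions (not from this PR): a local vtgate gRPC endpoint on
// localhost:15991 and a sharded keyspace named "customer".
package main

import (
	"context"
	"fmt"
	"io"
	"log"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func main() {
	ctx := context.Background()
	conn, err := vtgateconn.Dial(ctx, "localhost:15991")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Start from the current position on all shards of the keyspace.
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{Keyspace: "customer", Gtid: "current"}},
	}
	filter := &binlogdatapb.Filter{Rules: []*binlogdatapb.Rule{{Match: "/.*"}}}
	flags := &vtgatepb.VStreamFlags{StopOnReshard: true}

	for {
		reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
		if err != nil {
			log.Fatal(err)
		}
		var reshardVGTID *binlogdatapb.VGtid
	recv:
		for {
			evs, err := reader.Recv()
			switch err {
			case nil:
				for _, ev := range evs {
					switch ev.Type {
					case binlogdatapb.VEventType_VGTID:
						vgtid = ev.GetVgtid() // remember the latest position
					case binlogdatapb.VEventType_JOURNAL:
						// With StopOnReshard, a SHARDS journal is delivered and the
						// stream then ends; its ShardGtids point at the new shards.
						if ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
							reshardVGTID = &binlogdatapb.VGtid{ShardGtids: ev.Journal.ShardGtids}
						}
					default:
						fmt.Printf("%v\n", ev)
					}
				}
			case io.EOF:
				break recv // stream stopped (e.g. on the reshard)
			default:
				log.Fatal(err)
			}
		}
		if reshardVGTID == nil {
			return // ended without a reshard journal; nothing to resume onto
		}
		vgtid = reshardVGTID // resume the loop on the post-reshard shards
	}
}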
218 changes: 208 additions & 10 deletions go/test/endtoend/vreplication/vstream_test.go
@@ -28,13 +28,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
- binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
- topodatapb "vitess.io/vitess/go/vt/proto/topodata"
- vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"

"vitess.io/vitess/go/vt/vtgate/vtgateconn"

+ binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

// Validates that we have a working VStream API
@@ -603,8 +603,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
- var reader vtgateconn.VStreamReader
- reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+ reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()
@@ -658,8 +657,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
- var reader vtgateconn.VStreamReader
- reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
+ reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()
@@ -694,8 +692,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}()

// We should have a mix of events across the old and new shards.
- require.NotZero(t, oldShardRowEvents)
- require.NotZero(t, newShardRowEvents)
+ require.Greater(t, oldShardRowEvents, 0)
+ require.Greater(t, newShardRowEvents, 0)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
@@ -704,6 +702,206 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
}

// TestMultiVStreamsKeyspaceStopOnReshard confirms that journal events are received
// when resuming a VStream after a reshard.
func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
ctx := context.Background()
ks := "testks"
wf := "multiVStreamsKeyspaceReshard"
baseTabletID := 100
tabletType := topodatapb.TabletType_PRIMARY.String()
oldShards := "-80,80-"
newShards := "-40,40-80,80-c0,c0-"
oldShardRowEvents, journalEvents := 0, 0
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
defaultCell := vc.Cells[vc.CellNames[0]]
ogdr := defaultReplicas
defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

// For our sequences etc.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
require.NoError(t, err)

// Setup the keyspace with our old/original shards.
keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
require.NoError(t, err)

// Add the new shards.
err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
require.NoError(t, err)

vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// Ensure that we're starting with a clean slate.
_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
defer streamCancel()
done := make(chan struct{})

// First goroutine that keeps inserting rows into the table being streamed until the
// stream context is cancelled.
go func() {
id := 1
for {
select {
case <-streamCtx.Done():
// Give the VStream a little catch-up time before telling it to stop
// via the done channel.
time.Sleep(10 * time.Second)
close(done)
return
default:
insertRow(ks, "customer", id)
time.Sleep(250 * time.Millisecond)
id++
}
}
}()

// Create the Reshard workflow and wait for it to finish the copy phase.
reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
// Only stream the keyspace that we're resharding. Otherwise the client stream
// will continue to run with only the tablet stream from the global keyspace.
Keyspace: ks,
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Stream all tables.
Match: "/.*",
}},
}
flags := &vtgatepb.VStreamFlags{
StopOnReshard: true,
}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.GetRowEvent().GetShard()
switch shard {
case "-80", "80-":
oldShardRowEvents++
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
// We want a VGTID with a ShardGtid for both of the old shards.
if len(newVGTID.GetShardGtids()) == 2 {
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
canStop = false
}
}
if canStop {
return
}
}
}
}
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-streamCtx.Done():
return
default:
}
}
}()

// Confirm that we have shard GTIDs for the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 2)
t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids())

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
expectedJournalEvents := 2 // One for each old shard: -80,80-
var streamStopped bool // We expect the stream to end with io.EOF from the reshard
runResumeStream := func() {
journalEvents = 0
streamStopped = false
t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids())
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for i, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "-80", "80-":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_JOURNAL:
t.Logf("Journal event: %+v", ev)
journalEvents++
require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
}
}
case io.EOF:
streamStopped = true
return
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
}
select {
case <-done:
return
default:
}
}
}

// Multiple VStream clients should be able to resume from where they left off and
// get the reshard journal event.
for i := 1; i <= expectedJournalEvents; i++ {
runResumeStream()
// We should have seen the journal event for each shard in the stream due to
// using StopOnReshard.
require.Equal(t, expectedJournalEvents, journalEvents,
"did not get expected journal events on resume vstream #%d", i)
// Confirm that the stream stopped on the reshard.
require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected")
}
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
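The BEGIN, JOURNAL, VGTID, COMMIT framing that the new test asserts inline could be factored into a small helper. A hypothetical sketch (not part of this PR), in the test package's style:

package vreplication

import (
	"testing"

	"github.com/stretchr/testify/require"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// requireJournalFraming asserts that the JOURNAL event at index i of a batch
// returned by reader.Recv() arrives as part of a BEGIN, JOURNAL, VGTID, COMMIT
// sequence: the ordering vstream_manager.go now guarantees by flushing the
// events that follow a journal through its COMMIT before ending the stream.
func requireJournalFraming(t *testing.T, evs []*binlogdatapb.VEvent, i int) {
	t.Helper()
	require.Greater(t, i, 0, "JOURNAL event should not start the batch")
	require.Less(t, i+2, len(evs), "JOURNAL event should be followed by VGTID and COMMIT")
	require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
	require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
	require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
}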
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
@@ -690,7 +690,6 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [
})

}
- ts.Logger().Infof("Creating journal %v", journal)
+ ts.Logger().Infof("Creating journal: %v", journal)
statement := fmt.Sprintf("insert into _vt.resharding_journal "+
"(id, db_name, val) "+
35 changes: 31 additions & 4 deletions go/vt/vtgate/vstream_manager.go
@@ -61,6 +61,10 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

+ // stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before
+ // ending the stream from the tablet.
+ const stopOnReshardDelay = 500 * time.Millisecond

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
@@ -608,7 +612,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
- for _, event := range events {
+ for i, event := range events {
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
@@ -658,12 +662,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
- // Journal events are not sent to clients by default, but only when StopOnReshard is set
+ // Journal events are not sent to clients by default, but only when
+ // StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
+ // Read any subsequent events until we get the VGTID->COMMIT events that
+ // always follow the JOURNAL event which is generated as a result of
+ // an autocommit insert into the _vt.resharding_journal table on the
+ // tablet.
+ for j := i + 1; j < len(events); j++ {
+ sendevents = append(sendevents, events[j])
+ if events[j].Type == binlogdatapb.VEventType_COMMIT {
+ break
+ }
+ }
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
@@ -676,12 +690,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}
if je != nil {
- // Wait till all other participants converge and return EOF.
+ // We're going to be ending the tablet stream, so we ensure a reasonable
+ // minimum amount of time is allotted for clients to Recv the journal event
+ // before the stream's context is cancelled (which would cause the grpc
+ // SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
+ // event before the stream ends then they'll have to resume from the last
+ // ShardGtid they received before the journal event.
+ endTimer := time.NewTimer(stopOnReshardDelay)
+ defer endTimer.Stop()
+ // Wait until all other participants converge and then return EOF after
+ // the minimum delay has passed.
journalDone = je.done
select {
case <-ctx.Done():
return ctx.Err()
case <-journalDone:
+ <-endTimer.C
return io.EOF
}
}
@@ -954,6 +978,9 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
return false, err
}

+ vs.mu.Lock()
+ defer vs.mu.Unlock()

// First check the typical case, where the VGTID shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the VGTID contains shards that are all serving.
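The heart of the vstream_manager.go change is a timer-gated EOF: even after all reshard participants have converged, the stream does not end until a minimum window has passed since the journal event was sent, giving clients time to Recv it. A standalone sketch of that pattern with illustrative names (in the PR the constant is stopOnReshardDelay, 500ms):

// Sketch of the timer-gated EOF pattern; names here are assumptions for
// illustration, not the PR's exact code.
package main

import (
	"context"
	"fmt"
	"io"
	"time"
)

const minDeliveryDelay = 500 * time.Millisecond

// endAfterJournal returns io.EOF once all reshard participants have converged
// (journalDone closed), but never before minDeliveryDelay has elapsed since
// the journal event was sent, so clients get a reasonable delivery window.
func endAfterJournal(ctx context.Context, journalDone <-chan struct{}) error {
	endTimer := time.NewTimer(minDeliveryDelay)
	defer endTimer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-journalDone:
		<-endTimer.C // enforce the minimum delivery window
		return io.EOF
	}
}

func main() {
	journalDone := make(chan struct{})
	close(journalDone) // pretend all participants converged immediately
	start := time.Now()
	err := endAfterJournal(context.Background(), journalDone)
	// Prints roughly 500ms even though convergence was instantaneous.
	fmt.Printf("stream ended after %v with %v\n", time.Since(start).Round(time.Millisecond), err)
}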