Skip to content

Commit

Permalink
Merge branch 'slack-vitess-r14.0.5' into slack-vitess-r14.0.5-go1.19.…
Browse files Browse the repository at this point in the history
…10-upgrade
  • Loading branch information
maksimov authored Sep 16, 2023
2 parents 51a17e0 + c057af8 commit 4f33744
Show file tree
Hide file tree
Showing 16 changed files with 665 additions and 93 deletions.
168 changes: 165 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

const schemaUnsharded = `
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
insert into customer_seq(id, next_id, cache) values(0, 1, 3);
`
const vschemaUnsharded = `
{
Expand Down Expand Up @@ -218,14 +219,19 @@ const vschemaSharded = `
func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false)
vtgateConn.ExecuteFetch("commit", 1000, false)
}

type numEvents struct {
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64
}

// tests the StopOnReshard flag
Expand Down Expand Up @@ -375,6 +381,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
return &ne
}

// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces
// Ensure that there are no missing row events during the resharding process.
func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents {
defaultCellName := "zone1"
allCellNames = defaultCellName
allCells := []string{allCellNames}
vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig)

require.NotNil(t, vc)
ogdr := defaultReplicas
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// We want to confirm that the following two tables are streamed.
// 1. the customer_seq in the unsharded keyspace
// 2. the customer table in the sharded keyspace
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
done := false

id := 1000
// First goroutine that keeps inserting rows into the table being streamed until a minute after reshard
// We should keep getting events on the new shards
go func() {
for {
if done {
return
}
id++
time.Sleep(1 * time.Second)
insertRow("sharded", "customer", id)
}
}()
// stream events from the VStream API
var ne numEvents
reshardDone := false
go func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "0":
if reshardDone {
ne.numShard0AfterReshardEvents++
} else {
ne.numShard0BeforeReshardEvents++
}
case "-80":
ne.numLessThan80Events++
case "80-":
ne.numGreaterThan80Events++
case "-40":
ne.numLessThan40Events++
case "40-":
ne.numGreaterThan40Events++
}
ne.numRowEvents++
case binlogdatapb.VEventType_JOURNAL:
ne.numJournalEvents++
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Errorf("Returned err %v", err)
done = true
}
if done {
return
}
}
}()

ticker := time.NewTicker(1 * time.Second)
tickCount := 0
for {
<-ticker.C
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshardDone = true
case 60:
done = true
}
if done {
break
}
}
log.Infof("ne=%v", ne)

// The number of row events streamed by the VStream API should match the number of rows inserted.
// This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process.
//
// On the other hand, we don't verify the exact number of row events for the unsharded keyspace
// because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1.
// We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding,
// is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward.
customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer")
insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0])
require.NoError(t, err)
require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events)
return ne
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down Expand Up @@ -406,3 +556,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) {

testVStreamWithFailover(t, false)
}

func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
ne := testVStreamCopyMultiKeyspaceReshard(t, 3000)
require.Equal(t, int64(0), ne.numJournalEvents)
require.NotZero(t, ne.numRowEvents)
require.NotZero(t, ne.numShard0BeforeReshardEvents)
require.NotZero(t, ne.numShard0AfterReshardEvents)
require.NotZero(t, ne.numLessThan80Events)
require.NotZero(t, ne.numGreaterThan80Events)
require.NotZero(t, ne.numLessThan40Events)
require.NotZero(t, ne.numGreaterThan40Events)
}
63 changes: 35 additions & 28 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4f33744

Please sign in to comment.