Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VStreamer: fix deadlock when there are a lot of vschema changes at the same time as binlog events #11325

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions go/test/endtoend/vreplication/vschema_load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2022 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestVSchemaChangesUnderLoad tests vstreamer under a load of high binlog events and simultaneous multiple vschema changes
// see https://github.com/vitessio/vitess/issues/11169
func TestVSchemaChangesUnderLoad(t *testing.T) {

extendedTimeout := defaultTimeout * 4
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved

defaultCellName := "zone1"
allCells := []string{"zone1"}
allCellNames = "zone1"
vc = NewVitessCluster(t, "TestVSchemaChanges", allCells, mainClusterConfig)

require.NotNil(t, vc)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, 1, 0, 100, sourceKsOpts)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 1)
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

// ch is used to signal that there is significant data inserted into the tables and when a lot of vschema changes have been applied
ch := make(chan bool, 1)

ctx := context.Background()
initialDataInserted := false
startCid := 100
warmupRowCount := startCid + 2000
insertData := func() {
timer := time.NewTimer(extendedTimeout)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
defer timer.Stop()
log.Infof("Inserting data into customer")
cid := startCid
for {
if !initialDataInserted && cid > warmupRowCount {
log.Infof("Done inserting initial data into customer")
initialDataInserted = true
ch <- true
}
query := fmt.Sprintf("insert into customer(cid, name) values (%d, 'a')", cid)
_, _ = vtgateConn.ExecuteFetch(query, 1, false)
cid++
query = "update customer set name = concat(name, 'a')"
_, _ = vtgateConn.ExecuteFetch(query, 10000, false)
select {
case <-timer.C:
log.Infof("Done inserting data into customer")
return
default:
}
}
}
go func() {
log.Infof("Starting to vstream from replica")
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Shard: "0",
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer",
}},
}
conn, err := vtgateconn.Dial(ctx, net.JoinHostPort("localhost", strconv.Itoa(vc.ClusterConfig.vtgateGrpcPort)))
require.NoError(t, err)
defer conn.Close()

flags := &vtgatepb.VStreamFlags{}

ctx2, cancel := context.WithTimeout(ctx, extendedTimeout/2)
defer cancel()
reader, err := conn.VStream(ctx2, topodatapb.TabletType_REPLICA, vgtid, filter, flags)
require.NoError(t, err)
_, err = reader.Recv()
require.NoError(t, err)
log.Infof("About to sleep in vstreaming to block the vstream Recv() channel")
time.Sleep(extendedTimeout)
log.Infof("Done vstreaming")
}()

go insertData()
<-ch // wait for enough data to be inserted before ApplyVSchema
const maxApplyVSchemas = 20
go func() {
numApplyVSchema := 0
timer := time.NewTimer(extendedTimeout)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
defer timer.Stop()
log.Infof("Started ApplyVSchema")
for {
if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema={}", "product"); err != nil {
log.Errorf("ApplyVSchema command failed with %+v\n", err)
return
}
numApplyVSchema++
if numApplyVSchema > maxApplyVSchemas {
ch <- true
}
select {
case <-timer.C:
log.Infof("Done ApplyVSchema")
ch <- true
return
default:
time.Sleep(defaultTick)
}
}
}()

<-ch // wait for enough ApplyVSchema calls before doing a PRS
if err := vc.VtctlClient.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", "product/0",
"--new_primary", "zone1-101", "--wait_replicas_timeout", defaultTimeout.String()); err != nil {
require.NoError(t, err, "PlannedReparentShard command failed")
}
}
44 changes: 20 additions & 24 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/dbconfigs"
Expand All @@ -45,6 +46,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -67,7 +69,7 @@ type Engine struct {
wg sync.WaitGroup

mu sync.Mutex
isOpen bool
isOpen int32 // 0 or 1 in place of atomic.Bool added in go 1.19
streamIdx int
streamers map[int]*uvstreamer
rowStreamers map[int]*rowStreamer
Expand Down Expand Up @@ -148,30 +150,24 @@ func (vse *Engine) InitDBConfig(keyspace, shard string) {

// Open starts the Engine service.
func (vse *Engine) Open() {
vse.mu.Lock()
defer vse.mu.Unlock()
if vse.isOpen {
return
}
log.Info("VStreamer: opening")
vse.isOpen = true
// If it's not already open, then open it now.
atomic.CompareAndSwapInt32(&vse.isOpen, 0, 1)
}

// IsOpen checks if the engine is opened
func (vse *Engine) IsOpen() bool {
vse.mu.Lock()
defer vse.mu.Unlock()
return vse.isOpen
return atomic.LoadInt32(&vse.isOpen) == 1
}

// Close closes the Engine service.
func (vse *Engine) Close() {
func() {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return
}
vse.mu.Lock()
defer vse.mu.Unlock()
// cancels are non-blocking.
for _, s := range vse.streamers {
s.Cancel()
Expand All @@ -182,7 +178,7 @@ func (vse *Engine) Close() {
for _, s := range vse.resultStreamers {
s.Cancel()
}
vse.isOpen = false
atomic.StoreInt32(&vse.isOpen, 0)
}()

// Wait only after releasing the lock because the end of every
Expand All @@ -207,11 +203,11 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl

// Create stream and add it to the map.
streamer, idx, err := func() (*uvstreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send)
idx := vse.streamIdx
vse.streamers[idx] = streamer
Expand Down Expand Up @@ -248,11 +244,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp

// Create stream and add it to the map.
rowStreamer, idx, err := func() (*rowStreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()

rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse)
idx := vse.streamIdx
Expand Down Expand Up @@ -283,11 +279,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp
func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error {
// Create stream and add it to the map.
resultStreamer, idx, err := func() (*resultStreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()
resultStreamer := newResultStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), query, send, vse)
idx := vse.streamIdx
vse.resultStreamers[idx] = resultStreamer
Expand Down Expand Up @@ -441,7 +437,7 @@ func (vse *Engine) waitForMySQL(ctx context.Context, db dbconfigs.Connector, tab
}
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-time.After(backoff):
// Exponential backoff with 1.5 as a factor
if backoff != backoffLimit {
Expand Down
40 changes: 29 additions & 11 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/timer"
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -40,6 +41,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -143,6 +145,12 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
select {
case vs.vevents <- vschema:
case <-vs.ctx.Done():
default: // if there is a pending vschema in the channel, drain it and update it with the latest one
select {
case <-vs.vevents:
vs.vevents <- vschema
default:
}
}
}

Expand Down Expand Up @@ -278,13 +286,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

injectHeartbeat := func(throttled bool) error {
now := time.Now().UnixNano()
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
Throttled: throttled,
})
return err
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
Throttled: throttled,
})
return err
}
}

throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
Expand Down Expand Up @@ -361,11 +374,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
}
case vs.vschema = <-vs.vevents:
if err := vs.rebuildPlans(); err != nil {
return err
select {
case <-ctx.Done():
return nil
default:
if err := vs.rebuildPlans(); err != nil {
return err
}
// Increment this counter for testing.
vschemaUpdateCount.Add(1)
}
// Increment this counter for testing.
vschemaUpdateCount.Add(1)
case <-ctx.Done():
return nil
case <-hbTimer.C:
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,15 @@
"RetryMax": 2,
"Tags": []
},
"vreplication_vschema_load": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"],
"Command": [],
"Manual": false,
"Shard": "vreplication_cellalias",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably rename the shard, e.g. vreplication_cellalias_vschema or something.

"RetryMax": 2,
"Tags": []
},
"vreplication_basic": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"],
Expand Down