From 698da1685a57c5b92088bdf538d6cbe75d3c1bd0 Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Tue, 3 Oct 2023 18:46:15 -0700 Subject: [PATCH] try prs vstream patch again Signed-off-by: Priya Bibra --- .github/workflows/static_checks_etc.yml | 6 - .../vreplication/vschema_load_test.go | 156 ++++++++++++++++++ .../vttablet/tabletserver/vstreamer/engine.go | 44 +++-- .../tabletserver/vstreamer/vstreamer.go | 40 +++-- test/config.json | 9 + 5 files changed, 214 insertions(+), 41 deletions(-) create mode 100644 go/test/endtoend/vreplication/vschema_load_test.go diff --git a/.github/workflows/static_checks_etc.yml b/.github/workflows/static_checks_etc.yml index a4b0f04a0c6..d9a90a36d8f 100644 --- a/.github/workflows/static_checks_etc.yml +++ b/.github/workflows/static_checks_etc.yml @@ -24,12 +24,6 @@ jobs: if: steps.skip-workflow.outputs.skip-workflow == 'false' uses: actions/checkout@v3 - - name: Run FOSSA scan and upload build data - if: steps.skip-workflow.outputs.skip-workflow == 'false' - uses: fossa-contrib/fossa-action@v1 - with: - fossa-api-key: 76d7483ea206d530d9452e44bffe7ba8 - - name: Check for changes in Go files if: steps.skip-workflow.outputs.skip-workflow == 'false' uses: frouioui/paths-filter@main diff --git a/go/test/endtoend/vreplication/vschema_load_test.go b/go/test/endtoend/vreplication/vschema_load_test.go new file mode 100644 index 00000000000..731679e1eba --- /dev/null +++ b/go/test/endtoend/vreplication/vschema_load_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +// TestVSchemaChangesUnderLoad tests vstreamer under a load of high binlog events and simultaneous multiple vschema changes +// see https://github.com/vitessio/vitess/issues/11169 +func TestVSchemaChangesUnderLoad(t *testing.T) { + + extendedTimeout := defaultTimeout * 4 + + defaultCellName := "zone1" + allCells := []string{"zone1"} + allCellNames = "zone1" + vc = NewVitessCluster(t, "TestVSchemaChanges", allCells, mainClusterConfig) + + require.NotNil(t, vc) + + defer vc.TearDown(t) + + defaultCell = vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, 1, 0, 100, sourceKsOpts) + vtgate = defaultCell.Vtgates[0] + require.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 1) + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + + // ch is used to signal that there is significant data inserted into the tables and when a lot of vschema changes have been applied + ch := make(chan bool, 1) + + ctx := context.Background() + initialDataInserted := false + startCid := 100 + warmupRowCount := startCid + 2000 + insertData := func() { + timer := time.NewTimer(extendedTimeout) + defer timer.Stop() + log.Infof("Inserting data into customer") + cid := startCid + for { + if !initialDataInserted && cid > warmupRowCount { + log.Infof("Done inserting initial data into customer") + initialDataInserted = true + ch <- true + } + query := fmt.Sprintf("insert into customer(cid, name) values (%d, 'a')", cid) + _, _ = vtgateConn.ExecuteFetch(query, 1, false) + cid++ + query = "update customer set name = concat(name, 'a')" + _, _ = vtgateConn.ExecuteFetch(query, 10000, false) + select { + case <-timer.C: + log.Infof("Done inserting data into customer") + return + default: + } + } + } + go func() { + log.Infof("Starting to vstream from replica") + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "product", + Shard: "0", + Gtid: "", + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "customer", + Filter: "select * from customer", + }}, + } + conn, err := vtgateconn.Dial(ctx, net.JoinHostPort("localhost", strconv.Itoa(vc.ClusterConfig.vtgateGrpcPort))) + require.NoError(t, err) + defer conn.Close() + + flags := &vtgatepb.VStreamFlags{} + + ctx2, cancel := context.WithTimeout(ctx, extendedTimeout/2) + defer cancel() + reader, err := conn.VStream(ctx2, topodatapb.TabletType_REPLICA, vgtid, filter, flags) + require.NoError(t, err) + _, err = reader.Recv() + require.NoError(t, err) + log.Infof("About to sleep in vstreaming to block the vstream Recv() channel") + time.Sleep(extendedTimeout) + log.Infof("Done vstreaming") + }() + + go insertData() + <-ch // wait for enough data to be inserted before ApplyVSchema + const maxApplyVSchemas = 20 + go func() { + numApplyVSchema := 0 + timer := time.NewTimer(extendedTimeout) + defer timer.Stop() + log.Infof("Started ApplyVSchema") + for { + if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema={}", "product"); err != nil { + log.Errorf("ApplyVSchema command failed with %+v\n", err) + return + } + numApplyVSchema++ + if numApplyVSchema > maxApplyVSchemas { + ch <- true + } + select { + case <-timer.C: + log.Infof("Done ApplyVSchema") + ch <- true + return + default: + time.Sleep(defaultTick) + } + } + }() + + <-ch // wait for enough ApplyVSchema calls before doing a PRS + if err := vc.VtctlClient.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", "product/0", + "--new_primary", "zone1-101", "--wait_replicas_timeout", defaultTimeout.String()); err != nil { + require.NoError(t, err, "PlannedReparentShard command failed") + } +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index d253cc997a0..c3888f5e2d0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -25,6 +25,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" "vitess.io/vitess/go/vt/dbconfigs" @@ -45,6 +46,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -67,7 +69,7 @@ type Engine struct { wg sync.WaitGroup mu sync.Mutex - isOpen bool + isOpen int32 // 0 or 1 in place of atomic.Bool added in go 1.19 streamIdx int streamers map[int]*uvstreamer rowStreamers map[int]*rowStreamer @@ -150,30 +152,24 @@ func (vse *Engine) InitDBConfig(keyspace, shard string) { // Open starts the Engine service. func (vse *Engine) Open() { - vse.mu.Lock() - defer vse.mu.Unlock() - if vse.isOpen { - return - } log.Info("VStreamer: opening") - vse.isOpen = true + // If it's not already open, then open it now. + atomic.CompareAndSwapInt32(&vse.isOpen, 0, 1) } // IsOpen checks if the engine is opened func (vse *Engine) IsOpen() bool { - vse.mu.Lock() - defer vse.mu.Unlock() - return vse.isOpen + return atomic.LoadInt32(&vse.isOpen) == 1 } // Close closes the Engine service. func (vse *Engine) Close() { func() { - vse.mu.Lock() - defer vse.mu.Unlock() - if !vse.isOpen { + if atomic.LoadInt32(&vse.isOpen) == 0 { return } + vse.mu.Lock() + defer vse.mu.Unlock() // cancels are non-blocking. for _, s := range vse.streamers { s.Cancel() @@ -184,7 +180,7 @@ func (vse *Engine) Close() { for _, s := range vse.resultStreamers { s.Cancel() } - vse.isOpen = false + atomic.StoreInt32(&vse.isOpen, 0) }() // Wait only after releasing the lock because the end of every @@ -209,11 +205,11 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl // Create stream and add it to the map. streamer, idx, err := func() (*uvstreamer, int, error) { - vse.mu.Lock() - defer vse.mu.Unlock() - if !vse.isOpen { + if atomic.LoadInt32(&vse.isOpen) == 0 { return nil, 0, errors.New("VStreamer is not open") } + vse.mu.Lock() + defer vse.mu.Unlock() streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send) idx := vse.streamIdx vse.streamers[idx] = streamer @@ -250,11 +246,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp // Create stream and add it to the map. rowStreamer, idx, err := func() (*rowStreamer, int, error) { - vse.mu.Lock() - defer vse.mu.Unlock() - if !vse.isOpen { + if atomic.LoadInt32(&vse.isOpen) == 0 { return nil, 0, errors.New("VStreamer is not open") } + vse.mu.Lock() + defer vse.mu.Unlock() rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse) idx := vse.streamIdx @@ -285,11 +281,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error { // Create stream and add it to the map. resultStreamer, idx, err := func() (*resultStreamer, int, error) { - vse.mu.Lock() - defer vse.mu.Unlock() - if !vse.isOpen { + if atomic.LoadInt32(&vse.isOpen) == 0 { return nil, 0, errors.New("VStreamer is not open") } + vse.mu.Lock() + defer vse.mu.Unlock() resultStreamer := newResultStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), query, send, vse) idx := vse.streamIdx vse.resultStreamers[idx] = resultStreamer @@ -443,7 +439,7 @@ func (vse *Engine) waitForMySQL(ctx context.Context, db dbconfigs.Connector, tab } select { case <-ctx.Done(): - return ctx.Err() + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") case <-time.After(backoff): // Exponential backoff with 1.5 as a factor if backoff != backoffLimit { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d305a9f2cde..c50fc21b375 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/timer" vtschema "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -40,6 +41,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -139,6 +141,12 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) { select { case vs.vevents <- vschema: case <-vs.ctx.Done(): + default: // if there is a pending vschema in the channel, drain it and update it with the latest one + select { + case <-vs.vevents: + vs.vevents <- vschema + default: + } } } @@ -278,13 +286,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog injectHeartbeat := func(throttled bool) error { now := time.Now().UnixNano() - err := bufferAndTransmit(&binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_HEARTBEAT, - Timestamp: now / 1e9, - CurrentTime: now, - Throttled: throttled, - }) - return err + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired") + default: + err := bufferAndTransmit(&binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_HEARTBEAT, + Timestamp: now / 1e9, + CurrentTime: now, + Throttled: throttled, + }) + return err + } } throttleEvents := func(throttledEvents chan mysql.BinlogEvent) { @@ -361,11 +374,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } case vs.vschema = <-vs.vevents: - if err := vs.rebuildPlans(); err != nil { - return err + select { + case <-ctx.Done(): + return nil + default: + if err := vs.rebuildPlans(); err != nil { + return err + } + // Increment this counter for testing. + vschemaUpdateCount.Add(1) } - // Increment this counter for testing. - vschemaUpdateCount.Add(1) case <-ctx.Done(): return nil case <-hbTimer.C: diff --git a/test/config.json b/test/config.json index 2d5ee723dd3..a2c72032593 100644 --- a/test/config.json +++ b/test/config.json @@ -1074,6 +1074,15 @@ "RetryMax": 2, "Tags": [] }, + "vreplication_vschema_load": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 2, + "Tags": [] + }, "vreplication_basic": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"],