From b0ff11b16ddd594b7216dd8a3ae5ca8722678a1d Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Mar 2023 15:55:31 +0530 Subject: [PATCH 1/2] fix: reset transaction session with better checks when no reserved connection Signed-off-by: Harshit Gangal --- go/test/endtoend/cluster/cluster_process.go | 6 +- .../reparent/prssettingspool/main_test.go | 148 ++++++++++++++++++ .../reparent/prssettingspool/schema.sql | 5 + go/test/endtoend/reparent/utils/utils.go | 24 +++ go/vt/vtgate/safe_session.go | 19 ++- 5 files changed, 195 insertions(+), 7 deletions(-) create mode 100644 go/test/endtoend/reparent/prssettingspool/main_test.go create mode 100644 go/test/endtoend/reparent/prssettingspool/schema.sql diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 8d922a3c179..3974aa963e1 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -798,7 +798,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet * return nil, err } - tablet, err := cluster.vtctlclientGetTablet(vttablet) + tablet, err := cluster.VtctlclientGetTablet(vttablet) if err != nil { return nil, err } @@ -841,7 +841,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin // returns the responses. It returns an error if the stream ends with fewer than // `count` responses. 
func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) { - tablet, err := cluster.vtctlclientGetTablet(vttablet) + tablet, err := cluster.VtctlclientGetTablet(vttablet) if err != nil { return nil, err } @@ -873,7 +873,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta return responses, nil } -func (cluster *LocalProcessCluster) vtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) { +func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) { result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias) if err != nil { return nil, err diff --git a/go/test/endtoend/reparent/prssettingspool/main_test.go b/go/test/endtoend/reparent/prssettingspool/main_test.go new file mode 100644 index 00000000000..a9f4312caea --- /dev/null +++ b/go/test/endtoend/reparent/prssettingspool/main_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package misc + +import ( + "context" + _ "embed" + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + rutils "vitess.io/vitess/go/test/endtoend/reparent/utils" + "vitess.io/vitess/go/test/endtoend/utils" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "test" + + //go:embed schema.sql + schemaSQL string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: schemaSQL, + } + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--queryserver-enable-settings-pool") + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 2, false) + if err != nil { + return 1 + } + + // Start vtgate + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--planner-version", "gen4") + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func TestSettingsPoolWithTXAndPRS(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + // set a system setting that will trigger reserved connection usage. 
+ utils.Exec(t, conn, "set default_week_format = 5") + + // have a transaction on the session + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "select id1, id2 from t1") + utils.Exec(t, conn, "commit") + + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + + // prs should happen without any error. + text, err := rutils.Prs(t, clusterInstance, tablets[1]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[0], 1*time.Minute) + + defer func() { + // reset state + text, err = rutils.Prs(t, clusterInstance, tablets[0]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[1], 1*time.Minute) + }() + + // no error should occur and it should go to the right tablet. + utils.Exec(t, conn, "select id1, id2 from t1") +} + +func TestSettingsPoolWithoutTXAndPRS(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + // set a system setting that will trigger reserved connection usage. + utils.Exec(t, conn, "set default_week_format = 5") + + // execute non-tx query + utils.Exec(t, conn, "select id1, id2 from t1") + + tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets + + // prs should happen without any error. + text, err := rutils.Prs(t, clusterInstance, tablets[1]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[0], 1*time.Minute) + defer func() { + // reset state + text, err = rutils.Prs(t, clusterInstance, tablets[0]) + require.NoError(t, err, text) + rutils.WaitForTabletToBeServing(t, clusterInstance, tablets[1], 1*time.Minute) + }() + + // no error should occur and it should go to the right tablet. 
+ utils.Exec(t, conn, "select id1, id2 from t1") + +} diff --git a/go/test/endtoend/reparent/prssettingspool/schema.sql b/go/test/endtoend/reparent/prssettingspool/schema.sql new file mode 100644 index 00000000000..3e78cab09d6 --- /dev/null +++ b/go/test/endtoend/reparent/prssettingspool/schema.sql @@ -0,0 +1,5 @@ +create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index d85b3ac52c0..3ea2dc04e38 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -31,6 +31,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/json2" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -705,3 +708,24 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V require.Equal(t, "No", res.Rows[0][11].ToString()) } } + +func WaitForTabletToBeServing(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) { + vTablet, err := clusterInstance.VtctlclientGetTablet(tablet) + require.NoError(t, err) + + tConn, err := tabletconn.GetDialer()(vTablet, false) + require.NoError(t, err) + + newCtx, cancel := context.WithTimeout(context.Background(), timeout) + err = tConn.StreamHealth(newCtx, func(shr *querypb.StreamHealthResponse) error { + if shr.Serving { + cancel() + } + return nil + }) + + // the error should only be because we cancelled the context when the tablet became serving again. 
+ if err != nil && !strings.Contains(err.Error(), "context canceled") { + t.Fatal(err.Error()) + } +} diff --git a/go/vt/vtgate/safe_session.go b/go/vt/vtgate/safe_session.go index e710df27755..af2509b94e7 100644 --- a/go/vt/vtgate/safe_session.go +++ b/go/vt/vtgate/safe_session.go @@ -151,11 +151,22 @@ func (session *SafeSession) ResetTx() { session.Session.InTransaction = false session.commitOrder = vtgatepb.CommitOrder_NORMAL session.Savepoints = nil - if !session.Session.InReservedConn { - session.ShardSessions = nil - session.PreSessions = nil - session.PostSessions = nil + // If the settings pool is enabled on the vttablet, this variable can be true + // even though there is no shard session with a reserved connection id. + // So, we should check the shard sessions and not just this variable. + if session.Session.InReservedConn { + allSessions := append(session.ShardSessions, append(session.PreSessions, session.PostSessions...)...) + for _, ss := range allSessions { + if ss.ReservedId != 0 { + // found that a reserved connection exists. + // abort here, we should keep the shard sessions. + return + } + } } + session.ShardSessions = nil + session.PreSessions = nil + session.PostSessions = nil } // Reset clears the session From ae2f6bd4b95f7754dc48b6c6eae8fe998881c38a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 30 Mar 2023 15:57:31 +0530 Subject: [PATCH 2/2] add to ci run Signed-off-by: Harshit Gangal --- test/config.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/config.json b/test/config.json index b3d926ed016..f9e27153c99 100644 --- a/test/config.json +++ b/test/config.json @@ -1194,6 +1194,15 @@ "Shard": "vttablet_prscomplex", "RetryMax": 1, "Tags": [""] + }, + "prssettingspool": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/reparent/prssettingspool"], + "Command": [], + "Manual": false, + "Shard": "vttablet_prscomplex", + "RetryMax": 1, + "Tags": [""] } } }