From def32df3682ab79d7b8e39c8c3b6389cccc77544 Mon Sep 17 00:00:00 2001 From: Arindam Nayak Date: Fri, 10 Apr 2020 15:00:47 +0530 Subject: [PATCH] Fix the initial sharding flaky test We were creating multiple connection in loop to verify more than 2000 rows created by sharding test. This was creating 'Too many connection' error. Now, the connection is created first and then passed around to verify the rows Signed-off-by: Arindam Nayak --- go/test/endtoend/sharding/base_sharding.go | 117 +++++++----------- .../sharding/initialsharding/sharding_util.go | 12 +- 2 files changed, 50 insertions(+), 79 deletions(-) diff --git a/go/test/endtoend/sharding/base_sharding.go b/go/test/endtoend/sharding/base_sharding.go index 3fefae07e96..2171bb984bd 100644 --- a/go/test/endtoend/sharding/base_sharding.go +++ b/go/test/endtoend/sharding/base_sharding.go @@ -18,16 +18,21 @@ limitations under the License. package sharding import ( + "context" "fmt" - "io/ioutil" "math" "net/http" + "path" "reflect" "strconv" "strings" "testing" "time" + "vitess.io/vitess/go/sqltypes" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/json2" @@ -46,6 +51,10 @@ var ( InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */` ) +const ( + MaxRowsToFetch = 10000 +) + // CheckSrvKeyspace verifies the schema with expectedPartition func CheckSrvKeyspace(t *testing.T, cell string, ksname string, shardingCol string, colType topodata.KeyspaceIdType, expectedPartition map[topodata.TabletType][]string, ci cluster.LocalProcessCluster) { srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci) @@ -87,68 +96,23 @@ func VerifyTabletHealth(t *testing.T, vttablet cluster.Vttablet, hostname string assert.Equal(t, resp.StatusCode, 200) } -// VerifyReconciliationCounters checks that the reconciliation Counters have the expected values. -func VerifyReconciliationCounters(t *testing.T, vtworkerURL string, availabilityType string, table string, - inserts int, updates int, deletes int, equals int) { - resp, err := http.Get(vtworkerURL) - require.Nil(t, err) - assert.Equal(t, resp.StatusCode, 200) - - resultMap := make(map[string]interface{}) - respByte, _ := ioutil.ReadAll(resp.Body) - err = json2.Unmarshal(respByte, &resultMap) - require.Nil(t, err) - - value := getValueFromJSON(resultMap, "Worker"+availabilityType+"InsertsCounters", table) - if inserts == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", inserts)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"UpdatesCounters", table) - if updates == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", updates)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"DeletesCounters", table) - if deletes == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", deletes)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"EqualRowsCounters", table) - if equals == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", equals)) - } -} - -func getValueFromJSON(jsonMap map[string]interface{}, keyname string, tableName string) string { - object := reflect.ValueOf(jsonMap[keyname]) - if object.Kind() == reflect.Map { - for _, key := range object.MapKeys() { - if key.String() == tableName { - return fmt.Sprintf("%v", object.MapIndex(key)) - } - } - } - return "" -} - // CheckValues check value from sql query to table with expected values -func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type) bool { +func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type, dbConn *mysql.Conn) bool { query := fmt.Sprintf("select id, msg from %s where id = %d", tableName, id) if keyType == querypb.Type_VARBINARY { query = fmt.Sprintf("select id, msg from %s where id = '%d'", tableName, id) } + var result *sqltypes.Result + if dbConn != nil { + r1, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true) + require.Nil(t, err) + result = r1 + } else { + r2, err := vttablet.VttabletProcess.QueryTablet(query, ks, true) + require.Nil(t, err) + result = r2 + } - result, err := vttablet.VttabletProcess.QueryTablet(query, ks, true) - require.Nil(t, err) isFound := false if exists && len(result.Rows) > 0 { if keyType == querypb.Type_VARBINARY { @@ -269,15 +233,25 @@ func CheckBinlogServerVars(t *testing.T, vttablet cluster.Vttablet, minStatement func InsertLots(t *testing.T, count uint64, vttablet cluster.Vttablet, table string, ks string) { var query1, query2 string var i uint64 + dbConn := getDBConnFromTablet(t, &vttablet, ks) + defer dbConn.Close() for i = 0; i < count; i++ { query1 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), lotRange1+i) query2 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), lotRange2+i) - ExecuteOnTablet(t, query1, vttablet, ks, false) - ExecuteOnTablet(t, query2, vttablet, ks, false) + // insert first query + executeQueryInTransaction(t, query1, dbConn) + executeQueryInTransaction(t, query2, dbConn) } } +func executeQueryInTransaction(t *testing.T, query string, dbConn *mysql.Conn) { + dbConn.ExecuteFetch("begin", MaxRowsToFetch, true) + _, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true) + require.NoError(t, err) + dbConn.ExecuteFetch("commit", MaxRowsToFetch, true) +} + // ExecuteOnTablet executes a write query on specified vttablet // It should always be called with a master tablet for the keyspace/shard func ExecuteOnTablet(t *testing.T, query string, vttablet cluster.Vttablet, ks string, expectFail bool) { @@ -330,32 +304,22 @@ func CheckLotsTimeout(t *testing.T, vttablet cluster.Vttablet, count uint64, tab return false } -// CheckLotsNotPresent verifies that no rows should be present in vttablet -func CheckLotsNotPresent(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) { - var i uint64 - for i = 0; i < count; i++ { - assert.False(t, CheckValues(t, vttablet, - lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType)) - - assert.False(t, CheckValues(t, vttablet, - lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType)) - } -} - func checkLots(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) float64 { var isFound bool var totalFound int var i uint64 + dbConn := getDBConnFromTablet(t, &vttablet, ks) + defer dbConn.Close() for i = 0; i < count; i++ { isFound = CheckValues(t, vttablet, - lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType) + lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType, dbConn) if isFound { totalFound++ } isFound = CheckValues(t, vttablet, - lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType) + lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType, dbConn) if isFound { totalFound++ } @@ -533,3 +497,10 @@ func CheckThrottlerService(t *testing.T, server string, names []string, rate int checkThrottlerServiceMaxRates(t, server, names, rate, ci) checkThrottlerServiceConfiguration(t, server, names, ci) } + +func getDBConnFromTablet(t *testing.T, vttablet *cluster.Vttablet, ks string) *mysql.Conn { + dbParams := cluster.NewConnParams(vttablet.VttabletProcess.DbPort, vttablet.VttabletProcess.DbPassword, path.Join(vttablet.VttabletProcess.Directory, "mysql.sock"), ks) + dbConn, err := mysql.Connect(context.Background(), &dbParams) + require.NoError(t, err) + return dbConn +} diff --git a/go/test/endtoend/sharding/initialsharding/sharding_util.go b/go/test/endtoend/sharding/initialsharding/sharding_util.go index d9b6f47d19f..09b603fc3df 100644 --- a/go/test/endtoend/sharding/initialsharding/sharding_util.go +++ b/go/test/endtoend/sharding/initialsharding/sharding_util.go @@ -449,27 +449,27 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query // check first value is in the left shard for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType, nil) } err = ClusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaKeyspace", keyspaceName)