diff --git a/go/test/endtoend/sharding/base_sharding.go b/go/test/endtoend/sharding/base_sharding.go index 3fefae07e96..2171bb984bd 100644 --- a/go/test/endtoend/sharding/base_sharding.go +++ b/go/test/endtoend/sharding/base_sharding.go @@ -18,16 +18,21 @@ limitations under the License. package sharding import ( + "context" "fmt" - "io/ioutil" "math" "net/http" + "path" "reflect" "strconv" "strings" "testing" "time" + "vitess.io/vitess/go/sqltypes" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/json2" @@ -46,6 +51,10 @@ var ( InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */` ) +const ( + MaxRowsToFetch = 10000 +) + // CheckSrvKeyspace verifies the schema with expectedPartition func CheckSrvKeyspace(t *testing.T, cell string, ksname string, shardingCol string, colType topodata.KeyspaceIdType, expectedPartition map[topodata.TabletType][]string, ci cluster.LocalProcessCluster) { srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci) @@ -87,68 +96,23 @@ func VerifyTabletHealth(t *testing.T, vttablet cluster.Vttablet, hostname string assert.Equal(t, resp.StatusCode, 200) } -// VerifyReconciliationCounters checks that the reconciliation Counters have the expected values. -func VerifyReconciliationCounters(t *testing.T, vtworkerURL string, availabilityType string, table string, - inserts int, updates int, deletes int, equals int) { - resp, err := http.Get(vtworkerURL) - require.Nil(t, err) - assert.Equal(t, resp.StatusCode, 200) - - resultMap := make(map[string]interface{}) - respByte, _ := ioutil.ReadAll(resp.Body) - err = json2.Unmarshal(respByte, &resultMap) - require.Nil(t, err) - - value := getValueFromJSON(resultMap, "Worker"+availabilityType+"InsertsCounters", table) - if inserts == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", inserts)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"UpdatesCounters", table) - if updates == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", updates)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"DeletesCounters", table) - if deletes == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", deletes)) - } - - value = getValueFromJSON(resultMap, "Worker"+availabilityType+"EqualRowsCounters", table) - if equals == 0 { - assert.Equal(t, value, "") - } else { - assert.Equal(t, value, fmt.Sprintf("%d", equals)) - } -} - -func getValueFromJSON(jsonMap map[string]interface{}, keyname string, tableName string) string { - object := reflect.ValueOf(jsonMap[keyname]) - if object.Kind() == reflect.Map { - for _, key := range object.MapKeys() { - if key.String() == tableName { - return fmt.Sprintf("%v", object.MapIndex(key)) - } - } - } - return "" -} - // CheckValues check value from sql query to table with expected values -func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type) bool { +func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type, dbConn *mysql.Conn) bool { query := fmt.Sprintf("select id, msg from %s where id = %d", tableName, id) if keyType == querypb.Type_VARBINARY { query = fmt.Sprintf("select id, msg from %s where id = '%d'", tableName, id) } + var result *sqltypes.Result + if dbConn != nil { + r1, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true) + require.Nil(t, err) + result = r1 + } else { + r2, err := vttablet.VttabletProcess.QueryTablet(query, ks, true) + require.Nil(t, err) + result = r2 + } - result, err := vttablet.VttabletProcess.QueryTablet(query, ks, true) - require.Nil(t, err) isFound := false if exists && len(result.Rows) > 0 { if keyType == querypb.Type_VARBINARY { @@ -269,15 +233,25 @@ func CheckBinlogServerVars(t *testing.T, vttablet cluster.Vttablet, minStatement func InsertLots(t *testing.T, count uint64, vttablet cluster.Vttablet, table string, ks string) { var query1, query2 string var i uint64 + dbConn := getDBConnFromTablet(t, &vttablet, ks) + defer dbConn.Close() for i = 0; i < count; i++ { query1 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), lotRange1+i) query2 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), lotRange2+i) - ExecuteOnTablet(t, query1, vttablet, ks, false) - ExecuteOnTablet(t, query2, vttablet, ks, false) + // insert first query + executeQueryInTransaction(t, query1, dbConn) + executeQueryInTransaction(t, query2, dbConn) } } +func executeQueryInTransaction(t *testing.T, query string, dbConn *mysql.Conn) { + dbConn.ExecuteFetch("begin", MaxRowsToFetch, true) + _, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true) + require.NoError(t, err) + dbConn.ExecuteFetch("commit", MaxRowsToFetch, true) +} + // ExecuteOnTablet executes a write query on specified vttablet // It should always be called with a master tablet for the keyspace/shard func ExecuteOnTablet(t *testing.T, query string, vttablet cluster.Vttablet, ks string, expectFail bool) { @@ -330,32 +304,22 @@ func CheckLotsTimeout(t *testing.T, vttablet cluster.Vttablet, count uint64, tab return false } -// CheckLotsNotPresent verifies that no rows should be present in vttablet -func CheckLotsNotPresent(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) { - var i uint64 - for i = 0; i < count; i++ { - assert.False(t, CheckValues(t, vttablet, - lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType)) - - assert.False(t, CheckValues(t, vttablet, - lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType)) - } -} - func checkLots(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) float64 { var isFound bool var totalFound int var i uint64 + dbConn := getDBConnFromTablet(t, &vttablet, ks) + defer dbConn.Close() for i = 0; i < count; i++ { isFound = CheckValues(t, vttablet, - lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType) + lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType, dbConn) if isFound { totalFound++ } isFound = CheckValues(t, vttablet, - lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType) + lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType, dbConn) if isFound { totalFound++ } @@ -533,3 +497,10 @@ func CheckThrottlerService(t *testing.T, server string, names []string, rate int checkThrottlerServiceMaxRates(t, server, names, rate, ci) checkThrottlerServiceConfiguration(t, server, names, ci) } + +func getDBConnFromTablet(t *testing.T, vttablet *cluster.Vttablet, ks string) *mysql.Conn { + dbParams := cluster.NewConnParams(vttablet.VttabletProcess.DbPort, vttablet.VttabletProcess.DbPassword, path.Join(vttablet.VttabletProcess.Directory, "mysql.sock"), ks) + dbConn, err := mysql.Connect(context.Background(), &dbParams) + require.NoError(t, err) + return dbConn +} diff --git a/go/test/endtoend/sharding/initialsharding/sharding_util.go b/go/test/endtoend/sharding/initialsharding/sharding_util.go index d9b6f47d19f..09b603fc3df 100644 --- a/go/test/endtoend/sharding/initialsharding/sharding_util.go +++ b/go/test/endtoend/sharding/initialsharding/sharding_util.go @@ -449,27 +449,27 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query // check first value is in the left shard for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard21.Vttablets { - sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType, nil) } for _, tablet := range shard22.Vttablets { - sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType) + sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType, nil) } err = ClusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaKeyspace", keyspaceName)