Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the initial sharding flaky test #6043

Merged
merged 1 commit into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 44 additions & 73 deletions go/test/endtoend/sharding/base_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ limitations under the License.
package sharding

import (
"context"
"fmt"
"io/ioutil"
"math"
"net/http"
"path"
"reflect"
"strconv"
"strings"
"testing"
"time"

"vitess.io/vitess/go/sqltypes"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/json2"
Expand All @@ -46,6 +51,10 @@ var (
InsertTabletTemplateKsID = `insert into %s (id, msg) values (%d, '%s') /* id:%d */`
)

const (
MaxRowsToFetch = 10000
)

// CheckSrvKeyspace verifies the schema with expectedPartition
func CheckSrvKeyspace(t *testing.T, cell string, ksname string, shardingCol string, colType topodata.KeyspaceIdType, expectedPartition map[topodata.TabletType][]string, ci cluster.LocalProcessCluster) {
srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci)
Expand Down Expand Up @@ -87,68 +96,23 @@ func VerifyTabletHealth(t *testing.T, vttablet cluster.Vttablet, hostname string
assert.Equal(t, resp.StatusCode, 200)
}

// VerifyReconciliationCounters checks that the reconciliation Counters have the expected values.
func VerifyReconciliationCounters(t *testing.T, vtworkerURL string, availabilityType string, table string,
inserts int, updates int, deletes int, equals int) {
resp, err := http.Get(vtworkerURL)
require.Nil(t, err)
assert.Equal(t, resp.StatusCode, 200)

resultMap := make(map[string]interface{})
respByte, _ := ioutil.ReadAll(resp.Body)
err = json2.Unmarshal(respByte, &resultMap)
require.Nil(t, err)

value := getValueFromJSON(resultMap, "Worker"+availabilityType+"InsertsCounters", table)
if inserts == 0 {
assert.Equal(t, value, "")
} else {
assert.Equal(t, value, fmt.Sprintf("%d", inserts))
}

value = getValueFromJSON(resultMap, "Worker"+availabilityType+"UpdatesCounters", table)
if updates == 0 {
assert.Equal(t, value, "")
} else {
assert.Equal(t, value, fmt.Sprintf("%d", updates))
}

value = getValueFromJSON(resultMap, "Worker"+availabilityType+"DeletesCounters", table)
if deletes == 0 {
assert.Equal(t, value, "")
} else {
assert.Equal(t, value, fmt.Sprintf("%d", deletes))
}

value = getValueFromJSON(resultMap, "Worker"+availabilityType+"EqualRowsCounters", table)
if equals == 0 {
assert.Equal(t, value, "")
} else {
assert.Equal(t, value, fmt.Sprintf("%d", equals))
}
}

func getValueFromJSON(jsonMap map[string]interface{}, keyname string, tableName string) string {
object := reflect.ValueOf(jsonMap[keyname])
if object.Kind() == reflect.Map {
for _, key := range object.MapKeys() {
if key.String() == tableName {
return fmt.Sprintf("%v", object.MapIndex(key))
}
}
}
return ""
}

// CheckValues check value from sql query to table with expected values
func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type) bool {
func CheckValues(t *testing.T, vttablet cluster.Vttablet, id uint64, msg string, exists bool, tableName string, ks string, keyType querypb.Type, dbConn *mysql.Conn) bool {
query := fmt.Sprintf("select id, msg from %s where id = %d", tableName, id)
if keyType == querypb.Type_VARBINARY {
query = fmt.Sprintf("select id, msg from %s where id = '%d'", tableName, id)
}
var result *sqltypes.Result
if dbConn != nil {
r1, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true)
require.Nil(t, err)
result = r1
} else {
r2, err := vttablet.VttabletProcess.QueryTablet(query, ks, true)
require.Nil(t, err)
result = r2
}

result, err := vttablet.VttabletProcess.QueryTablet(query, ks, true)
require.Nil(t, err)
isFound := false
if exists && len(result.Rows) > 0 {
if keyType == querypb.Type_VARBINARY {
Expand Down Expand Up @@ -269,15 +233,25 @@ func CheckBinlogServerVars(t *testing.T, vttablet cluster.Vttablet, minStatement
func InsertLots(t *testing.T, count uint64, vttablet cluster.Vttablet, table string, ks string) {
var query1, query2 string
var i uint64
dbConn := getDBConnFromTablet(t, &vttablet, ks)
defer dbConn.Close()
for i = 0; i < count; i++ {
query1 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), lotRange1+i)
query2 = fmt.Sprintf(InsertTabletTemplateKsID, table, lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), lotRange2+i)

ExecuteOnTablet(t, query1, vttablet, ks, false)
ExecuteOnTablet(t, query2, vttablet, ks, false)
// insert first query
executeQueryInTransaction(t, query1, dbConn)
executeQueryInTransaction(t, query2, dbConn)
}
}

func executeQueryInTransaction(t *testing.T, query string, dbConn *mysql.Conn) {
dbConn.ExecuteFetch("begin", MaxRowsToFetch, true)
_, err := dbConn.ExecuteFetch(query, MaxRowsToFetch, true)
require.NoError(t, err)
dbConn.ExecuteFetch("commit", MaxRowsToFetch, true)
}

// ExecuteOnTablet executes a write query on specified vttablet
// It should always be called with a master tablet for the keyspace/shard
func ExecuteOnTablet(t *testing.T, query string, vttablet cluster.Vttablet, ks string, expectFail bool) {
Expand Down Expand Up @@ -330,32 +304,22 @@ func CheckLotsTimeout(t *testing.T, vttablet cluster.Vttablet, count uint64, tab
return false
}

// CheckLotsNotPresent verifies that no rows should be present in vttablet
func CheckLotsNotPresent(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) {
var i uint64
for i = 0; i < count; i++ {
assert.False(t, CheckValues(t, vttablet,
lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType))

assert.False(t, CheckValues(t, vttablet,
lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType))
}
}

func checkLots(t *testing.T, vttablet cluster.Vttablet, count uint64, table string, ks string, keyType querypb.Type) float64 {
var isFound bool
var totalFound int
var i uint64
dbConn := getDBConnFromTablet(t, &vttablet, ks)
defer dbConn.Close()

for i = 0; i < count; i++ {
isFound = CheckValues(t, vttablet,
lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType)
lotRange1+i, fmt.Sprintf("msg-range1-%d", 10000+i), true, table, ks, keyType, dbConn)
if isFound {
totalFound++
}

isFound = CheckValues(t, vttablet,
lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType)
lotRange2+i, fmt.Sprintf("msg-range2-%d", 20000+i), true, table, ks, keyType, dbConn)
if isFound {
totalFound++
}
Expand Down Expand Up @@ -533,3 +497,10 @@ func CheckThrottlerService(t *testing.T, server string, names []string, rate int
checkThrottlerServiceMaxRates(t, server, names, rate, ci)
checkThrottlerServiceConfiguration(t, server, names, ci)
}

func getDBConnFromTablet(t *testing.T, vttablet *cluster.Vttablet, ks string) *mysql.Conn {
dbParams := cluster.NewConnParams(vttablet.VttabletProcess.DbPort, vttablet.VttabletProcess.DbPassword, path.Join(vttablet.VttabletProcess.Directory, "mysql.sock"), ks)
dbConn, err := mysql.Connect(context.Background(), &dbParams)
require.NoError(t, err)
return dbConn
}
12 changes: 6 additions & 6 deletions go/test/endtoend/sharding/initialsharding/sharding_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,27 +449,27 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query

// check first value is in the left shard
for _, tablet := range shard21.Vttablets {
sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", true, tableName, keyspaceName, keyType, nil)
}

for _, tablet := range shard22.Vttablets {
sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0x1000000000000000, "msg1", false, tableName, keyspaceName, keyType, nil)
}

for _, tablet := range shard21.Vttablets {
sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", false, tableName, keyspaceName, keyType, nil)
}

for _, tablet := range shard22.Vttablets {
sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0x9000000000000000, "msg2", true, tableName, keyspaceName, keyType, nil)
}

for _, tablet := range shard21.Vttablets {
sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", false, tableName, keyspaceName, keyType, nil)
}

for _, tablet := range shard22.Vttablets {
sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType)
sharding.CheckValues(t, *tablet, 0xD000000000000000, "msg3", true, tableName, keyspaceName, keyType, nil)
}

err = ClusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaKeyspace", keyspaceName)
Expand Down