Skip to content

Commit

Permalink
validator(dm): fast mode validation (#4754)
Browse files Browse the repository at this point in the history
close #4716
  • Loading branch information
buchuitoudegou authored Mar 14, 2022
1 parent 4d36a3e commit 461b98f
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 162 deletions.
1 change: 1 addition & 0 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (st *SubTask) StartValidator(expect pb.Stage) {
}
st.Lock()
defer st.Unlock()

if st.cfg.ValidatorCfg.Mode != config.ValidationFast && st.cfg.ValidatorCfg.Mode != config.ValidationFull {
return
}
Expand Down
16 changes: 9 additions & 7 deletions dm/syncer/validate_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ func (vw *validateWorker) validateInsertAndUpdateRows(rows []*rowChange, cond *C
failedRows[key] = &validateFailedRow{tp: rowNotExist}
continue
}

eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
if vw.cfg.Mode == config.ValidationFull {
// only compare the whole row in full mode
eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
}
}
}
return failedRows, nil
Expand Down
342 changes: 187 additions & 155 deletions dm/syncer/validate_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,173 +28,205 @@ import (
"github.com/pingcap/tidb/parser/types"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/conn"
)

// split into 3 cases, since it may be unstable when put together.
func TestValidatorWorkerRunInsertUpdate(t *testing.T) {
tbl1 := filter.Table{Schema: "test", Name: "tbl1"}
tbl2 := filter.Table{Schema: "test", Name: "tbl2"}
tbl3 := filter.Table{Schema: "test", Name: "tbl3"}
tableInfo1 := genValidateTableInfo(t, tbl1.Schema, tbl1.Name,
"create table tbl1(a int primary key, b varchar(100))")
tableInfo2 := genValidateTableInfo(t, tbl2.Schema, tbl2.Name,
"create table tbl2(a varchar(100) primary key, b varchar(100))")
tableInfo3 := genValidateTableInfo(t, tbl3.Schema, tbl3.Name,
"create table tbl3(a varchar(100) primary key, b varchar(100))")
testFunc := func(t *testing.T, mode string) {
t.Helper()
tbl1 := filter.Table{Schema: "test", Name: "tbl1"}
tbl2 := filter.Table{Schema: "test", Name: "tbl2"}
tbl3 := filter.Table{Schema: "test", Name: "tbl3"}
tableInfo1 := genValidateTableInfo(t, tbl1.Schema, tbl1.Name,
"create table tbl1(a int primary key, b varchar(100))")
tableInfo2 := genValidateTableInfo(t, tbl2.Schema, tbl2.Name,
"create table tbl2(a varchar(100) primary key, b varchar(100))")
tableInfo3 := genValidateTableInfo(t, tbl3.Schema, tbl3.Name,
"create table tbl3(a varchar(100) primary key, b varchar(100))")

cfg := genSubtaskConfig(t)
_, mock, err := conn.InitMockDBFull()
mock.MatchExpectationsInOrder(false)
require.NoError(t, err)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
syncerObj := NewSyncer(cfg, nil, nil)
validator := NewContinuousDataValidator(cfg, syncerObj)
validator.Start(pb.Stage_Stopped)
defer validator.cancel()
cfg := genSubtaskConfig(t)
cfg.ValidatorCfg.Mode = mode
_, mock, err := conn.InitMockDBFull()
mock.MatchExpectationsInOrder(false)
require.NoError(t, err)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
syncerObj := NewSyncer(cfg, nil, nil)
validator := NewContinuousDataValidator(cfg, syncerObj)
validator.Start(pb.Stage_Stopped)
defer validator.cancel()

// insert & update same table, both row are validated failed
worker := newValidateWorker(validator, 0)
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "a"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "b"},
tp: rowUpdated,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "2",
pkValues: []string{"2"},
data: []interface{}{2, "2b"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(2, "incorrect data"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(2), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 2)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "2")
require.Equal(t, rowInsert, worker.pendingChangesMap[tbl1.String()].rows["2"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["2"].failedCnt)
// insert & update same table, both row are validated failed
worker := newValidateWorker(validator, 0)
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "a"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "b"},
tp: rowUpdated,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "2",
pkValues: []string{"2"},
data: []interface{}{2, "2b"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(2, "incorrect data"))
require.NoError(t, worker.validateTableChange())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)
if mode == config.ValidationFull {
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 2)
require.Equal(t, int64(2), worker.pendingRowCount.Load())
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "2")
require.Equal(t, rowInsert, worker.pendingChangesMap[tbl1.String()].rows["2"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["2"].failedCnt)
} else {
// fast mode
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Equal(t, int64(1), worker.pendingRowCount.Load())
}

// validate again, this time row with pk=2 validate success
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(2, "2b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(1), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 2, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)
// validate again, this time row with pk=2 validate success
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(2, "2b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(1), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 2, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)

//
// add 2 delete row of tbl2 and tbl3
worker.updateRowChange(&rowChange{
table: tableInfo2,
key: "a",
pkValues: []string{"a"},
data: []interface{}{"a", "b"},
tp: rowDeleted,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo3,
key: "aa",
pkValues: []string{"aa"},
data: []interface{}{"aa", "b"},
tp: rowDeleted,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
mock.ExpectQuery("SELECT .* FROM .*tbl2.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
mock.ExpectQuery("SELECT .* FROM .*tbl3.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow("aa", "b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(2), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 2)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 3, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)
require.Contains(t, worker.pendingChangesMap, tbl3.String())
require.Len(t, worker.pendingChangesMap[tbl3.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl3.String()].rows, "aa")
require.Equal(t, rowDeleted, worker.pendingChangesMap[tbl3.String()].rows["aa"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl3.String()].rows["aa"].failedCnt)
//
// add 2 deleted row of tbl2 and tbl3
worker.updateRowChange(&rowChange{
table: tableInfo2,
key: "a",
pkValues: []string{"a"},
data: []interface{}{"a", "b"},
tp: rowDeleted,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo3,
key: "aa",
pkValues: []string{"aa"},
data: []interface{}{"aa", "b"},
tp: rowDeleted,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
mock.ExpectQuery("SELECT .* FROM .*tbl2.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
mock.ExpectQuery("SELECT .* FROM .*tbl3.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow("aa", "b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(2), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 2)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "1")
require.Equal(t, rowUpdated, worker.pendingChangesMap[tbl1.String()].rows["1"].tp)
require.Equal(t, 3, worker.pendingChangesMap[tbl1.String()].rows["1"].failedCnt)
require.Contains(t, worker.pendingChangesMap, tbl3.String())
require.Len(t, worker.pendingChangesMap[tbl3.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl3.String()].rows, "aa")
require.Equal(t, rowDeleted, worker.pendingChangesMap[tbl3.String()].rows["aa"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl3.String()].rows["aa"].failedCnt)

// for tbl1, pk=1 is synced, validate success
// for tbl3, pk=aa is synced, validate success
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "b"))
mock.ExpectQuery("SELECT .* FROM .*tbl3.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(0), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 0)
// for tbl1, pk=1 is synced, validate success
// for tbl3, pk=aa is synced, validate success
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "b"))
mock.ExpectQuery("SELECT .* FROM .*tbl3.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(0), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 0)

//
// validate with batch size = 2
worker.batchSize = 2
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "a"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "2",
pkValues: []string{"2"},
data: []interface{}{2, "2b"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "3",
pkValues: []string{"3"},
data: []interface{}{3, "3c"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "a").AddRow(2, "2b"))
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "a").AddRow(2, "2b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(1), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "3")
require.Equal(t, rowInsert, worker.pendingChangesMap[tbl1.String()].rows["3"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["3"].failedCnt)
//
// validate with batch size = 2
worker.batchSize = 2
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "1",
pkValues: []string{"1"},
data: []interface{}{1, "a"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "2",
pkValues: []string{"2"},
data: []interface{}{2, "2b"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
worker.updateRowChange(&rowChange{
table: tableInfo1,
key: "3",
pkValues: []string{"3"},
data: []interface{}{3, "3c"},
tp: rowInsert,
lastMeetTS: time.Now().Unix(),
})
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "a").AddRow(2, "2b"))
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "a").AddRow(2, "2b"))
require.NoError(t, worker.validateTableChange())
require.Equal(t, int64(1), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "3")
require.Equal(t, rowInsert, worker.pendingChangesMap[tbl1.String()].rows["3"].tp)
require.Equal(t, 1, worker.pendingChangesMap[tbl1.String()].rows["3"].failedCnt)

// sync row 3 but got wrong result
mock.ExpectQuery("SELECT .* FROM .*tbl1.* WHERE .*").WillReturnRows(
sqlmock.NewRows([]string{"a", "b"}).AddRow(1, "a").AddRow(2, "2b").AddRow(3, "3dd"))
require.NoError(t, worker.validateTableChange())
if mode == config.ValidationFull {
// remain error
require.Equal(t, int64(1), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 1)
require.Contains(t, worker.pendingChangesMap, tbl1.String())
require.Len(t, worker.pendingChangesMap[tbl1.String()].rows, 1)
require.Contains(t, worker.pendingChangesMap[tbl1.String()].rows, "3")
require.Equal(t, rowInsert, worker.pendingChangesMap[tbl1.String()].rows["3"].tp)
require.Equal(t, 2, worker.pendingChangesMap[tbl1.String()].rows["3"].failedCnt) // fail again
} else {
// correct and clear all errors
require.Equal(t, int64(0), worker.pendingRowCount.Load())
require.Len(t, worker.pendingChangesMap, 0)
}
}
testFunc(t, config.ValidationFast)
testFunc(t, config.ValidationFull)
}

func TestValidatorWorkerCompareData(t *testing.T) {
Expand Down

0 comments on commit 461b98f

Please sign in to comment.