Skip to content

Commit

Permalink
cherry pick pingcap#27850 to release-5.2
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
glorv authored and ti-srebot committed Nov 23, 2021
1 parent feb4b14 commit e8d4b10
Show file tree
Hide file tree
Showing 13 changed files with 874 additions and 69 deletions.
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk; due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only checking that
// there are enough columns.
columnCnt int
}

Expand Down Expand Up @@ -265,22 +268,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
Expand All @@ -289,8 +297,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -302,6 +314,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow(encoded.String()), nil
Expand Down Expand Up @@ -456,7 +469,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -472,8 +485,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand Down Expand Up @@ -502,6 +515,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
Expand Down
153 changes: 140 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *mysqlSuite) TearDownTest(c *C) {

func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
s.mockDB.
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
Expand All @@ -98,6 +98,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
perms = append(perms, i)
}
perms = append(perms, -1)
// skip column a,c due to ignore-columns
perms[0] = -1
perms[2] = -1
encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
Expand All @@ -121,9 +124,15 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
<<<<<<< HEAD
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
c.Assert(err, IsNil)
=======
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
st, err := writer.Close(ctx)
c.Assert(err, IsNil)
c.Assert(st, IsNil)
Expand All @@ -150,8 +159,13 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
Expand Down Expand Up @@ -194,8 +208,13 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down Expand Up @@ -250,10 +269,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "int(10)", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "int(10)", "", "auto_increment"))
s.mockDB.ExpectCommit()

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
Expand Down Expand Up @@ -282,10 +301,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
AddRow("test", "t", "id", int64(1)))
Expand Down Expand Up @@ -317,10 +336,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
Expand Down Expand Up @@ -352,10 +371,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "1 + 2", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
Expand All @@ -378,8 +397,116 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag,
},
GeneratedExprString: "1 + 2",
},
},
},
<<<<<<< HEAD
})
=======
}, tableInfos)
}

func TestWriteRowsErrorDowngrading(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)
// First, batch insert, fail and rollback.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)
// Then, insert row-by-row due to the non-retryable error.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the forth row will exceed the error threshold, won't record this error
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)

ctx := context.Background()
logger := log.L()

ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
App: config.Lightning{
TaskInfoSchemaName: "tidb_lightning_errors",
MaxError: config.MaxError{
Type: *atomic.NewInt64(3),
},
},
}),
)
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

dataRows := ignoreBackend.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := ignoreBackend.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)

encoder, err := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(2),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(3),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(4),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "10.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(5),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
}
8 changes: 8 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ type IgnoreColumns struct {
Columns []string `toml:"columns" json:"columns"`
}

// ColumnsMap returns the ignored column names as a set, allowing O(1)
// membership checks when filtering columns.
func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
	set := make(map[string]struct{}, len(ic.Columns))
	for i := range ic.Columns {
		set[ic.Columns[i]] = struct{}{}
	}
	return set
}

// GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
if !caseSensitive {
Expand Down
Loading

0 comments on commit e8d4b10

Please sign in to comment.