From cf67771eba5eb413b419edc4c0fdb0ba1b6b805e Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 7 Sep 2021 12:19:27 +0800 Subject: [PATCH 01/14] let ignore columns be compatible with tidb backend --- br/pkg/lightning/config/config.go | 8 ++ br/pkg/lightning/restore/check_info.go | 5 +- br/pkg/lightning/restore/restore.go | 33 +++++++- br/pkg/lightning/restore/restore_test.go | 103 +++++++++++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 7fd7bbbec9ff0..b4885107ed046 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -295,6 +295,14 @@ type IgnoreColumns struct { Columns []string `toml:"columns" json:"columns"` } +func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} { + columnMap := make(map[string]struct{}, len(ic.Columns)) + for _, c := range ic.Columns { + columnMap[c] = struct{}{} + } + return columnMap +} + // GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex. func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) { if !caseSensitive { diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index 0fcb233984df3..d424e9a8c2fc1 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -571,14 +571,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab return msgs, nil } - igCols := make(map[string]struct{}) igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive) if err != nil { return nil, errors.Trace(err) } - for _, col := range igCol.Columns { - igCols[col] = struct{}{} - } + igCols := igCol.ColumnsMap() if len(tableInfo.DataFiles) == 0 { log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name)) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index c7100d2bc0342..18a3f6401fab4 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2143,6 +2143,16 @@ func (cr *chunkRestore) encodeLoop( pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs initializedColumns, reachEOF := false, false + // filteredColumns is column names that excluded ignored columns + // WARN: this might be not correct when different SQL statements contains different fields, + // but since ColumnPermutation also depends on the hypothesis that the columns in one source file is the same + // so this should be ok. + var filteredColumns []string + ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive) + if err1 != nil { + err = err1 + return + } for !reachEOF { if err = pauser.Wait(ctx); err != nil { return @@ -2173,6 +2183,27 @@ func (cr *chunkRestore) encodeLoop( return } } + filteredColumns = columnNames + if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 { + filteredColumns = make([]string, 0, len(columnNames)) + ignoreColsMap := ignoreColumns.ColumnsMap() + if len(columnNames) > 0 { + for _, c := range columnNames { + if _, ok := ignoreColsMap[c]; !ok { + filteredColumns = append(filteredColumns, c) + } + } + } else { + // init column names by table schema + // after filtered out some columns, we must explicitly set the columns for TiDB backend + for _, col := range t.tableInfo.Core.Columns { + if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok { + filteredColumns = append(filteredColumns, col.Name.O) + } + } + } + + } initializedColumns = true } case io.EOF: @@ -2193,7 +2224,7 @@ func (cr *chunkRestore) encodeLoop( err = errors.Annotatef(encodeErr, "in file %s at offset %d", &cr.chunk.Key, newOffset) return } - kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}) + kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID}) kvSize += kvs.Size() failpoint.Inject("mock-kv-size", func(val failpoint.Value) { kvSize += uint64(val.(int)) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 02a38b431a8c1..687267600e1a4 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -1230,6 +1230,7 @@ func (s *chunkRestoreSuite) TestEncodeLoop(c *C) { c.Assert(kvs, HasLen, 1) c.Assert(kvs[0].rowID, Equals, int64(19)) c.Assert(kvs[0].offset, Equals, int64(36)) + c.Assert(kvs[0].columns, DeepEquals, []string(nil)) kvs = <-kvsCh c.Assert(len(kvs), Equals, 0) @@ -1380,12 +1381,114 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { Timestamp: 1234567895, }) c.Assert(err, IsNil) + defer kvEncoder.Close() _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4") c.Assert(kvsCh, HasLen, 0) } +func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) { + cases := []struct { + s string + ignoreColumns []*config.IgnoreColumns + kvs deliveredKVs + header bool + }{ + { + "1,2,3\r\n4,5,6\r\n", + []*config.IgnoreColumns{ + { + DB: "db", + Table: "table", + Columns: []string{"a"}, + }, + }, + deliveredKVs{ + rowID: 1, + offset: 6, + columns: []string{"b", "c"}, + }, + false, + }, + { + "b,c\r\n2,3\r\n5,6\r\n", + []*config.IgnoreColumns{ + { + TableFilter: []string{"db*.tab*"}, + Columns: []string{"b"}, + }, + }, + deliveredKVs{ + rowID: 1, + offset: 9, + columns: []string{"c"}, + }, + true, + }, + } + + for _, cs := range cases { + s.testEncodeLoopIgnoreColumnsCSV(c, cs.s, cs.ignoreColumns, cs.kvs, cs.header) + } +} + +func (s *chunkRestoreSuite) testEncodeLoopIgnoreColumnsCSV( + c *C, + f string, + ignoreColumns []*config.IgnoreColumns, + deliverKV deliveredKVs, + header bool, +) { + dir := c.MkDir() + fileName := "db.table.000.csv" + err := os.WriteFile(filepath.Join(dir, fileName), []byte(f), 0o644) + c.Assert(err, IsNil) + + store, err := storage.NewLocalStorage(dir) + c.Assert(err, IsNil) + + ctx := context.Background() + cfg := config.NewConfig() + cfg.Mydumper.IgnoreColumns = ignoreColumns + cfg.Mydumper.CSV.Header = header + rc := &Controller{pauser: DeliverPauser, cfg: cfg} + + reader, err := store.Open(ctx, fileName) + c.Assert(err, IsNil) + w := worker.NewPool(ctx, 5, "io") + p, err := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, cfg.Mydumper.CSV.Header, nil) + c.Assert(err, IsNil) + + err = s.cr.parser.Close() + c.Assert(err, IsNil) + s.cr.parser = p + + kvsCh := make(chan []deliveredKVs, 2) + deliverCompleteCh := make(chan deliverResult) + kvEncoder, err := tidb.NewTiDBBackend(nil, config.ReplaceOnDup).NewEncoder( + s.tr.encTable, + &kv.SessionOptions{ + SQLMode: s.cfg.TiDB.SQLMode, + Timestamp: 1234567895, + }) + c.Assert(err, IsNil) + defer kvEncoder.Close() + + _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) + c.Assert(err, IsNil) + c.Assert(kvsCh, HasLen, 2) + + kvs := <-kvsCh + c.Assert(kvs, HasLen, 2) + c.Assert(kvs[0].rowID, Equals, deliverKV.rowID) + c.Assert(kvs[0].offset, Equals, deliverKV.offset) + c.Assert(kvs[0].columns, DeepEquals, deliverKV.columns) + + kvs = <-kvsCh + c.Assert(len(kvs), Equals, 0) +} + func (s *chunkRestoreSuite) TestRestore(c *C) { ctx := context.Background() From c8e1f7fdc30a126be27b0e71376d6715247f134c Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 7 Sep 2021 14:26:13 +0800 Subject: [PATCH 02/14] fix ut --- br/pkg/lightning/lightning_test.go | 1 + br/pkg/lightning/restore/restore_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/br/pkg/lightning/lightning_test.go b/br/pkg/lightning/lightning_test.go index 8bae6d89ad9e9..d90343b59b8c1 100644 --- a/br/pkg/lightning/lightning_test.go +++ b/br/pkg/lightning/lightning_test.go @@ -371,6 +371,7 @@ func (s *lightningServerSuite) TestHTTPAPIOutsideServerMode(c *C) { errCh := make(chan error) cfg := config.NewConfig() + cfg.TiDB.DistSQLScanConcurrency = 4 err := cfg.LoadFromGlobal(s.lightning.globalCfg) c.Assert(err, IsNil) go func() { diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 687267600e1a4..77c31c64f9cf0 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -1303,6 +1303,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) { rc := &Controller{pauser: DeliverPauser, cfg: cfg} c.Assert(failpoint.Enable( "github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size") _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) c.Assert(err, IsNil) From ddec21ac4f7c1aef9156ac066af9f10bd193918c Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 8 Sep 2021 15:04:21 +0800 Subject: [PATCH 03/14] fix --- br/pkg/lightning/backend/tidb/tidb.go | 20 +++-- br/pkg/lightning/backend/tidb/tidb_test.go | 5 +- br/pkg/lightning/restore/check_info.go | 2 +- br/pkg/lightning/restore/restore.go | 2 +- br/pkg/lightning/restore/restore_test.go | 96 +++++++++++++++++----- br/pkg/lightning/restore/table_restore.go | 27 +++--- 6 files changed, 108 insertions(+), 44 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 092893ab9d2d9..7ddf6528442f5 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -73,6 +73,9 @@ type tidbEncoder struct { // the index of table columns for each data field. // index == len(table.columns) means this field is `_tidb_rowid` columnIdx []int + // the max index used in this chunk, due to the ignore-columns config, we can't + // directly check the total column count, so we fall back to only check that + // the there are enough columns. columnCnt int } @@ -267,22 +270,24 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co cols := enc.tbl.Cols() if len(enc.columnIdx) == 0 { - columnCount := 0 + columnMaxIdx := 0 columnIdx := make([]int, len(columnPermutation)) for i, idx := range columnPermutation { if idx >= 0 { columnIdx[idx] = i - columnCount++ + if idx > columnMaxIdx { + columnMaxIdx = idx + } } } enc.columnIdx = columnIdx - enc.columnCnt = columnCount + enc.columnCnt = columnMaxIdx + 1 } // TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently. // See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the // column permutation with default, thus enc.columnCnt > len(row). - if len(row) > enc.columnCnt { + if len(row) < enc.columnCnt { logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation), zap.Array("data", kv.RowArrayMarshaler(row))) return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row)) @@ -291,8 +296,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co var encoded strings.Builder encoded.Grow(8 * len(row)) encoded.WriteByte('(') + cnt := 0 for i, field := range row { - if i != 0 { + if enc.columnIdx[i] < 0 { + continue + } + if cnt > 0 { encoded.WriteByte(',') } datum := field @@ -304,6 +313,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co ) return nil, err } + cnt++ } encoded.WriteByte(')') return tidbRow(encoded.String()), nil diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index b9f1cc76b0b5b..6a7dd86deab0f 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -80,7 +80,7 @@ func (s *mysqlSuite) TearDownTest(c *C) { func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { s.mockDB. - ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E"). + ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E"). WillReturnResult(sqlmock.NewResult(1, 1)) ctx := context.Background() @@ -100,6 +100,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { perms = append(perms, i) } perms = append(perms, -1) + // skip column a,c due to ignore-columns + perms[0] = -1 + perms[2] = -1 encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890}) c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index bb19f14b46c61..514e0f7fe5c63 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -800,7 +800,7 @@ outloop: case nil: if !initializedColumns { if len(columnPermutation) == 0 { - columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo) + columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 00e53a14f51a7..c8dac1c11554f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1353,7 +1353,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error { if err != nil { return errors.Trace(err) } - tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns) + tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap()) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 77c31c64f9cf0..7189de1edb33d 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -742,30 +742,84 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) { func (s *tableRestoreSuite) TestInitializeColumns(c *C) { ccp := &checkpoints.ChunkCheckpoint{} - c.Assert(s.tr.initializeColumns(nil, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{0, 1, 2, -1}) - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"b", "c", "a"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 0, 1, -1}) - - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"b"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{-1, 0, -1, -1}) - - ccp.ColumnPermutation = nil - c.Assert(s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp), IsNil) - c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 1, 3, 0}) + defer func() { + s.tr.ignoreColumns = nil + }() - ccp.ColumnPermutation = nil - err := s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c", "d"}, ccp) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, `unknown columns in header \[d\]`) + cases := []struct{ + columns []string + ignoreColumns map[string]struct{} + expectedPermutation []int + errPat string + } { + { + nil, + nil, + []int{0, 1, 2, -1}, + "", + }, + { + nil, + map[string]struct{}{"b": {}}, + []int{0, -1, 2, -1}, + "", + }, + { + []string{"b", "c", "a"}, + nil, + []int{2, 0, 1, -1}, + "", + }, + { + []string{"b", "c", "a"}, + map[string]struct{}{"b": {}}, + []int{2, -1, 1, -1}, + "", + }, + { + []string{"b"}, + nil, + []int{-1, 0, -1, -1}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c"}, + nil, + []int{2, 1, 3, 0}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c"}, + map[string]struct{}{"b": {}, "_tidb_rowid": {}}, + []int{2, -1, 3, -1}, + "", + }, + { + []string{"_tidb_rowid", "b", "a", "c", "d"}, + nil, + nil, + `unknown columns in header \[d\]`, + }, + { + []string{"e", "b", "c", "d"}, + nil, + nil, + `unknown columns in header \[e d\]`, + }, + } - ccp.ColumnPermutation = nil - err = s.tr.initializeColumns([]string{"e", "b", "c", "d"}, ccp) - c.Assert(err, NotNil) - c.Assert(err, ErrorMatches, `unknown columns in header \[e d\]`) + for _, testCase := range cases { + ccp.ColumnPermutation = nil + s.tr.ignoreColumns = testCase.ignoreColumns + err := s.tr.initializeColumns(testCase.columns, ccp) + if len(testCase.errPat) > 0 { + c.Assert(err, NotNil) + c.Assert(err, ErrorMatches, testCase.errPat) + } else { + c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation) + } + } } func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 0ead1c1eb36c6..53c5d2d8afe8c 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -52,7 +52,7 @@ type TableRestore struct { alloc autoid.Allocators logger log.Logger - ignoreColumns []string + ignoreColumns map[string]struct{} } func NewTableRestore( @@ -61,7 +61,7 @@ func NewTableRestore( dbInfo *checkpoints.TidbDBInfo, tableInfo *checkpoints.TidbTableInfo, cp *checkpoints.TableCheckpoint, - ignoreColumns []string, + ignoreColumns map[string]struct{}, ) (*TableRestore, error) { idAlloc := kv.NewPanickingAllocators(cp.AllocBase) tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core) @@ -166,15 +166,19 @@ func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.Chu return nil } -func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo *model.TableInfo) ([]int, error) { +func createColumnPermutation(columns []string, ignoreColumns map[string]struct{}, tableInfo *model.TableInfo) ([]int, error) { var colPerm []int if len(columns) == 0 { colPerm = make([]int, 0, len(tableInfo.Columns)+1) shouldIncludeRowID := common.TableHasAutoRowID(tableInfo) // no provided columns, so use identity permutation. - for i := range tableInfo.Columns { - colPerm = append(colPerm, i) + for i, col := range tableInfo.Columns { + idx := i + if _, ok := ignoreColumns[col.Name.L]; ok { + idx = -1 + } + colPerm = append(colPerm, idx) } if shouldIncludeRowID { colPerm = append(colPerm, -1) @@ -799,7 +803,7 @@ func (tr *TableRestore) postProcess( return !finished, nil } -func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns []string) ([]int, error) { +func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns map[string]struct{}) ([]int, error) { colPerm := make([]int, 0, len(tableInfo.Columns)+1) columnMap := make(map[string]int) @@ -807,13 +811,6 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor columnMap[column] = i } - ignoreMap := make(map[string]int) - for _, column := range ignoreColumns { - if i, ok := columnMap[column]; ok { - ignoreMap[column] = i - } - } - tableColumnMap := make(map[string]int) for i, col := range tableInfo.Columns { tableColumnMap[col.Name.L] = i @@ -823,7 +820,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor var unknownCols []string for _, c := range columns { if _, ok := tableColumnMap[c]; !ok && c != model.ExtraHandleName.L { - if _, ignore := ignoreMap[c]; !ignore { + if _, ignore := ignoreColumns[c]; !ignore { unknownCols = append(unknownCols, c) } } @@ -835,7 +832,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor for _, colInfo := range tableInfo.Columns { if i, ok := columnMap[colInfo.Name.L]; ok { - if _, ignore := ignoreMap[colInfo.Name.L]; !ignore { + if _, ignore := ignoreColumns[colInfo.Name.L]; !ignore { colPerm = append(colPerm, i) } else { log.L().Debug("column ignored by user requirements", From c66cdf5b3b294fab3af5c544fd2a79d6541fc905 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 8 Sep 2021 17:04:43 +0800 Subject: [PATCH 04/14] fmt code --- br/pkg/lightning/restore/restore_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 7189de1edb33d..e622785fb0194 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -747,12 +747,12 @@ func (s *tableRestoreSuite) TestInitializeColumns(c *C) { s.tr.ignoreColumns = nil }() - cases := []struct{ - columns []string - ignoreColumns map[string]struct{} + cases := []struct { + columns []string + ignoreColumns map[string]struct{} expectedPermutation []int - errPat string - } { + errPat string + }{ { nil, nil, From 9d22c6c5bf2fe2e832f46e9221f9bb49525cf91f Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 19 Nov 2021 17:48:34 +0800 Subject: [PATCH 05/14] fix generated columns --- br/pkg/lightning/restore/restore_test.go | 47 +++++++++++++++++++ br/pkg/lightning/restore/table_restore.go | 2 + br/tests/lightning_duplicate_detection/run.sh | 2 +- 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index e20da51aace9b..356d5dd9489bf 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -825,6 +825,53 @@ func (s *tableRestoreSuite) TestInitializeColumns(c *C) { } } +func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { + p := parser.New() + p.SetSQLMode(mysql.ModeANSIQuotes) + se := tmock.NewContext() + tr, err := NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, nil, &checkpoints.TableCheckpoint{}, nil) + c.Assert(err, IsNil) + + cases := []struct{ + schema string + columns []string + expectedPermutation []int + } { + { + "CREATE TABLE `table` (a INT, b INT, C INT, d INT AS (a * 2))", + []string{"b", "c", "a"}, + []int{2, 0, 1, -1}, + }, + // all generated columns and none input columns + { + "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', uint64))) stored)", + []string{}, + []int{-1, -1, -1}, + }, + // all generated columns and none input columns + { + "CREATE TABLE `table` (a bigint, b text as (sha1(repeat('x', uint64))) stored)", + []string{"a", "b"}, + []int{0, -1, -1}, + }, + } + + for _, testCase := range cases { + node, err := p.ParseOneStmt(testCase.schema, "", "") + c.Assert(err, IsNil) + core, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 0xabcdef) + c.Assert(err, IsNil) + core.State = model.StatePublic + tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core} + tr.tableInfo = tableInfo + ccp := &checkpoints.ChunkCheckpoint{} + + err = s.tr.initializeColumns(testCase.columns, ccp) + c.Assert(err, IsNil) + c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation) + } +} + func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 5bf333238bfba..6333955e001a8 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -178,6 +178,8 @@ func createColumnPermutation(columns []string, ignoreColumns map[string]struct{} idx := i if _, ok := ignoreColumns[col.Name.L]; ok { idx = -1 + } else if col.IsGenerated() { + idx = -1 } colPerm = append(colPerm, idx) } diff --git a/br/tests/lightning_duplicate_detection/run.sh b/br/tests/lightning_duplicate_detection/run.sh index cfbc95d6ef4e7..6aace9b8ae5e4 100644 --- a/br/tests/lightning_duplicate_detection/run.sh +++ b/br/tests/lightning_duplicate_detection/run.sh @@ -22,7 +22,7 @@ LOG_FILE1="$TEST_DIR/lightning-duplicate-detection1.log" LOG_FILE2="$TEST_DIR/lightning-duplicate-detection2.log" # let lightning run a bit slow to avoid some table in the first lightning finish too fast. -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)" run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_detection.sorted1" \ --enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" & From 5f241edbbfefd41dcd63be9ccb58a22cc01ed274 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 19 Nov 2021 17:50:59 +0800 Subject: [PATCH 06/14] fix test --- br/pkg/lightning/restore/restore_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 356d5dd9489bf..74e2cacb6154d 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -848,11 +848,10 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { []string{}, []int{-1, -1, -1}, }, - // all generated columns and none input columns { - "CREATE TABLE `table` (a bigint, b text as (sha1(repeat('x', uint64))) stored)", + "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', uint64))) stored)", []string{"a", "b"}, - []int{0, -1, -1}, + []int{-1, -1, -1}, }, } From bd4cbb50923d162b218d2ae65fc0f9d5ef1725d8 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 19 Nov 2021 18:37:41 +0800 Subject: [PATCH 07/14] fix generated columns --- br/pkg/lightning/backend/tidb/tidb.go | 12 ++++++++---- br/pkg/lightning/restore/restore_test.go | 4 ++-- br/tests/run.sh | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 9548a15f5f75a..6a4f68e95306f 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -287,8 +287,11 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co cols := enc.tbl.Cols() if len(enc.columnIdx) == 0 { - columnMaxIdx := 0 + columnMaxIdx := -1 columnIdx := make([]int, len(columnPermutation)) + for i := 0; i < len(columnPermutation); i++ { + columnIdx[i] = -1 + } for i, idx := range columnPermutation { if idx >= 0 { columnIdx[idx] = i @@ -579,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st serverInfo := version.ParseServerInfo(versionStr) rows, e := tx.Query(` - SELECT table_name, column_name, column_type, extra + SELECT table_name, column_name, column_type, GENERATION_EXPRESSION, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position; @@ -595,8 +598,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st curTable *model.TableInfo ) for rows.Next() { - var tableName, columnName, columnType, columnExtra string - if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil { + var tableName, columnName, columnType, generationExpr, columnExtra string + if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil { return e } if tableName != curTableName { @@ -625,6 +628,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st FieldType: types.FieldType{ Flag: flag, }, + GeneratedExprString: generationExpr, }) curColOffset++ } diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 74e2cacb6154d..301ed2c366f88 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -832,11 +832,11 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { tr, err := NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, nil, &checkpoints.TableCheckpoint{}, nil) c.Assert(err, IsNil) - cases := []struct{ + cases := []struct { schema string columns []string expectedPermutation []int - } { + }{ { "CREATE TABLE `table` (a INT, b INT, C INT, d INT AS (a * 2))", []string{"b", "c", "a"}, diff --git a/br/tests/run.sh b/br/tests/run.sh index 140491caddc90..bbf17deb3e715 100755 --- a/br/tests/run.sh +++ b/br/tests/run.sh @@ -28,7 +28,7 @@ SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.s source tests/_utils/run_services trap stop_services EXIT -start_services +start_services $@ # Intermediate file needed because read can be used as a pipe target. # https://stackoverflow.com/q/2746553/ From 1cd4967d754110293574c30cdd0d32081acc6692 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 19 Nov 2021 18:52:06 +0800 Subject: [PATCH 08/14] fix build --- br/pkg/lightning/restore/restore_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 301ed2c366f88..ecc98a802a5cf 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -1572,7 +1572,7 @@ func (s *chunkRestoreSuite) testEncodeLoopIgnoreColumnsCSV( kvsCh := make(chan []deliveredKVs, 2) deliverCompleteCh := make(chan deliverResult) - kvEncoder, err := tidb.NewTiDBBackend(nil, config.ReplaceOnDup).NewEncoder( + kvEncoder, err := tidb.NewTiDBBackend(nil, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig())).NewEncoder( s.tr.encTable, &kv.SessionOptions{ SQLMode: s.cfg.TiDB.SQLMode, From 4af3a35d9e0232694bef9f7fa04f609d9600ade1 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 22 Nov 2021 12:23:16 +0800 Subject: [PATCH 09/14] fix --- br/pkg/lightning/backend/tidb/tidb.go | 2 +- br/pkg/lightning/backend/tidb/tidb_test.go | 25 +++++++++++----------- br/pkg/lightning/restore/restore.go | 1 - br/pkg/lightning/restore/restore_test.go | 17 ++++++--------- br/pkg/lightning/restore/table_restore.go | 11 +++++++--- 5 files changed, 29 insertions(+), 27 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 6a4f68e95306f..c748da0c76625 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -582,7 +582,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st serverInfo := version.ParseServerInfo(versionStr) rows, e := tx.Query(` - SELECT table_name, column_name, column_type, GENERATION_EXPRESSION, extra + SELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position; diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index b525f2a3b4bf4..909618cae1a3f 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -257,10 +257,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "int(10)", "auto_increment")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "int(10)", "", "auto_increment")) s.mockDB.ExpectCommit() bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig())) @@ -289,10 +289,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20) unsigned", "auto_increment")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}). AddRow("test", "t", "id", int64(1))) @@ -324,10 +324,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20)", "")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20)", "", "")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}). AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT")) @@ -359,10 +359,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) { s.mockDB.ExpectBegin() s.mockDB.ExpectQuery("SELECT version()"). WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7")) - s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). + s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E"). WithArgs("test"). - WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}). - AddRow("t", "id", "bigint(20)", "")) + WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}). + AddRow("t", "id", "bigint(20)", "1 + 2", "")) s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID"). WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}). AddRow("test", "t", "id", int64(1), "AUTO_RANDOM")) @@ -385,6 +385,7 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) { FieldType: types.FieldType{ Flag: mysql.PriKeyFlag, }, + GeneratedExprString: "1 + 2", }, }, }, diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index e164ed32f5343..bf417602bf4d2 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -2365,7 +2365,6 @@ func (cr *chunkRestore) encodeLoop( } } } - } initializedColumns = true } diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index ecc98a802a5cf..f9b67e7016ce9 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -829,8 +829,6 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { p := parser.New() p.SetSQLMode(mysql.ModeANSIQuotes) se := tmock.NewContext() - tr, err := NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, nil, &checkpoints.TableCheckpoint{}, nil) - c.Assert(err, IsNil) cases := []struct { schema string @@ -840,19 +838,14 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { { "CREATE TABLE `table` (a INT, b INT, C INT, d INT AS (a * 2))", []string{"b", "c", "a"}, - []int{2, 0, 1, -1}, + []int{2, 0, 1, -1, -1}, }, // all generated columns and none input columns { - "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', uint64))) stored)", + "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', a))) stored)", []string{}, []int{-1, -1, -1}, }, - { - "CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', uint64))) stored)", - []string{"a", "b"}, - []int{-1, -1, -1}, - }, } for _, testCase := range cases { @@ -862,7 +855,8 @@ func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) { c.Assert(err, IsNil) core.State = model.StatePublic tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core} - tr.tableInfo = tableInfo + s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil) + c.Assert(err, IsNil) ccp := &checkpoints.ChunkCheckpoint{} err = s.tr.initializeColumns(testCase.columns, ccp) @@ -1495,6 +1489,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { } func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) { + log.InitLogger(&log.Config{}, "error") cases := []struct { s string ignoreColumns []*config.IgnoreColumns @@ -1535,6 +1530,8 @@ func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) { } for _, cs := range cases { + // reset test + s.SetUpTest(c) s.testEncodeLoopIgnoreColumnsCSV(c, cs.s, cs.ignoreColumns, cs.kvs, cs.header) } } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 6333955e001a8..2fca61b9df252 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -895,10 +895,15 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor colPerm = append(colPerm, -1) } } + // append _tidb_rowid column + rowIDIdx := -1 if i, ok := columnMap[model.ExtraHandleName.L]; ok { - colPerm = append(colPerm, i) - } else if common.TableHasAutoRowID(tableInfo) { - colPerm = append(colPerm, -1) + if _, ignored := ignoreColumns[model.ExtraHandleName.L]; !ignored { + rowIDIdx = i + } + } + if common.TableHasAutoRowID(tableInfo) { + colPerm = append(colPerm, rowIDIdx) } return colPerm, nil From 9fe66e2f76392e9dba9aca7a40a9acbf317bdd56 Mon Sep 17 00:00:00 2001 From: glorv Date: Mon, 22 Nov 2021 14:19:46 +0800 Subject: [PATCH 10/14] do not check tableHasAutoID fro tidb backend --- br/pkg/lightning/restore/table_restore.go | 6 +++--- .../data/rowid.explicit_tidb_rowid-schema.sql | 2 +- .../data/rowid.specific_auto_inc-schema.sql | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 2fca61b9df252..8664943e75199 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -902,9 +902,9 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor rowIDIdx = i } } - if common.TableHasAutoRowID(tableInfo) { - colPerm = append(colPerm, rowIDIdx) - } + // FIXME: the schema info for tidb backend is not complete, so always add the _tidb_rowid field. + // Other logic should ignore this extra field if not needed. + colPerm = append(colPerm, rowIDIdx) return colPerm, nil } diff --git a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql index 4f1d634485cca..55232f2ff6081 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql @@ -1 +1 @@ -create table explicit_tidb_rowid (pk varchar(6) primary key); \ No newline at end of file +create table explicit_tidb_rowid (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */); diff --git a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql index f6962e15a0072..a69f5bf4350eb 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql @@ -1 +1 @@ -create table specific_auto_inc (a varchar(6) primary key, b int unique auto_increment) auto_increment=80000; \ No newline at end of file +create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000; From afb8cf7629ab4078629401868c9feb187968fb0e Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 23 Nov 2021 10:17:31 +0800 Subject: [PATCH 11/14] slow down write speed for lightning_distributed_import --- br/tests/lightning_distributed_import/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/tests/lightning_distributed_import/run.sh b/br/tests/lightning_distributed_import/run.sh index f640ec3159c75..d21bf356b1568 100644 --- a/br/tests/lightning_distributed_import/run.sh +++ b/br/tests/lightning_distributed_import/run.sh @@ -20,7 +20,7 @@ LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log" LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log" # let lightning run a bit slow to avoid some table in the first lightning finish too fast. -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)" run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \ -d "tests/$TEST_NAME/data1" --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config.toml" & From 5459b6a700dac8b435b077cff5fb218cc8c3dca7 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 23 Nov 2021 11:53:34 +0800 Subject: [PATCH 12/14] fix unit test --- br/pkg/lightning/backend/tidb/tidb_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index 909618cae1a3f..3a824063ccaed 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -129,7 +129,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { writer, err := engine.LocalWriter(ctx, nil) c.Assert(err, IsNil) - err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) + err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows) c.Assert(err, IsNil) st, err := writer.Close(ctx) c.Assert(err, IsNil) @@ -157,7 +157,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "1.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) @@ -201,7 +201,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "3.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) @@ -451,35 +451,35 @@ func (s *mysqlSuite) TestWriteRowsErrorDowngrading(c *C) { c.Assert(err, IsNil) row, err := encoder.Encode(logger, []types.Datum{ types.NewIntDatum(1), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "7.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(2), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "8.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(3), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "9.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(4), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "10.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "10.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) row, err = encoder.Encode(logger, []types.Datum{ types.NewIntDatum(5), - }, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, "11.csv", 0) + }, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0) c.Assert(err, IsNil) row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) From 80d22a04e5f1270c1e167ef6516e92dbaa43a4f9 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 23 Nov 2021 14:30:25 +0800 Subject: [PATCH 13/14] fix test --- br/pkg/lightning/restore/restore_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index f9b67e7016ce9..19c68b9db287f 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -1451,7 +1451,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) { func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { dir := c.MkDir() fileName := "db.table.000.csv" - err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3,4\r\n4,5,6,7\r\n"), 0o644) + err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2\r\n4,5,6,7\r\n"), 0o644) c.Assert(err, IsNil) store, err := storage.NewLocalStorage(dir) @@ -1484,7 +1484,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) { defer kvEncoder.Close() _, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc) - c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4") + c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 4: column count mismatch, expected 3, got 2") c.Assert(kvsCh, HasLen, 0) } From 58f035d38dae1c3feac2dbdb194532779f9f1125 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 23 Nov 2021 18:55:16 +0800 Subject: [PATCH 14/14] rollback change --- br/pkg/lightning/backend/local/duplicate.go | 66 +++++++++------------ 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go index 7fa1e8b7cf475..fc88055c40c61 100644 --- a/br/pkg/lightning/backend/local/duplicate.go +++ b/br/pkg/lightning/backend/local/duplicate.go @@ -20,7 +20,6 @@ import ( "io" "math" "sort" - "sync" "time" "github.com/cockroachdb/pebble" @@ -260,45 +259,34 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, } tryTimes := 0 indexHandles := makePendingIndexHandlesWithCapacity(0) - eg, rpcctx := errgroup.WithContext(ctx) - unfinishedRegions := make([]*metapb.Region, 0) - var rgLock sync.Mutex for len(regions) > 0 { if tryTimes > maxRetryTimes { return errors.Errorf("retry time exceed limit") } + unfinishedRegions := make([]*restore.RegionInfo, 0) waitingClients := make([]import_sstpb.ImportSST_DuplicateDetectClient, 0) - waitingRegions := make([]*restore.RegionInfo, 0) + watingRegions := make([]*restore.RegionInfo, 0) for idx, region := range regions { - r := region - manager.remoteWorkerPool.ApplyOnErrorGroup(eg, func() error { - _, start, _ := codec.DecodeBytes(r.Region.StartKey, []byte{}) - _, end, _ := codec.DecodeBytes(r.Region.EndKey, []byte{}) - if bytes.Compare(startKey, r.Region.StartKey) > 0 { - start = req.start - } - if r.Region.EndKey == nil || len(r.Region.EndKey) == 0 || bytes.Compare(endKey, r.Region.EndKey) < 0 { - end = req.end - } - - logger.Debug("[detect-dupe] get duplicate stream", - zap.Int("localStreamID", idx), - logutil.Region(region.Region), - logutil.Leader(region.Leader), - logutil.Key("regionStartKey", start), - logutil.Key("regionEndKey", end)) - cli, err := manager.getDuplicateStream(ctx, region, start, end) - if err != nil { - r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId()) - if err != nil { - unfinishedRegions = append(unfinishedRegions, region) - } else { - unfinishedRegions = append(unfinishedRegions, r) - } - return nil - } - }) + if len(waitingClients) > manager.regionConcurrency { + r := regions[idx:] + unfinishedRegions = append(unfinishedRegions, r...) + break + } + _, start, _ := codec.DecodeBytes(region.Region.StartKey, []byte{}) + _, end, _ := codec.DecodeBytes(region.Region.EndKey, []byte{}) + if bytes.Compare(startKey, region.Region.StartKey) > 0 { + start = req.start + } + if region.Region.EndKey == nil || len(region.Region.EndKey) == 0 || bytes.Compare(endKey, region.Region.EndKey) < 0 { + end = req.end + } + logger.Debug("[detect-dupe] get duplicate stream", + zap.Int("localStreamID", idx), + logutil.Region(region.Region), + logutil.Leader(region.Leader), + logutil.Key("regionStartKey", start), + logutil.Key("regionEndKey", end)) cli, err := manager.getDuplicateStream(ctx, region, start, end) if err != nil { r, err := manager.splitCli.GetRegionByID(ctx, region.Region.GetId()) @@ -309,7 +297,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, } } else { waitingClients = append(waitingClients, cli) - waitingRegions = append(waitingRegions, region) + watingRegions = append(watingRegions, region) } } @@ -323,7 +311,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, } for idx, cli := range waitingClients { - region := waitingRegions[idx] + region := watingRegions[idx] cliLogger := logger.With( zap.Int("localStreamID", idx), logutil.Region(region.Region), @@ -362,9 +350,9 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, cliLogger.Warn("[detect-dupe] meet key error in duplicate detect response from TiKV, retry again ", zap.String("RegionError", resp.GetRegionError().GetMessage())) - r, err := restore.PaginateScanRegion(ctx, manager.splitCli, waitingRegions[idx].Region.GetStartKey(), waitingRegions[idx].Region.GetEndKey(), scanRegionLimit) + r, err := restore.PaginateScanRegion(ctx, manager.splitCli, watingRegions[idx].Region.GetStartKey(), watingRegions[idx].Region.GetEndKey(), scanRegionLimit) if err != nil { - unfinishedRegions = append(unfinishedRegions, waitingRegions[idx]) + unfinishedRegions = append(unfinishedRegions, watingRegions[idx]) } else { unfinishedRegions = append(unfinishedRegions, r...) } @@ -723,7 +711,7 @@ func (manager *DuplicateManager) getDuplicateStream(ctx context.Context, KeyOnly: false, } stream, err := cli.DuplicateDetect(ctx, req) - return stream, errors.Trace(err) + return stream, err } func (manager *DuplicateManager) getImportClient(ctx context.Context, peer *metapb.Peer) (import_sstpb.ImportSSTClient, error) { @@ -731,7 +719,7 @@ func (manager *DuplicateManager) getImportClient(ctx context.Context, peer *meta return manager.makeConn(ctx, peer.GetStoreId()) }) if err != nil { - return nil, errors.Trace(err) + return nil, err } return import_sstpb.NewImportSSTClient(conn), nil }