diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go
index 5a60e7f8fd589..d7a18e1c20c5d 100644
--- a/br/pkg/lightning/backend/tidb/tidb.go
+++ b/br/pkg/lightning/backend/tidb/tidb.go
@@ -71,6 +71,9 @@ type tidbEncoder struct {
 	// the index of table columns for each data field.
 	// index == len(table.columns) means this field is `_tidb_rowid`
 	columnIdx []int
+	// the max column index used in this chunk; because of the ignore-columns
+	// config we can't check the total column count directly, so we fall back
+	// to only checking that there are enough columns.
 	columnCnt int
 }
 
@@ -265,22 +268,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
 	cols := enc.tbl.Cols()
 
 	if len(enc.columnIdx) == 0 {
-		columnCount := 0
+		columnMaxIdx := -1
 		columnIdx := make([]int, len(columnPermutation))
+		for i := 0; i < len(columnPermutation); i++ {
+			columnIdx[i] = -1
+		}
 		for i, idx := range columnPermutation {
 			if idx >= 0 {
 				columnIdx[idx] = i
-				columnCount++
+				if idx > columnMaxIdx {
+					columnMaxIdx = idx
+				}
 			}
 		}
 		enc.columnIdx = columnIdx
-		enc.columnCnt = columnCount
+		enc.columnCnt = columnMaxIdx + 1
 	}
 
 	// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
 	// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
 	// column permutation with default, thus enc.columnCnt > len(row).
-	if len(row) > enc.columnCnt {
+	if len(row) < enc.columnCnt {
 		logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
 			zap.Array("data", kv.RowArrayMarshaler(row)))
 		return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
@@ -289,8 +297,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
 	var encoded strings.Builder
 	encoded.Grow(8 * len(row))
 	encoded.WriteByte('(')
+	cnt := 0
 	for i, field := range row {
-		if i != 0 {
+		if enc.columnIdx[i] < 0 {
+			continue
+		}
+		if cnt > 0 {
 			encoded.WriteByte(',')
 		}
 		datum := field
@@ -302,6 +314,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
 			)
 			return nil, err
 		}
+		cnt++
 	}
 	encoded.WriteByte(')')
 	return tidbRow(encoded.String()), nil
@@ -456,7 +469,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
 	serverInfo := version.ParseServerInfo(versionStr)
 
 	rows, e := tx.Query(`
-		SELECT table_name, column_name, column_type, extra
+		SELECT table_name, column_name, column_type, generation_expression, extra
 		FROM information_schema.columns
 		WHERE table_schema = ?
 		ORDER BY table_name, ordinal_position;
@@ -472,8 +485,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
 		curTable     *model.TableInfo
 	)
 	for rows.Next() {
-		var tableName, columnName, columnType, columnExtra string
-		if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
+		var tableName, columnName, columnType, generationExpr, columnExtra string
+		if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
 			return e
 		}
 		if tableName != curTableName {
@@ -502,6 +515,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
 			FieldType: types.FieldType{
 				Flag: flag,
 			},
+			GeneratedExprString: generationExpr,
 		})
 		curColOffset++
 	}
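The hunks above switch the TiDB-backend encoder from an exact column count to a max-used-index bound, and make the VALUES builder skip data fields that no table column consumes (`columnIdx[i] < 0`, i.e. fields dropped by ignore-columns). Below is a minimal, self-contained sketch of that bookkeeping; plain strings stand in for Lightning's `types.Datum`, and the function names are illustrative, not the real API:

```go
package main

import (
	"fmt"
	"strings"
)

// buildColumnIdx mirrors the setup block in Encode: columnPermutation[tableCol]
// is the data-field index feeding that table column (or -1), columnIdx[field]
// becomes the table column consuming that field (or -1), and the returned
// bound is the highest used field index plus one.
func buildColumnIdx(columnPermutation []int) (columnIdx []int, columnCnt int) {
	columnIdx = make([]int, len(columnPermutation))
	for i := range columnIdx {
		columnIdx[i] = -1
	}
	maxIdx := -1
	for tableCol, field := range columnPermutation {
		if field >= 0 {
			columnIdx[field] = tableCol
			if field > maxIdx {
				maxIdx = field
			}
		}
	}
	return columnIdx, maxIdx + 1
}

// encodeRow mimics the VALUES builder: unconsumed fields are skipped, and the
// row only has to be long enough to cover the highest consumed field.
func encodeRow(row []string, columnIdx []int, columnCnt int) (string, error) {
	if len(row) < columnCnt {
		return "", fmt.Errorf("column count mismatch, expected %d, got %d", columnCnt, len(row))
	}
	var encoded strings.Builder
	encoded.WriteByte('(')
	cnt := 0
	for i, field := range row {
		if columnIdx[i] < 0 {
			continue // this field maps to no table column (ignored)
		}
		if cnt > 0 {
			encoded.WriteByte(',')
		}
		encoded.WriteString(field)
		cnt++
	}
	encoded.WriteByte(')')
	return encoded.String(), nil
}

func main() {
	// Table (a, b, c) with column a ignored: a gets no field, b takes field 1, c takes field 2.
	columnIdx, columnCnt := buildColumnIdx([]int{-1, 1, 2})
	s, err := encodeRow([]string{"1", "2", "3"}, columnIdx, columnCnt)
	fmt.Println(s, err) // (2,3) <nil>
}
```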
diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go
index 29ed7f01831a0..2bb1154d07691 100644
--- a/br/pkg/lightning/backend/tidb/tidb_test.go
+++ b/br/pkg/lightning/backend/tidb/tidb_test.go
@@ -78,7 +78,7 @@ func (s *mysqlSuite) TearDownTest(c *C) {
 
 func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
 	s.mockDB.
-		ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
+		ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
 		WillReturnResult(sqlmock.NewResult(1, 1))
 
 	ctx := context.Background()
@@ -98,6 +98,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
 		perms = append(perms, i)
 	}
 	perms = append(perms, -1)
+	// skip columns a and c due to the ignore-columns config
+	perms[0] = -1
+	perms[2] = -1
 	encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
 	c.Assert(err, IsNil)
 	row, err := encoder.Encode(logger, []types.Datum{
@@ -121,9 +124,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
 	row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
 
 	writer, err := engine.LocalWriter(ctx, nil)
 	c.Assert(err, IsNil)
-	err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
+	err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
 	c.Assert(err, IsNil)
 	st, err := writer.Close(ctx)
 	c.Assert(err, IsNil)
 	c.Assert(st, IsNil)
@@ -150,8 +159,8 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
 	c.Assert(err, IsNil)
 	row, err := encoder.Encode(logger, []types.Datum{
 		types.NewIntDatum(1),
-	}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
+	}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, 0)
 	c.Assert(err, IsNil)
 
 	row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
 
 	writer, err := engine.LocalWriter(ctx, nil)
@@ -194,8 +208,8 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
 	c.Assert(err, IsNil)
 	row, err := encoder.Encode(logger, []types.Datum{
 		types.NewIntDatum(1),
-	}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
+	}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, 0)
 	c.Assert(err, IsNil)
 
 	row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
 
@@ -250,10 +269,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) {
 	s.mockDB.ExpectBegin()
 	s.mockDB.ExpectQuery("SELECT version()").
 		WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
-	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
+	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
 		WithArgs("test").
-		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
-			AddRow("t", "id", "int(10)", "auto_increment"))
+		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
+			AddRow("t", "id", "int(10)", "", "auto_increment"))
 	s.mockDB.ExpectCommit()
 
 	bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
@@ -282,10 +301,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) {
 	s.mockDB.ExpectBegin()
 	s.mockDB.ExpectQuery("SELECT version()").
 		WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
-	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
+	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
 		WithArgs("test").
-		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
-			AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
+		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
+			AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
 	s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
 		WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
 			AddRow("test", "t", "id", int64(1)))
@@ -317,10 +336,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) {
 	s.mockDB.ExpectBegin()
 	s.mockDB.ExpectQuery("SELECT version()").
 		WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
-	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
+	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
 		WithArgs("test").
-		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
-			AddRow("t", "id", "bigint(20)", ""))
+		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
+			AddRow("t", "id", "bigint(20)", "", ""))
 	s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
 		WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
 			AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
@@ -352,10 +371,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
 	s.mockDB.ExpectBegin()
 	s.mockDB.ExpectQuery("SELECT version()").
 		WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
-	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
+	s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
 		WithArgs("test").
-		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
-			AddRow("t", "id", "bigint(20)", ""))
+		WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
+			AddRow("t", "id", "bigint(20)", "1 + 2", ""))
 	s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
 		WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
 			AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
@@ -378,8 +397,9 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
 					FieldType: types.FieldType{
 						Flag: mysql.PriKeyFlag,
 					},
+					GeneratedExprString: "1 + 2",
 				},
 			},
 		},
 	})
 }
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index 394c73ce66908..310f948164f27 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -279,6 +279,14 @@ type IgnoreColumns struct {
 	Columns []string `toml:"columns" json:"columns"`
 }
 
+func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
+	columnMap := make(map[string]struct{}, len(ic.Columns))
+	for _, c := range ic.Columns {
+		columnMap[c] = struct{}{}
+	}
+	return columnMap
+}
+
 // GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
 func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
 	if !caseSensitive {
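`ColumnsMap` centralizes a list-to-set conversion that call sites previously hand-rolled (see the `SchemaIsValid` cleanup in check_info.go further down). A trimmed-down sketch of the helper and its intended use — the struct here is an illustrative copy, not the real `config` package:

```go
package main

import "fmt"

// IgnoreColumns is a pared-down stand-in for the config struct above.
type IgnoreColumns struct {
	DB      string
	Table   string
	Columns []string
}

// ColumnsMap turns the configured column list into a set, so callers can test
// membership in O(1) instead of scanning the slice for every column.
func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
	columnMap := make(map[string]struct{}, len(ic.Columns))
	for _, c := range ic.Columns {
		columnMap[c] = struct{}{}
	}
	return columnMap
}

func main() {
	ic := &IgnoreColumns{DB: "db", Table: "tbl", Columns: []string{"a", "c"}}
	igCols := ic.ColumnsMap()
	for _, col := range []string{"a", "b", "c"} {
		_, ignored := igCols[col]
		fmt.Printf("column %s ignored: %v\n", col, ignored)
	}
}
```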
diff --git a/br/pkg/lightning/lightning_server_serial_test.go b/br/pkg/lightning/lightning_server_serial_test.go
new file mode 100644
index 0000000000000..e8a9d86ebbdec
--- /dev/null
+++ b/br/pkg/lightning/lightning_server_serial_test.go
@@ -0,0 +1,374 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Contexts for HTTP requests communicating with a real HTTP server are essential;
+// however, when the subject is a mocked server, they would probably be redundant.
+//nolint:noctx
+
+package lightning
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/stretchr/testify/require"
+)
+
+type lightningServerSuite struct {
+	lightning *Lightning
+	taskCfgCh chan *config.Config
+	taskRunCh chan struct{}
+}
+
+func createSuite(t *testing.T) (s *lightningServerSuite, clean func()) {
+	cfg := config.NewGlobalConfig()
+	cfg.TiDB.Host = "test.invalid"
+	cfg.TiDB.Port = 4000
+	cfg.TiDB.PdAddr = "test.invalid:2379"
+	cfg.App.ServerMode = true
+	cfg.App.StatusAddr = "127.0.0.1:0"
+	cfg.Mydumper.SourceDir = "file://."
+	cfg.TikvImporter.Backend = config.BackendLocal
+	cfg.TikvImporter.SortedKVDir = t.TempDir()
+
+	s = new(lightningServerSuite)
+	s.lightning = New(cfg)
+	s.taskRunCh = make(chan struct{}, 1)
+	s.taskCfgCh = make(chan *config.Config)
+	s.lightning.ctx = context.WithValue(s.lightning.ctx, taskRunNotifyKey, s.taskRunCh)
+	s.lightning.ctx = context.WithValue(s.lightning.ctx, taskCfgRecorderKey, s.taskCfgCh)
+	_ = s.lightning.GoServe()
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/SkipRunTask", "return"))
+	clean = func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/SkipRunTask"))
+		s.lightning.Stop()
+	}
+
+	return
+}
+
+func TestRunServer(t *testing.T) {
+	s, clean := createSuite(t)
+	defer clean()
+
+	url := "http://" + s.lightning.serverAddr.String() + "/tasks"
+
+	resp, err := http.Post(url, "application/toml", strings.NewReader("????"))
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNotImplemented, resp.StatusCode)
+	var data map[string]string
+	err = json.NewDecoder(resp.Body).Decode(&data)
+	require.NoError(t, err)
+	require.Contains(t, data, "error")
+	require.Equal(t, "server-mode not enabled", data["error"])
+	require.NoError(t, resp.Body.Close())
+
+	go func() {
+		_ = s.lightning.RunServer()
+	}()
+	time.Sleep(100 * time.Millisecond)
+
+	req, err := http.NewRequest(http.MethodPut, url, nil)
+	require.NoError(t, err)
+	resp, err = http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
+	require.Regexp(t, ".*"+http.MethodPost+".*", resp.Header.Get("Allow"))
+	require.NoError(t, resp.Body.Close())
+
+	resp, err = http.Post(url, "application/toml", strings.NewReader("????"))
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	err = json.NewDecoder(resp.Body).Decode(&data)
+	require.NoError(t, err)
+	require.Contains(t, data, "error")
+	require.Regexp(t, "cannot parse task.*", data["error"])
+	require.NoError(t, resp.Body.Close())
+
+	resp, err = http.Post(url, "application/toml", strings.NewReader("[mydumper.csv]\nseparator = 'fooo'\ndelimiter= 'foo'"))
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	err = json.NewDecoder(resp.Body).Decode(&data)
+	require.NoError(t, err)
+	require.Contains(t, data, "error")
+	require.Regexp(t, "invalid task configuration:.*", data["error"])
+	require.NoError(t, resp.Body.Close())
+
+	for i := 0; i < 20; i++ {
+		resp, err = http.Post(url, "application/toml", strings.NewReader(fmt.Sprintf(`
+			[mydumper]
+			data-source-dir = 'file://demo-path-%d'
+			[mydumper.csv]
+			separator = '/'
+		`, i)))
+		require.NoError(t, err)
+		require.Equal(t, http.StatusOK, resp.StatusCode)
+		var result map[string]int
+		err = json.NewDecoder(resp.Body).Decode(&result)
+		require.NoError(t, resp.Body.Close())
+		require.NoError(t, err)
+		require.Contains(t, result, "id")
+
+		select {
+		case taskCfg := <-s.taskCfgCh:
+			require.Equal(t, "test.invalid", taskCfg.TiDB.Host)
+			require.Equal(t, fmt.Sprintf("file://demo-path-%d", i), taskCfg.Mydumper.SourceDir)
+			require.Equal(t, "/", taskCfg.Mydumper.CSV.Separator)
+		case <-time.After(5 * time.Second):
+			t.Fatalf("task is not queued after 5 seconds (i = %d)", i)
+		}
+	}
+}
+
+func TestGetDeleteTask(t *testing.T) {
+	s, clean := createSuite(t)
+	defer clean()
+
+	url := "http://" + s.lightning.serverAddr.String() + "/tasks"
+
+	type getAllResultType struct {
+		Current int64
+		Queue   []int64
+	}
+
+	getAllTasks := func() (result getAllResultType) {
+		resp, err := http.Get(url)
+		require.NoError(t, err)
+		require.Equal(t, http.StatusOK, resp.StatusCode)
+		err = json.NewDecoder(resp.Body).Decode(&result)
+		require.NoError(t, resp.Body.Close())
+		require.NoError(t, err)
+		return
+	}
+
+	postTask := func(i int) int64 {
+		resp, err := http.Post(url, "application/toml", strings.NewReader(fmt.Sprintf(`
+			[mydumper]
+			data-source-dir = 'file://demo-path-%d'
+		`, i)))
+		require.NoError(t, err)
+		require.Equal(t, http.StatusOK, resp.StatusCode)
+		var result struct{ ID int64 }
+		err = json.NewDecoder(resp.Body).Decode(&result)
+		require.NoError(t, resp.Body.Close())
+		require.NoError(t, err)
+		return result.ID
+	}
+
+	go func() {
+		_ = s.lightning.RunServer()
+	}()
+	time.Sleep(500 * time.Millisecond)
+
+	// Check `GET /tasks` without any active tasks
+	require.Equal(t, getAllResultType{
+		Current: 0,
+		Queue:   []int64{},
+	}, getAllTasks())
+
+	first := postTask(1)
+	second := postTask(2)
+	third := postTask(3)
+
+	require.NotEqual(t, 123456, first)
+	require.NotEqual(t, 123456, second)
+	require.NotEqual(t, 123456, third)
+
+	// Check `GET /tasks` returns all tasks currently running
+
+	<-s.taskRunCh
+	require.Equal(t, getAllResultType{
+		Current: first,
+		Queue:   []int64{second, third},
+	}, getAllTasks())
+
+	// Check `GET /tasks/abcdef` returns error
+
+	resp, err := http.Get(url + "/abcdef")
+	require.NoError(t, err)
+	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// Check `GET /tasks/123456` returns not found
+
+	resp, err = http.Get(url + "/123456")
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNotFound, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// Check `GET /tasks/1` returns the desired cfg
+
+	var resCfg config.Config
+
+	resp, err = http.Get(fmt.Sprintf("%s/%d", url, second))
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+	err = json.NewDecoder(resp.Body).Decode(&resCfg)
+	require.NoError(t, resp.Body.Close())
+	require.NoError(t, err)
+	require.Equal(t, "file://demo-path-2", resCfg.Mydumper.SourceDir)
"file://demo-path-2", resCfg.Mydumper.SourceDir) + + resp, err = http.Get(fmt.Sprintf("%s/%d", url, first)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + err = json.NewDecoder(resp.Body).Decode(&resCfg) + require.NoError(t, resp.Body.Close()) + require.NoError(t, err) + require.Equal(t, "file://demo-path-1", resCfg.Mydumper.SourceDir) + + // Check `DELETE /tasks` returns error. + + req, err := http.NewRequest(http.MethodDelete, url, nil) + require.NoError(t, err) + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + // Check `DELETE /tasks/` returns error. + + req.URL.Path = "/tasks/" + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + // Check `DELETE /tasks/(not a number)` returns error. + + req.URL.Path = "/tasks/abcdef" + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + // Check `DELETE /tasks/123456` returns not found + + req.URL.Path = "/tasks/123456" + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + // Cancel a queued task, then verify the task list. + + req.URL.Path = fmt.Sprintf("/tasks/%d", second) + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + require.Equal(t, getAllResultType{ + Current: first, + Queue: []int64{third}, + }, getAllTasks()) + + // Cancel a running task, then verify the task list. + + req.URL.Path = fmt.Sprintf("/tasks/%d", first) + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + <-s.taskRunCh + require.Equal(t, getAllResultType{ + Current: third, + Queue: []int64{}, + }, getAllTasks()) +} + +func TestHTTPAPIOutsideServerMode(t *testing.T) { + s, clean := createSuite(t) + defer clean() + + s.lightning.globalCfg.App.ServerMode = false + + url := "http://" + s.lightning.serverAddr.String() + "/tasks" + + errCh := make(chan error) + cfg := config.NewConfig() + cfg.TiDB.DistSQLScanConcurrency = 4 + err := cfg.LoadFromGlobal(s.lightning.globalCfg) + require.NoError(t, err) + go func() { + errCh <- s.lightning.RunOnce(s.lightning.ctx, cfg, nil) + }() + time.Sleep(600 * time.Millisecond) + + var curTask struct { + Current int64 + Queue []int64 + } + + // `GET /tasks` should work fine. + resp, err := http.Get(url) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + err = json.NewDecoder(resp.Body).Decode(&curTask) + require.NoError(t, resp.Body.Close()) + require.NoError(t, err) + require.NotEqual(t, int64(0), curTask.Current) + require.Len(t, curTask.Queue, 0) + + // `POST /tasks` should return 501 + resp, err = http.Post(url, "application/toml", strings.NewReader("??????")) + require.NoError(t, err) + require.Equal(t, http.StatusNotImplemented, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + // `GET /tasks/(current)` should work fine. 
+	resp, err = http.Get(fmt.Sprintf("%s/%d", url, curTask.Current))
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// `GET /tasks/123456` should return 404
+	resp, err = http.Get(url + "/123456")
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNotFound, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// `PATCH /tasks/(current)/front` should return 501
+	req, err := http.NewRequest(http.MethodPatch, fmt.Sprintf("%s/%d/front", url, curTask.Current), nil)
+	require.NoError(t, err)
+	resp, err = http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNotImplemented, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// `DELETE /tasks/123456` should return 404
+	req.Method = http.MethodDelete
+	req.URL.Path = "/tasks/123456"
+	resp, err = http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusNotFound, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+
+	// `DELETE /tasks/(current)` should return 200
+	req.URL.Path = fmt.Sprintf("/tasks/%d", curTask.Current)
+	resp, err = http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+	require.NoError(t, resp.Body.Close())
+	// ... and the task should be canceled now.
+	require.Equal(t, context.Canceled, <-errCh)
+}
diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go
index 57a631a2e39a8..03e2a4257f601 100644
--- a/br/pkg/lightning/restore/check_info.go
+++ b/br/pkg/lightning/restore/check_info.go
@@ -420,14 +420,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
 		return msgs, nil
 	}
 
-	igCols := make(map[string]struct{})
 	igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	for _, col := range igCol.Columns {
-		igCols[col] = struct{}{}
-	}
+	igCols := igCol.ColumnsMap()
 
 	if len(tableInfo.DataFiles) == 0 {
 		log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
@@ -597,7 +594,7 @@ outloop:
 		case nil:
 			if !initializedColumns {
 				if len(columnPermutation) == 0 {
-					columnPermutation, err = createColumnPermutation(columnNames, igCols.Columns, tableInfo)
+					columnPermutation, err = createColumnPermutation(columnNames, igCols.ColumnsMap(), tableInfo)
 					if err != nil {
 						return errors.Trace(err)
 					}
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index b323c5025d9c9..06c0a0d5be84e 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1317,7 +1317,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
 		if err != nil {
 			return errors.Trace(err)
 		}
-		tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.Columns)
+		tr, err := NewTableRestore(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap())
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -2092,6 +2092,16 @@ func (cr *chunkRestore) encodeLoop(
 	pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
 	initializedColumns, reachEOF := false, false
+	// filteredColumns is the list of column names with the ignored columns excluded.
+	// WARN: this may be incorrect when different SQL statements contain different fields,
+	// but since ColumnPermutation already relies on the assumption that all rows in
+	// one source file share the same columns, this should be OK.
+	var filteredColumns []string
+	ignoreColumns, err1 := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(t.dbInfo.Name, t.tableInfo.Core.Name.O, rc.cfg.Mydumper.CaseSensitive)
+	if err1 != nil {
+		err = err1
+		return
+	}
 	for !reachEOF {
 		if err = pauser.Wait(ctx); err != nil {
 			return
@@ -2122,6 +2132,26 @@ func (cr *chunkRestore) encodeLoop(
 					return
 				}
 			}
+			filteredColumns = columnNames
+			if ignoreColumns != nil && len(ignoreColumns.Columns) > 0 {
+				filteredColumns = make([]string, 0, len(columnNames))
+				ignoreColsMap := ignoreColumns.ColumnsMap()
+				if len(columnNames) > 0 {
+					for _, c := range columnNames {
+						if _, ok := ignoreColsMap[c]; !ok {
+							filteredColumns = append(filteredColumns, c)
+						}
+					}
+				} else {
+					// init column names from the table schema; after filtering out some
+					// columns, we must explicitly set the columns for the TiDB backend
+					for _, col := range t.tableInfo.Core.Columns {
+						if _, ok := ignoreColsMap[col.Name.L]; !col.Hidden && !ok {
+							filteredColumns = append(filteredColumns, col.Name.O)
+						}
+					}
+				}
+			}
 			initializedColumns = true
 		}
 	case io.EOF:
@@ -2142,7 +2172,7 @@ func (cr *chunkRestore) encodeLoop(
 				err = errors.Annotatef(encodeErr, "in file %s at offset %d", &cr.chunk.Key, newOffset)
 				return
 			}
-			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
+			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: filteredColumns, offset: newOffset, rowID: rowID})
 			kvSize += kvs.Size()
 			failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
 				kvSize += uint64(val.(int))
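The `filteredColumns` block above decides which column names accompany each KV packet once ignored columns are dropped: with a CSV header it filters the header names, and without one it reconstructs the list from the table schema, since the TiDB backend must then name the columns explicitly in its INSERT statements. A standalone sketch of that decision, with simple strings standing in for `model.ColumnInfo` (illustrative, not the actual encodeLoop code):

```go
package main

import (
	"fmt"
	"strings"
)

// filterColumns sketches the branch added to encodeLoop. headerColumns are
// names read from the data file (empty when the file has no header), ignore is
// the lower-cased ignore set, and tableColumns is the schema's column list.
func filterColumns(headerColumns []string, ignore map[string]struct{}, tableColumns []string) []string {
	if len(headerColumns) > 0 {
		filtered := make([]string, 0, len(headerColumns))
		for _, c := range headerColumns {
			if _, ok := ignore[c]; !ok {
				filtered = append(filtered, c)
			}
		}
		return filtered
	}
	// No header: fall back to the table schema so the TiDB backend can still
	// name the columns it inserts after some were filtered out.
	filtered := make([]string, 0, len(tableColumns))
	for _, c := range tableColumns {
		if _, ok := ignore[strings.ToLower(c)]; !ok {
			filtered = append(filtered, c)
		}
	}
	return filtered
}

func main() {
	ignore := map[string]struct{}{"b": {}}
	fmt.Println(filterColumns([]string{"a", "b", "c"}, ignore, nil)) // [a c]
	fmt.Println(filterColumns(nil, ignore, []string{"A", "B", "C"})) // [A C]
}
```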
diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go
index 6c3cddb5e9fc0..f5105662ccebb 100644
--- a/br/pkg/lightning/restore/restore_test.go
+++ b/br/pkg/lightning/restore/restore_test.go
@@ -738,30 +738,124 @@ func (s *tableRestoreSuite) TestGetColumnsNames(c *C) {
 
 func (s *tableRestoreSuite) TestInitializeColumns(c *C) {
 	ccp := &checkpoints.ChunkCheckpoint{}
-	c.Assert(s.tr.initializeColumns(nil, ccp), IsNil)
-	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{0, 1, 2, -1})
 
-	ccp.ColumnPermutation = nil
-	c.Assert(s.tr.initializeColumns([]string{"b", "c", "a"}, ccp), IsNil)
-	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 0, 1, -1})
+	defer func() {
+		s.tr.ignoreColumns = nil
+	}()
+
+	cases := []struct {
+		columns             []string
+		ignoreColumns       map[string]struct{}
+		expectedPermutation []int
+		errPat              string
+	}{
+		{
+			nil,
+			nil,
+			[]int{0, 1, 2, -1},
+			"",
+		},
+		{
+			nil,
+			map[string]struct{}{"b": {}},
+			[]int{0, -1, 2, -1},
+			"",
+		},
+		{
+			[]string{"b", "c", "a"},
+			nil,
+			[]int{2, 0, 1, -1},
+			"",
+		},
+		{
+			[]string{"b", "c", "a"},
+			map[string]struct{}{"b": {}},
+			[]int{2, -1, 1, -1},
+			"",
+		},
+		{
+			[]string{"b"},
+			nil,
+			[]int{-1, 0, -1, -1},
+			"",
+		},
+		{
+			[]string{"_tidb_rowid", "b", "a", "c"},
+			nil,
+			[]int{2, 1, 3, 0},
+			"",
+		},
+		{
+			[]string{"_tidb_rowid", "b", "a", "c"},
+			map[string]struct{}{"b": {}, "_tidb_rowid": {}},
+			[]int{2, -1, 3, -1},
+			"",
+		},
+		{
+			[]string{"_tidb_rowid", "b", "a", "c", "d"},
+			nil,
+			nil,
+			`unknown columns in header \[d\]`,
+		},
+		{
+			[]string{"e", "b", "c", "d"},
+			nil,
+			nil,
+			`unknown columns in header \[e d\]`,
+		},
+	}
+
+	for _, testCase := range cases {
+		ccp.ColumnPermutation = nil
+		s.tr.ignoreColumns = testCase.ignoreColumns
+		err := s.tr.initializeColumns(testCase.columns, ccp)
+		if len(testCase.errPat) > 0 {
+			c.Assert(err, NotNil)
+			c.Assert(err, ErrorMatches, testCase.errPat)
+		} else {
+			c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation)
+		}
+	}
+}
 
-	ccp.ColumnPermutation = nil
-	c.Assert(s.tr.initializeColumns([]string{"b"}, ccp), IsNil)
-	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{-1, 0, -1, -1})
+func (s *tableRestoreSuite) TestInitializeColumnsGenerated(c *C) {
+	p := parser.New()
+	p.SetSQLMode(mysql.ModeANSIQuotes)
+	se := tmock.NewContext()
 
-	ccp.ColumnPermutation = nil
-	c.Assert(s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c"}, ccp), IsNil)
-	c.Assert(ccp.ColumnPermutation, DeepEquals, []int{2, 1, 3, 0})
+	cases := []struct {
+		schema              string
+		columns             []string
+		expectedPermutation []int
+	}{
+		{
+			"CREATE TABLE `table` (a INT, b INT, C INT, d INT AS (a * 2))",
+			[]string{"b", "c", "a"},
+			[]int{2, 0, 1, -1, -1},
+		},
+		// all generated columns and no input columns
+		{
+			"CREATE TABLE `table` (a bigint as (1 + 2) stored, b text as (sha1(repeat('x', a))) stored)",
+			[]string{},
+			[]int{-1, -1, -1},
+		},
+	}
 
-	ccp.ColumnPermutation = nil
-	err := s.tr.initializeColumns([]string{"_tidb_rowid", "b", "a", "c", "d"}, ccp)
-	c.Assert(err, NotNil)
-	c.Assert(err, ErrorMatches, `unknown columns in header \[d\]`)
+	for _, testCase := range cases {
+		node, err := p.ParseOneStmt(testCase.schema, "", "")
+		c.Assert(err, IsNil)
+		core, err := ddl.MockTableInfo(se, node.(*ast.CreateTableStmt), 0xabcdef)
+		c.Assert(err, IsNil)
+		core.State = model.StatePublic
+		tableInfo := &checkpoints.TidbTableInfo{Name: "table", DB: "db", Core: core}
+		s.tr, err = NewTableRestore("`db`.`table`", s.tableMeta, s.dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil)
+		c.Assert(err, IsNil)
+		ccp := &checkpoints.ChunkCheckpoint{}
 
-	ccp.ColumnPermutation = nil
-	err = s.tr.initializeColumns([]string{"e", "b", "c", "d"}, ccp)
-	c.Assert(err, NotNil)
-	c.Assert(err, ErrorMatches, `unknown columns in header \[e d\]`)
+		err = s.tr.initializeColumns(testCase.columns, ccp)
+		c.Assert(err, IsNil)
+		c.Assert(ccp.ColumnPermutation, DeepEquals, testCase.expectedPermutation)
+	}
 }
 
 func (s *tableRestoreSuite) TestCompareChecksumSuccess(c *C) {
@@ -1191,6 +1285,7 @@ func (s *chunkRestoreSuite) TestEncodeLoop(c *C) {
 	c.Assert(kvs, HasLen, 1)
 	c.Assert(kvs[0].rowID, Equals, int64(19))
 	c.Assert(kvs[0].offset, Equals, int64(36))
+	c.Assert(kvs[0].columns, DeepEquals, []string(nil))
 
 	kvs = <-kvsCh
 	c.Assert(len(kvs), Equals, 0)
@@ -1262,6 +1357,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) {
 	rc := &Controller{pauser: DeliverPauser, cfg: cfg}
 	c.Assert(failpoint.Enable(
 		"github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil)
+	defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/mock-kv-size")
 	_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
 	c.Assert(err, IsNil)
@@ -1311,7 +1407,7 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
 func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
 	dir := c.MkDir()
 	fileName := "db.table.000.csv"
-	err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3,4\r\n4,5,6,7\r\n"), 0o644)
+	err := os.WriteFile(filepath.Join(dir, fileName), []byte("1,2\r\n4,5,6,7\r\n"), 0o644)
 	c.Assert(err, IsNil)
 
 	store, err := storage.NewLocalStorage(dir)
@@ -1339,12 +1435,117 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
 		Timestamp: 1234567895,
 	})
 	c.Assert(err, IsNil)
+	defer kvEncoder.Close()
 
 	_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
-	c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 8: column count mismatch, expected 3, got 4")
+	c.Assert(err, ErrorMatches, "in file db.table.2.sql:0 at offset 4: column count mismatch, expected 3, got 2")
 	c.Assert(kvsCh, HasLen, 0)
 }
 
+func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) {
+	log.InitLogger(&log.Config{}, "error")
+	cases := []struct {
+		s             string
+		ignoreColumns []*config.IgnoreColumns
+		kvs           deliveredKVs
+		header        bool
+	}{
+		{
+			"1,2,3\r\n4,5,6\r\n",
+			[]*config.IgnoreColumns{
+				{
+					DB:      "db",
+					Table:   "table",
+					Columns: []string{"a"},
+				},
+			},
+			deliveredKVs{
+				rowID:   1,
+				offset:  6,
+				columns: []string{"b", "c"},
+			},
+			false,
+		},
+		{
+			"b,c\r\n2,3\r\n5,6\r\n",
+			[]*config.IgnoreColumns{
+				{
+					TableFilter: []string{"db*.tab*"},
+					Columns:     []string{"b"},
+				},
+			},
+			deliveredKVs{
+				rowID:   1,
+				offset:  9,
+				columns: []string{"c"},
+			},
+			true,
+		},
+	}
+
+	for _, cs := range cases {
+		// reset test
+		s.SetUpTest(c)
+		s.testEncodeLoopIgnoreColumnsCSV(c, cs.s, cs.ignoreColumns, cs.kvs, cs.header)
+	}
+}
+
+func (s *chunkRestoreSuite) testEncodeLoopIgnoreColumnsCSV(
+	c *C,
+	f string,
+	ignoreColumns []*config.IgnoreColumns,
+	deliverKV deliveredKVs,
+	header bool,
+) {
+	dir := c.MkDir()
+	fileName := "db.table.000.csv"
+	err := os.WriteFile(filepath.Join(dir, fileName), []byte(f), 0o644)
+	c.Assert(err, IsNil)
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+
+	ctx := context.Background()
+	cfg := config.NewConfig()
+	cfg.Mydumper.IgnoreColumns = ignoreColumns
+	cfg.Mydumper.CSV.Header = header
+	rc := &Controller{pauser: DeliverPauser, cfg: cfg}
+
+	reader, err := store.Open(ctx, fileName)
+	c.Assert(err, IsNil)
+	w := worker.NewPool(ctx, 5, "io")
+	p, err := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, cfg.Mydumper.CSV.Header, nil)
+	c.Assert(err, IsNil)
+
+	err = s.cr.parser.Close()
+	c.Assert(err, IsNil)
+	s.cr.parser = p
+
+	kvsCh := make(chan []deliveredKVs, 2)
+	deliverCompleteCh := make(chan deliverResult)
+	kvEncoder, err := tidb.NewTiDBBackend(nil, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig())).NewEncoder(
+		s.tr.encTable,
+		&kv.SessionOptions{
+			SQLMode:   s.cfg.TiDB.SQLMode,
+			Timestamp: 1234567895,
+		})
+	c.Assert(err, IsNil)
+	defer kvEncoder.Close()
+
+	_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
+	c.Assert(err, IsNil)
+	c.Assert(kvsCh, HasLen, 2)
+
+	kvs := <-kvsCh
+	c.Assert(kvs, HasLen, 2)
+	c.Assert(kvs[0].rowID, Equals, deliverKV.rowID)
+	c.Assert(kvs[0].offset, Equals, deliverKV.offset)
+	c.Assert(kvs[0].columns, DeepEquals, deliverKV.columns)
+
+	kvs = <-kvsCh
+	c.Assert(len(kvs), Equals, 0)
+}
+
 func (s *chunkRestoreSuite) TestRestore(c *C) {
 	ctx := context.Background()
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 947441ca2c8f8..817d489e97250 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -54,7 +54,7 @@ type TableRestore struct {
 	alloc  autoid.Allocators
 	logger log.Logger
 
-	ignoreColumns []string
+	ignoreColumns map[string]struct{}
 }
 
 func NewTableRestore(
@@ -63,7 +63,7 @@ func NewTableRestore(
 	dbInfo *checkpoints.TidbDBInfo,
 	tableInfo *checkpoints.TidbTableInfo,
 	cp *checkpoints.TableCheckpoint,
-	ignoreColumns []string,
+	ignoreColumns map[string]struct{},
 ) (*TableRestore, error) {
 	idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
 	tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
@@ -168,15 +168,21 @@ func (tr *TableRestore) initializeColumns(columns []string, ccp *checkpoints.Chu
 	return nil
 }
 
-func createColumnPermutation(columns []string, ignoreColumns []string, tableInfo *model.TableInfo) ([]int, error) {
+func createColumnPermutation(columns []string, ignoreColumns map[string]struct{}, tableInfo *model.TableInfo) ([]int, error) {
 	var colPerm []int
 	if len(columns) == 0 {
 		colPerm = make([]int, 0, len(tableInfo.Columns)+1)
 		shouldIncludeRowID := common.TableHasAutoRowID(tableInfo)
 		// no provided columns, so use identity permutation.
-		for i := range tableInfo.Columns {
-			colPerm = append(colPerm, i)
+		for i, col := range tableInfo.Columns {
+			idx := i
+			if _, ok := ignoreColumns[col.Name.L]; ok {
+				idx = -1
+			} else if col.IsGenerated() {
+				idx = -1
+			}
+			colPerm = append(colPerm, idx)
 		}
 		if shouldIncludeRowID {
 			colPerm = append(colPerm, -1)
@@ -782,7 +788,7 @@ func (tr *TableRestore) postProcess(
 	return !finished, nil
 }
 
-func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns []string) ([]int, error) {
+func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignoreColumns map[string]struct{}) ([]int, error) {
 	colPerm := make([]int, 0, len(tableInfo.Columns)+1)
 
 	columnMap := make(map[string]int)
@@ -790,13 +796,6 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor
 		columnMap[column] = i
 	}
 
-	ignoreMap := make(map[string]int)
-	for _, column := range ignoreColumns {
-		if i, ok := columnMap[column]; ok {
-			ignoreMap[column] = i
-		}
-	}
-
 	tableColumnMap := make(map[string]int)
 	for i, col := range tableInfo.Columns {
 		tableColumnMap[col.Name.L] = i
@@ -806,7 +805,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor
 	var unknownCols []string
 	for _, c := range columns {
 		if _, ok := tableColumnMap[c]; !ok && c != model.ExtraHandleName.L {
-			if _, ignore := ignoreMap[c]; !ignore {
+			if _, ignore := ignoreColumns[c]; !ignore {
 				unknownCols = append(unknownCols, c)
 			}
 		}
@@ -818,7 +817,7 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor
 
 	for _, colInfo := range tableInfo.Columns {
 		if i, ok := columnMap[colInfo.Name.L]; ok {
-			if _, ignore := ignoreMap[colInfo.Name.L]; !ignore {
+			if _, ignore := ignoreColumns[colInfo.Name.L]; !ignore {
 				colPerm = append(colPerm, i)
 			} else {
 				log.L().Debug("column ignored by user requirements",
@@ -839,11 +838,16 @@ func parseColumnPermutations(tableInfo *model.TableInfo, columns []string, ignor
 			colPerm = append(colPerm, -1)
 		}
 	}
+	// append the _tidb_rowid column
+	rowIDIdx := -1
 	if i, ok := columnMap[model.ExtraHandleName.L]; ok {
-		colPerm = append(colPerm, i)
-	} else if common.TableHasAutoRowID(tableInfo) {
-		colPerm = append(colPerm, -1)
+		if _, ignored := ignoreColumns[model.ExtraHandleName.L]; !ignored {
+			rowIDIdx = i
+		}
 	}
+	// FIXME: the schema info for the tidb backend is not complete, so always add the _tidb_rowid field.
+	// Other logic should ignore this extra field if it is not needed.
+	colPerm = append(colPerm, rowIDIdx)
 
 	return colPerm, nil
 }
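In `createColumnPermutation`'s no-header branch above, table column i no longer automatically takes data field i: ignored and generated columns now map to -1, and `_tidb_rowid` is always appended. A small sketch of that branch (the `column` struct is an illustrative stand-in for `model.ColumnInfo`); on a table `(a, b, c)` with `b` ignored it reproduces the `[]int{0, -1, 2, -1}` expectation from `TestInitializeColumns` earlier:

```go
package main

import "fmt"

type column struct {
	name      string
	generated bool
}

// identityPermutation mirrors the identity branch: column i is fed by data
// field i unless it is ignored or generated, and a trailing -1 stands for the
// _tidb_rowid slot when the table has an automatic row ID.
func identityPermutation(cols []column, ignore map[string]struct{}, hasAutoRowID bool) []int {
	perm := make([]int, 0, len(cols)+1)
	for i, col := range cols {
		idx := i
		if _, ok := ignore[col.name]; ok {
			idx = -1
		} else if col.generated {
			idx = -1
		}
		perm = append(perm, idx)
	}
	if hasAutoRowID {
		perm = append(perm, -1)
	}
	return perm
}

func main() {
	cols := []column{{name: "a"}, {name: "b"}, {name: "c"}}
	ignore := map[string]struct{}{"b": {}}
	fmt.Println(identityPermutation(cols, ignore, true)) // [0 -1 2 -1]
}
```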
diff --git a/br/tests/lightning_distributed_import/run.sh b/br/tests/lightning_distributed_import/run.sh
new file mode 100644
index 0000000000000..d21bf356b1568
--- /dev/null
+++ b/br/tests/lightning_distributed_import/run.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Copyright 2021 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+
+LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log"
+LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log"
+
+# let lightning run a bit slowly so that no table in the first lightning instance finishes too fast.
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)"
+
+run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \
+  -d "tests/$TEST_NAME/data1" --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config.toml" &
+pid1="$!"
+
+run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted2" \
+  -d "tests/$TEST_NAME/data2" --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config.toml" &
+pid2="$!"
+
+wait "$pid1" "$pid2"
+
+run_sql 'select count(*) from distributed_import.t'
+check_contains 'count(*): 10'
diff --git a/br/tests/lightning_duplicate_detection/run.sh b/br/tests/lightning_duplicate_detection/run.sh
index 091ac122ebb4e..6ac9205a705b3 100644
--- a/br/tests/lightning_duplicate_detection/run.sh
+++ b/br/tests/lightning_duplicate_detection/run.sh
@@ -20,6 +20,9 @@ check_cluster_version 4 0 0 'local backend' || exit 0
 LOG_FILE1="$TEST_DIR/lightning-duplicate-detection1.log"
 LOG_FILE2="$TEST_DIR/lightning-duplicate-detection2.log"
 
+# let lightning run a bit slowly so that no table in the first lightning instance finishes too fast.
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(250)"
+
 run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_detection.sorted1" \
   --enable-checkpoint=1 --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config1.toml" && exit 1 &
 run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_detection.sorted2" \
diff --git a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql
index 4f1d634485cca..55232f2ff6081 100644
--- a/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql
+++ b/br/tests/lightning_tidb_rowid/data/rowid.explicit_tidb_rowid-schema.sql
@@ -1 +1 @@
-create table explicit_tidb_rowid (pk varchar(6) primary key);
\ No newline at end of file
+create table explicit_tidb_rowid (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */);
diff --git a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql
index f6962e15a0072..a69f5bf4350eb 100644
--- a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql
+++ b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql
@@ -1 +1 @@
-create table specific_auto_inc (a varchar(6) primary key, b int unique auto_increment) auto_increment=80000;
\ No newline at end of file
+create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000;
diff --git a/br/tests/run.sh b/br/tests/run.sh
index be8c1b57e8642..670bba7903fb8 100755
--- a/br/tests/run.sh
+++ b/br/tests/run.sh
@@ -27,7 +27,7 @@ SELECTED_TEST_NAME="${TEST_NAME-$(find tests -mindepth 2 -maxdepth 2 -name run.s
 source tests/_utils/run_services
 trap stop_services EXIT
-start_services
+start_services $@
 
 # Intermediate file needed because read can be used as a pipe target.
 # https://stackoverflow.com/q/2746553/