diff --git a/cmd/tidb-lightning-ctl/main.go b/cmd/tidb-lightning-ctl/main.go index 16aed9349..e67747876 100644 --- a/cmd/tidb-lightning-ctl/main.go +++ b/cmd/tidb-lightning-ctl/main.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb-lightning/lightning/kv" "github.com/pingcap/tidb-lightning/lightning/restore" "github.com/pkg/errors" + "github.com/satori/go.uuid" ) func main() { @@ -139,7 +140,39 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s } defer target.Close() - return errors.Trace(cpdb.DestroyErrorCheckpoint(ctx, tableName, target)) + importer, err := kv.NewImporter(ctx, cfg.TikvImporter.Addr, cfg.TiDB.PdAddr) + if err != nil { + return errors.Trace(err) + } + defer importer.Close() + + targetTables, err := cpdb.DestroyErrorCheckpoint(ctx, tableName) + if err != nil { + return errors.Trace(err) + } + + var lastErr error + + for _, table := range targetTables { + fmt.Fprintln(os.Stderr, "Dropping table:", table.TableName) + err := target.DropTable(ctx, table.TableName) + if err != nil { + fmt.Fprintln(os.Stderr, "* Encountered error while dropping table:", err) + lastErr = err + } + } + + for _, table := range targetTables { + if table.Engine == uuid.Nil { + continue + } + if closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, table.Engine); err == nil { + fmt.Fprintln(os.Stderr, "Cleaning up engine:", table.TableName, table.Engine) + closedEngine.Cleanup(ctx) + } + } + + return errors.Trace(lastErr) } func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error { diff --git a/lightning/restore/checkpoints.go b/lightning/restore/checkpoints.go index 54e7f4699..b5b5ec489 100644 --- a/lightning/restore/checkpoints.go +++ b/lightning/restore/checkpoints.go @@ -164,6 +164,11 @@ func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { cpd.allocBase = mathutil.MaxInt64(cpd.allocBase, merger.AllocBase) } +type DestroyedTableCheckpoint struct { + TableName string + Engine uuid.UUID +} + type CheckpointsDB interface { Initialize(ctx context.Context, dbInfo map[string]*TidbDBInfo) error Get(ctx context.Context, tableName string) (*TableCheckpoint, error) @@ -173,7 +178,7 @@ type CheckpointsDB interface { RemoveCheckpoint(ctx context.Context, tableName string) error IgnoreErrorCheckpoint(ctx context.Context, tableName string) error - DestroyErrorCheckpoint(ctx context.Context, tableName string, target *TiDBManager) error + DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) DumpTables(ctx context.Context, csv io.Writer) error DumpChunks(ctx context.Context, csv io.Writer) error } @@ -488,8 +493,8 @@ func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error { func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error { return errors.Trace(cannotManageNullDB) } -func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string, *TiDBManager) error { - return errors.Trace(cannotManageNullDB) +func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error) { + return nil, errors.Trace(cannotManageNullDB) } func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error { return errors.Trace(cannotManageNullDB) @@ -547,24 +552,7 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table return errors.Trace(err) } -func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string, target *TiDBManager) error { - targetTables, err := cpdb.destroyErrorCheckpoints(ctx, tableName) - if err != nil { - return errors.Trace(err) - } - - common.AppLogger.Info("Going to drop the following tables:", targetTables) - for _, tableName := range targetTables { - query := "DROP TABLE " + tableName - err := common.ExecWithRetry(ctx, target.db, query, query) - if err != nil { - return errors.Trace(err) - } - } - return nil -} - -func (cpdb *MySQLCheckpointsDB) destroyErrorCheckpoints(ctx context.Context, tableName string) ([]string, error) { +func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) { var ( conditionColumn string arg interface{} @@ -579,7 +567,7 @@ func (cpdb *MySQLCheckpointsDB) destroyErrorCheckpoints(ctx context.Context, tab } selectQuery := fmt.Sprintf(` - SELECT table_name FROM %s.%s WHERE %s = ? AND status <= %d; + SELECT table_name, engine FROM %s.%s WHERE %s = ? AND status <= %d; `, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid) deleteChunkQuery := fmt.Sprintf(` DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d) @@ -588,7 +576,7 @@ func (cpdb *MySQLCheckpointsDB) destroyErrorCheckpoints(ctx context.Context, tab DELETE FROM %s.%s WHERE %s = ? AND status <= %d `, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid) - var targetTables []string + var targetTables []DestroyedTableCheckpoint err := common.TransactWithRetry(ctx, cpdb.db, fmt.Sprintf("(destroy error checkpoints for %s)", tableName), func(c context.Context, tx *sql.Tx) error { // Obtain the list of tables @@ -599,11 +587,17 @@ func (cpdb *MySQLCheckpointsDB) destroyErrorCheckpoints(ctx context.Context, tab } defer rows.Close() for rows.Next() { - var matchedTableName string - if e := rows.Scan(&matchedTableName); e != nil { + var ( + matchedTableName string + matchedEngine []byte + ) + if e := rows.Scan(&matchedTableName, &matchedEngine); e != nil { return errors.Trace(e) } - targetTables = append(targetTables, matchedTableName) + targetTables = append(targetTables, DestroyedTableCheckpoint{ + TableName: matchedTableName, + Engine: uuid.FromBytesOrNil(matchedEngine), + }) } if e := rows.Err(); e != nil { return errors.Trace(e) diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index 731568e58..ac25df93d 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -141,6 +141,11 @@ func (timgr *TiDBManager) getTables(schema string) ([]*model.TableInfo, error) { return tables, nil } +func (timgr *TiDBManager) DropTable(ctx context.Context, tableName string) error { + query := "DROP TABLE " + tableName + return errors.Trace(common.ExecWithRetry(ctx, timgr.db, query, query)) +} + func (timgr *TiDBManager) LoadSchemaInfo(ctx context.Context, schemas map[string]*mydump.MDDatabaseMeta) (map[string]*TidbDBInfo, error) { result := make(map[string]*TidbDBInfo, len(schemas)) for schema := range schemas { diff --git a/tests/checkpoint_error_destroy/bad-data/cped-schema-create.sql b/tests/checkpoint_error_destroy/bad-data/cped-schema-create.sql new file mode 100644 index 000000000..68c25cc40 --- /dev/null +++ b/tests/checkpoint_error_destroy/bad-data/cped-schema-create.sql @@ -0,0 +1 @@ +create database cped; diff --git a/tests/checkpoint_error_destroy/bad-data/cped.t-schema.sql b/tests/checkpoint_error_destroy/bad-data/cped.t-schema.sql new file mode 100644 index 000000000..8bfb649ef --- /dev/null +++ b/tests/checkpoint_error_destroy/bad-data/cped.t-schema.sql @@ -0,0 +1 @@ +create table t (x timestamp not null); diff --git a/tests/checkpoint_error_destroy/bad-data/cped.t.sql b/tests/checkpoint_error_destroy/bad-data/cped.t.sql new file mode 100644 index 000000000..e96d6d157 --- /dev/null +++ b/tests/checkpoint_error_destroy/bad-data/cped.t.sql @@ -0,0 +1 @@ +insert into t values ('1111-11-11 11:11:11'); diff --git a/tests/checkpoint_error_destroy/bad.toml b/tests/checkpoint_error_destroy/bad.toml new file mode 100644 index 000000000..b711777e9 --- /dev/null +++ b/tests/checkpoint_error_destroy/bad.toml @@ -0,0 +1,26 @@ +[lightning] +check-requirements = false +file = "/tmp/lightning_test_result/lightning.log" +level = "info" + +[checkpoint] +enable = true + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_error_destroy/bad-data" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_error_destroy/good-data/cped-schema-create.sql b/tests/checkpoint_error_destroy/good-data/cped-schema-create.sql new file mode 100644 index 000000000..68c25cc40 --- /dev/null +++ b/tests/checkpoint_error_destroy/good-data/cped-schema-create.sql @@ -0,0 +1 @@ +create database cped; diff --git a/tests/checkpoint_error_destroy/good-data/cped.t-schema.sql b/tests/checkpoint_error_destroy/good-data/cped.t-schema.sql new file mode 100644 index 000000000..8bfb649ef --- /dev/null +++ b/tests/checkpoint_error_destroy/good-data/cped.t-schema.sql @@ -0,0 +1 @@ +create table t (x timestamp not null); diff --git a/tests/checkpoint_error_destroy/good-data/cped.t.sql b/tests/checkpoint_error_destroy/good-data/cped.t.sql new file mode 100644 index 000000000..9bb4e0266 --- /dev/null +++ b/tests/checkpoint_error_destroy/good-data/cped.t.sql @@ -0,0 +1 @@ +insert into t values ('1999-09-09 09:09:09'); diff --git a/tests/checkpoint_error_destroy/good.toml b/tests/checkpoint_error_destroy/good.toml new file mode 100644 index 000000000..364ea95c0 --- /dev/null +++ b/tests/checkpoint_error_destroy/good.toml @@ -0,0 +1,26 @@ +[lightning] +check-requirements = false +file = "/tmp/lightning_test_result/lightning.log" +level = "info" + +[checkpoint] +enable = true + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_error_destroy/good-data" + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_error_destroy/run.sh b/tests/checkpoint_error_destroy/run.sh new file mode 100755 index 000000000..c4f44b13e --- /dev/null +++ b/tests/checkpoint_error_destroy/run.sh @@ -0,0 +1,16 @@ +#!/bin/sh + +set -eu + +# Make sure we won't run out of table concurrency by destroying checkpoints + +for i in $(seq 8); do + set +e + run_lightning bad + set -e + bin/tidb-lightning-ctl -checkpoint-error-destroy=all +done + +run_lightning good +run_sql 'SELECT * FROM cped.t' +check_contains 'x: 1999-09-09 09:09:09'