Skip to content

Commit

Permalink
restore: ignore error when downstream db/table is created (#38327)
Browse files Browse the repository at this point in the history
ref #37984
  • Loading branch information
lance6716 authored Oct 12, 2022
1 parent 78d3307 commit 00791e7
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 3 deletions.
17 changes: 16 additions & 1 deletion br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func InterpolateMySQLString(s string) string {
}

// TableExists return whether table with specified name exists in target db
func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, error) {
func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema, table).Scan(&exist)
Expand All @@ -309,6 +309,21 @@ func TableExists(ctx context.Context, db *sql.DB, schema, table string) (bool, e
}
}

// SchemaExists return whether schema with specified name exists.
func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (bool, error) {
query := "SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?"
var exist string
err := db.QueryRowContext(ctx, query, schema).Scan(&exist)
switch {
case err == nil:
return true, nil
case err == sql.ErrNoRows:
return false, nil
default:
return false, errors.Annotatef(err, "check schema exists failed")
}
}

// GetJSON fetches a page and parses it as JSON. The parsed result will be
// stored into the `v`. The variable `v` must be a pointer to a type that can be
// unmarshalled from JSON.
Expand Down
28 changes: 26 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,15 @@ type restoreSchemaWorker struct {
func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.glue.GetParser(), sqlStr, job.dbName, job.tblName)
if err != nil {
return err
worker.logger.Warn("failed to rewrite statement, will use raw input instead",
zap.String("db", job.dbName),
zap.String("table", job.tblName),
zap.String("statement", sqlStr),
zap.Error(err))
job.stmts = []string{sqlStr}
} else {
job.stmts = stmts
}
job.stmts = stmts
return worker.appendJob(job)
}

Expand Down Expand Up @@ -656,7 +662,25 @@ loop:
for _, stmt := range job.stmts {
task := logger.Begin(zap.DebugLevel, fmt.Sprintf("execute SQL: %s", stmt))
err = sqlWithRetry.Exec(worker.ctx, "run create schema job", stmt)
if err != nil {
// try to imitate IF NOT EXISTS behavior for parsing errors
exists := false
switch job.stmtType {
case schemaCreateDatabase:
var err2 error
exists, err2 = common.SchemaExists(worker.ctx, session, job.dbName)
if err2 != nil {
task.Error("failed to check database existence", zap.Error(err2))
}
case schemaCreateTable:
exists, _ = common.TableExists(worker.ctx, session, job.dbName, job.tblName)
}
if exists {
err = nil
}
}
task.End(zap.ErrorLevel, err)

if err != nil {
err = common.ErrCreateSchema.Wrap(err).GenWithStackByArgs(common.UniqueTable(job.dbName, job.tblName), job.stmtType.String())
worker.wg.Done()
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_character_sets/greek.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "local"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/*!40101 SET NAMES binary*/;
CREATE DATABASE `charsets` /*!40100 DEFAULT CHARACTER SET greek */;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*!40101 SET NAMES binary*/;
CREATE TABLE `greek` (
`c` varchar(20) DEFAULT NULL,
PRIMARY KEY (`c`)
) ENGINE=InnoDB DEFAULT CHARSET=greek;
3 changes: 3 additions & 0 deletions br/tests/lightning_character_sets/greek/charsets.greek.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/*!40101 SET NAMES binary*/;
INSERT INTO `greek` VALUES
('α');
14 changes: 14 additions & 0 deletions br/tests/lightning_character_sets/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ run_lightning --config "tests/$TEST_NAME/binary.toml" -d "tests/$TEST_NAME/mixed
run_sql 'SELECT sum(`唯一键`) AS s FROM charsets.mixed'
check_contains 's: 5291'

# test about unsupported charset in UTF-8 encoding dump files
# test local backend
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" 2>&1 | grep -q "Unknown character set: 'greek'"
run_sql 'DROP DATABASE IF EXISTS charsets;'
run_sql 'CREATE DATABASE charsets;'
run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);'
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek"
run_sql "SELECT count(*) FROM charsets.greek WHERE c = 'α';"
check_contains 'count(*): 1'
# test tidb backend
run_sql 'TRUNCATE TABLE charsets.greek;'
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" --backend tidb
run_sql "SELECT count(*) FROM charsets.greek WHERE c = 'α';"
check_contains 'count(*): 1'

0 comments on commit 00791e7

Please sign in to comment.