Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

loader: support ANSI_QUOTES when sharing rename (#1309) #1314

Merged
merged 3 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab

lastOffset := cur

ansiquote := strings.Contains(w.cfg.SQLMode, "ANSI_QUOTES")
data := make([]byte, 0, 1024*1024)
br := bufio.NewReader(f)
for {
Expand Down Expand Up @@ -334,7 +335,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
return terror.Annotatef(err, "file %s", file)
}
} else if table.sourceTable != table.targetTable {
query = renameShardingTable(query, table.sourceTable, table.targetTable)
query = renameShardingTable(query, table.sourceTable, table.targetTable, ansiquote)
}

idx := strings.Index(query, "INSERT INTO")
Expand Down Expand Up @@ -1091,6 +1092,7 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
defer f.Close()

tctx := tcontext.NewContext(ctx, l.logger)
ansiquote := strings.Contains(l.cfg.SQLMode, "ANSI_QUOTES")

data := make([]byte, 0, 1024*1024)
br := bufio.NewReader(f)
Expand Down Expand Up @@ -1118,9 +1120,9 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
// for table
if table != "" {
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(dstSchema, l.logger)))
query = renameShardingTable(query, table, dstTable)
query = renameShardingTable(query, table, dstTable, ansiquote)
} else {
query = renameShardingSchema(query, schema, dstSchema)
query = renameShardingSchema(query, schema, dstSchema, ansiquote)
}

l.logger.Debug("schema create statement", zap.String("sql", query))
Expand All @@ -1137,13 +1139,13 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
}

// renameShardingTable replaces srcTable with dstTable in query
func renameShardingTable(query, srcTable, dstTable string) string {
return SQLReplace(query, srcTable, dstTable)
func renameShardingTable(query, srcTable, dstTable string, ansiquote bool) string {
return SQLReplace(query, srcTable, dstTable, ansiquote)
}

// renameShardingSchema replaces srcSchema with dstSchema in query
func renameShardingSchema(query, srcSchema, dstSchema string) string {
return SQLReplace(query, srcSchema, dstSchema)
func renameShardingSchema(query, srcSchema, dstSchema string, ansiquote bool) string {
return SQLReplace(query, srcSchema, dstSchema, ansiquote)
}

func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, table string) (targetSchema string, targetTable string) {
Expand Down
29 changes: 17 additions & 12 deletions loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package loader

import (
"bytes"
"crypto/sha1"
"fmt"
"os"
Expand Down Expand Up @@ -51,18 +50,24 @@ func CollectDirFiles(path string) (map[string]struct{}, error) {

// SQLReplace works like strings.Replace but only supports one replacement.
// It uses backquote pairs to quote the old and new word.
func SQLReplace(s, old, new string) string {
old = backquote(old)
new = backquote(new)
return strings.Replace(s, old, new, 1)
}
func SQLReplace(s, old, new string, ansiquote bool) string {
var quote string
if ansiquote {
quote = "\""
} else {
quote = "`"
}
quoteF := func(s string) string {
var b strings.Builder
b.WriteString(quote)
b.WriteString(s)
b.WriteString(quote)
return b.String()
}

func backquote(s string) string {
buf := bytes.Buffer{}
buf.WriteByte('`')
buf.WriteString(s)
buf.WriteByte('`')
return buf.String()
old = quoteF(old)
new = quoteF(new)
return strings.Replace(s, old, new, 1)
}

// shortSha1 returns the first 6 characters of sha1 value
Expand Down
4 changes: 3 additions & 1 deletion loader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ func (t *testUtilSuite) TestSQLReplace(c *C) {
}

for _, tt := range replaceTests {
c.Assert(SQLReplace(tt.in, tt.old, tt.new), Equals, tt.out)
c.Assert(SQLReplace(tt.in, tt.old, tt.new, false), Equals, tt.out)
}

c.Assert(SQLReplace("create table \"xyz\"", "xyz", "abc", true),
Equals, "create table \"abc\"")
}

func (t *testUtilSuite) TestShortSha1(c *C) {
Expand Down
7 changes: 6 additions & 1 deletion tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ EOF
}

function run() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'"
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE,ANSI_QUOTES'" $MYSQL_PORT2 $MYSQL_PASSWORD2
run_sql_tidb "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down Expand Up @@ -52,10 +53,14 @@ function run() {

# start DM task only
dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Sync" 2

# TODO: check sharding partition id
# use sync_diff_inspector to check full dump loader
echo "check sync diff for full dump and load"
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" $MYSQL_PORT2 $MYSQL_PASSWORD2
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down