Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
loader: support ANSI_QUOTES when sharing rename (#1309) (#1314)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: lance6716 <[email protected]>
  • Loading branch information
ti-srebot and lance6716 authored Nov 27, 2020
1 parent eee7853 commit 7e25db1
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
16 changes: 9 additions & 7 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab

lastOffset := cur

ansiquote := strings.Contains(w.cfg.SQLMode, "ANSI_QUOTES")
data := make([]byte, 0, 1024*1024)
br := bufio.NewReader(f)
for {
Expand Down Expand Up @@ -334,7 +335,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
return terror.Annotatef(err, "file %s", file)
}
} else if table.sourceTable != table.targetTable {
query = renameShardingTable(query, table.sourceTable, table.targetTable)
query = renameShardingTable(query, table.sourceTable, table.targetTable, ansiquote)
}

idx := strings.Index(query, "INSERT INTO")
Expand Down Expand Up @@ -1091,6 +1092,7 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
defer f.Close()

tctx := tcontext.NewContext(ctx, l.logger)
ansiquote := strings.Contains(l.cfg.SQLMode, "ANSI_QUOTES")

data := make([]byte, 0, 1024*1024)
br := bufio.NewReader(f)
Expand Down Expand Up @@ -1118,9 +1120,9 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
// for table
if table != "" {
sqls = append(sqls, fmt.Sprintf("USE `%s`;", unescapePercent(dstSchema, l.logger)))
query = renameShardingTable(query, table, dstTable)
query = renameShardingTable(query, table, dstTable, ansiquote)
} else {
query = renameShardingSchema(query, schema, dstSchema)
query = renameShardingSchema(query, schema, dstSchema, ansiquote)
}

l.logger.Debug("schema create statement", zap.String("sql", query))
Expand All @@ -1137,13 +1139,13 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
}

// renameShardingTable replaces srcTable with dstTable in query
func renameShardingTable(query, srcTable, dstTable string) string {
return SQLReplace(query, srcTable, dstTable)
func renameShardingTable(query, srcTable, dstTable string, ansiquote bool) string {
return SQLReplace(query, srcTable, dstTable, ansiquote)
}

// renameShardingSchema replaces srcSchema with dstSchema in query
func renameShardingSchema(query, srcSchema, dstSchema string) string {
return SQLReplace(query, srcSchema, dstSchema)
func renameShardingSchema(query, srcSchema, dstSchema string, ansiquote bool) string {
return SQLReplace(query, srcSchema, dstSchema, ansiquote)
}

func fetchMatchedLiteral(ctx *tcontext.Context, router *router.Table, schema, table string) (targetSchema string, targetTable string) {
Expand Down
29 changes: 17 additions & 12 deletions loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package loader

import (
"bytes"
"crypto/sha1"
"fmt"
"os"
Expand Down Expand Up @@ -51,18 +50,24 @@ func CollectDirFiles(path string) (map[string]struct{}, error) {

// SQLReplace works like strings.Replace but only supports one replacement.
// It uses backquote pairs to quote the old and new word.
func SQLReplace(s, old, new string) string {
old = backquote(old)
new = backquote(new)
return strings.Replace(s, old, new, 1)
}
func SQLReplace(s, old, new string, ansiquote bool) string {
var quote string
if ansiquote {
quote = "\""
} else {
quote = "`"
}
quoteF := func(s string) string {
var b strings.Builder
b.WriteString(quote)
b.WriteString(s)
b.WriteString(quote)
return b.String()
}

func backquote(s string) string {
buf := bytes.Buffer{}
buf.WriteByte('`')
buf.WriteString(s)
buf.WriteByte('`')
return buf.String()
old = quoteF(old)
new = quoteF(new)
return strings.Replace(s, old, new, 1)
}

// shortSha1 returns the first 6 characters of sha1 value
Expand Down
4 changes: 3 additions & 1 deletion loader/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ func (t *testUtilSuite) TestSQLReplace(c *C) {
}

for _, tt := range replaceTests {
c.Assert(SQLReplace(tt.in, tt.old, tt.new), Equals, tt.out)
c.Assert(SQLReplace(tt.in, tt.old, tt.new, false), Equals, tt.out)
}

c.Assert(SQLReplace("create table \"xyz\"", "xyz", "abc", true),
Equals, "create table \"abc\"")
}

func (t *testUtilSuite) TestShortSha1(c *C) {
Expand Down
7 changes: 6 additions & 1 deletion tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ EOF
}

function run() {
run_sql_both_source "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'"
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE,ANSI_QUOTES'" $MYSQL_PORT2 $MYSQL_PASSWORD2
run_sql_tidb "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down Expand Up @@ -52,10 +53,14 @@ function run() {

# start DM task only
dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Sync" 2

# TODO: check sharding partition id
# use sync_diff_inspector to check full dump loader
echo "check sync diff for full dump and load"
run_sql "SET @@GLOBAL.SQL_MODE='NO_ZERO_IN_DATE,NO_ZERO_DATE'" $MYSQL_PORT2 $MYSQL_PASSWORD2
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down

0 comments on commit 7e25db1

Please sign in to comment.