diff --git a/.gitignore b/.gitignore index f4fdee386..77f1b6151 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ _testmain.go *.exe *.test *.prof +*.log bin *.iml diff --git a/drainer/collector.go b/drainer/collector.go index 53966d2e3..cb8fbafbf 100644 --- a/drainer/collector.go +++ b/drainer/collector.go @@ -24,12 +24,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/drainer/checkpoint" - "github.com/pingcap/tidb-binlog/pkg/etcd" - "github.com/pingcap/tidb-binlog/pkg/flags" - "github.com/pingcap/tidb-binlog/pkg/node" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb-binlog/pump" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/store" @@ -38,6 +32,13 @@ import ( "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" "golang.org/x/net/context" + + "github.com/pingcap/tidb-binlog/drainer/checkpoint" + "github.com/pingcap/tidb-binlog/pkg/etcd" + "github.com/pingcap/tidb-binlog/pkg/flags" + "github.com/pingcap/tidb-binlog/pkg/node" + "github.com/pingcap/tidb-binlog/pkg/util" + "github.com/pingcap/tidb-binlog/pump" ) const ( @@ -268,7 +269,8 @@ func (c *Collector) reportErr(ctx context.Context, err error) { // we CAN NOT query the job from the tikv according the job id. // just skip this kind of binlog now. func skipQueryJob(binlog *binlog.Binlog) bool { - q := binlog.GetDdlQuery() + // ToLower is to fix https://github.com/pingcap/tidb/issues/31611 + q := bytes.ToLower(binlog.GetDdlQuery()) return bytes.HasPrefix(q, []byte("select setval")) } diff --git a/drainer/schema.go b/drainer/schema.go index f05e6d54f..95109e269 100644 --- a/drainer/schema.go +++ b/drainer/schema.go @@ -18,10 +18,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/filter" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" + + "github.com/pingcap/tidb-binlog/pkg/filter" ) const implicitColName = "_tidb_rowid" @@ -319,6 +320,9 @@ func skipUnsupportedDDLJob(job *model.Job) bool { return true case model.ActionAlterTableAttributes, model.ActionAlterTablePartitionAttributes: return true + case model.ActionCreatePlacementPolicy, model.ActionAlterPlacementPolicy, model.ActionDropPlacementPolicy, + model.ActionAlterTablePartitionPlacement, model.ActionModifySchemaDefaultPlacement, model.ActionAlterTablePlacement: + return true } return false diff --git a/tests/placement_rules/drainer.toml b/tests/placement_rules/drainer.toml new file mode 100644 index 000000000..f5820631c --- /dev/null +++ b/tests/placement_rules/drainer.toml @@ -0,0 +1,17 @@ +data-dir = '/tmp/tidb_binlog_test/data.drainer' + +[syncer] +txn-batch = 1 +worker-count = 1 +safe-mode = false +db-type = 'mysql' +replicate-do-db = ['placement_test'] + +[syncer.to] +host = '127.0.0.1' +user = 'root' +password = '' +port = 3306 + +[syncer.to.checkpoint] +schema = "placement_rules_checkpoint" diff --git a/tests/placement_rules/run.sh b/tests/placement_rules/run.sh new file mode 100644 index 000000000..9474956c8 --- /dev/null +++ b/tests/placement_rules/run.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +set -e + +cd "$(dirname "$0")" + +run_drainer & + +sleep 3 + +run_sql 'CREATE DATABASE placement_test;' +run_sql 'CREATE PLACEMENT POLICY x1 FOLLOWERS=4' +run_sql 'CREATE TABLE placement_test.t1 (a INT NOT NULL PRIMARY KEY, b VARCHAR(100)) PLACEMENT POLICY=x1;' +run_sql "INSERT INTO placement_test.t1 (a,b) VALUES(1,'a'),(2,'b'),(3,'c'),(4,'d');" +run_sql "INSERT INTO placement_test.t1 (a,b) VALUES(5,'e'),(6,'f'),(7,'g'),(8,'h');" +down_run_sql 'CREATE PLACEMENT POLICY x1 FOLLOWERS=4' + +sleep 3 + +down_run_sql 'SELECT a, b FROM placement_test.t1 order by a' +check_contains 'a: 7' +check_contains 'b: g' +check_contains 'a: 8' +check_contains 'b: h' + +run_sql 'DROP DATABASE placement_test' + +killall drainer