Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: Use insert for non safe-mode instead of replace #199

Merged
merged 5 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions dm/tracer/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
)

const (
opQueryEvents = "query-events"
opScanEvents = "scan-events"
opDelEvents = "del-events"
defaultLimit = 10
opQueryEvents = "query-events"
opScanEvents = "scan-events"
opDelEvents = "del-events"
opTruncateEvents = "truncate-events"
defaultLimit = 10
)

const (
Expand Down Expand Up @@ -91,6 +92,8 @@ func (h eventHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleTraceEventScanRequest(w, req)
case opDelEvents:
h.handleTraceEventDeleteRequest(w, req)
case opTruncateEvents:
h.handleTraceEventTruncateRequest(w, req)
}
}

Expand Down Expand Up @@ -152,3 +155,12 @@ func (h eventHandler) handleTraceEventDeleteRequest(w http.ResponseWriter, req *
writeBadRequest(w, errors.New("trace id not provided"))
}
}

func (h eventHandler) handleTraceEventTruncateRequest(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
writeBadRequest(w, errors.New("post only"))
return
}
h.truncate()
writeData(w, map[string]interface{}{"result": true})
}
53 changes: 53 additions & 0 deletions dm/tracer/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,56 @@ func (ts *HTTPHandlerTestSuite) TestTraceEventDelete(c *C) {
c.Assert(err, IsNil)
c.Assert(string(raw), Equals, fmt.Sprintf("trace event %s not found", traceID))
}

func (ts *HTTPHandlerTestSuite) TestTraceEventTruncate(c *C) {
ts.startServer(c, 4)
defer ts.stopServer(c)

var (
traceID = "test.delete.trace_id"
err error
queryURL = fmt.Sprintf("http://127.0.0.1%s/events/query?trace_id=%s", ts.cfg.TracerAddr, traceID)
truncateURL = fmt.Sprintf("http://127.0.0.1%s/events/truncate", ts.cfg.TracerAddr)
resp *http.Response
data []SyncerBinlogEventResp
raw []byte
)

err = ts.server.eventStore.addNewEvent(&TraceEvent{
Type: pb.TraceType_BinlogEvent,
Event: &pb.SyncerBinlogEvent{
Base: &pb.BaseEvent{
Filename: "/path/to/test.go",
Line: 100,
Tso: time.Now().UnixNano(),
TraceID: traceID,
Type: pb.TraceType_BinlogEvent,
},
},
})
c.Assert(err, IsNil)

resp, err = http.Get(queryURL)
c.Assert(err, IsNil, Commentf("url:%s", queryURL))
decoder := json.NewDecoder(resp.Body)

err = decoder.Decode(&data)
resp.Body.Close()
c.Assert(err, IsNil)
for _, event := range data {
c.Assert(event.Event.Base.TraceID, Equals, traceID)
}

form := make(url.Values)
resp, err = http.PostForm(truncateURL, form)
resp.Body.Close()
c.Assert(err, IsNil, Commentf("url:%s", truncateURL))

resp, err = http.Get(queryURL)
c.Assert(err, IsNil, Commentf("url:%s", queryURL))
c.Assert(resp.StatusCode, Equals, http.StatusNotFound)
raw, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
c.Assert(err, IsNil)
c.Assert(string(raw), Equals, fmt.Sprintf("trace event %s not found", traceID))
}
1 change: 1 addition & 0 deletions dm/tracer/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (s *Server) startHTTPServer(lis net.Listener) {
router.Handle("/events/query", eventHandler{s.eventStore, opQueryEvents})
router.Handle("/events/scan", eventHandler{s.eventStore, opScanEvents})
router.Handle("/events/delete", eventHandler{s.eventStore, opDelEvents})
router.Handle("/events/truncate", eventHandler{s.eventStore, opTruncateEvents})

httpServer := &http.Server{
Handler: router,
Expand Down
8 changes: 8 additions & 0 deletions dm/tracer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type EventStore struct {
// NewEventStore creates a new EventStore
func NewEventStore() *EventStore {
return &EventStore{
ids: make([]string, 0),
events: make(map[string][]*TraceEvent),
}
}
Expand Down Expand Up @@ -119,3 +120,10 @@ func (store *EventStore) removeByTraceID(traceID string) (removed bool) {
}
return
}

func (store *EventStore) truncate() {
store.Lock()
defer store.Unlock()
store.events = make(map[string][]*TraceEvent)
store.ids = make([]string, 0)
}
9 changes: 8 additions & 1 deletion syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ func genInsertSQLs(param *genDMLParam) ([]string, [][]string, [][]interface{}, e
originalValue = extractValueFromData(originalData, originalColumns)
}

sql := fmt.Sprintf("REPLACE INTO `%s`.`%s` (%s) VALUES (%s);", schema, table, columnList, columnPlaceholders)
var insertOrReplace string
if param.safeMode {
insertOrReplace = "REPLACE"
} else {
insertOrReplace = "INSERT"
}

sql := fmt.Sprintf("%s INTO `%s`.`%s` (%s) VALUES (%s);", insertOrReplace, schema, table, columnList, columnPlaceholders)
ks := genMultipleKeys(originalColumns, originalValue, originalIndexColumns)
sqls = append(sqls, sql)
values = append(values, value)
Expand Down
7 changes: 5 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go/sync2"
Expand All @@ -48,7 +48,7 @@ import (
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/pkg/utils"
sm "github.com/pingcap/dm/syncer/safe-mode"
"github.com/pingcap/dm/syncer/sql-operator"
operator "github.com/pingcap/dm/syncer/sql-operator"
)

var (
Expand Down Expand Up @@ -1385,6 +1385,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
switch ec.header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
if !applied {
param.safeMode = ec.safeMode.Enable()
sqls, keys, args, err = genInsertSQLs(param)
if err != nil {
return errors.Annotatef(err, "gen insert sqls failed, schema: %s, table: %s", table.schema, table.name)
Expand Down Expand Up @@ -1752,6 +1753,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
if ddlExecItem.req.Exec {
failpoint.Inject("ShardSyncedExecutionExit", func() {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ShardSyncedExecutionExit"))
s.flushCheckPoints()
utils.OsExit(1)
})
failpoint.Inject("SequenceShardSyncedExecutionExit", func() {
Expand All @@ -1760,6 +1762,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
// exit in the first round sequence sharding DDL only
if group.meta.ActiveIdx() == 1 {
s.tctx.L().Warn("exit triggered", zap.String("failpoint", "SequenceShardSyncedExecutionExit"))
s.flushCheckPoints()
utils.OsExit(1)
}
}
Expand Down
24 changes: 12 additions & 12 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"github.com/DATA-DOG/go-sqlmock"
"strings"
"sync"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
_ "github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
cm "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/siddontang/go-mysql/replication"
"go.uber.org/zap"
Expand Down Expand Up @@ -826,15 +826,15 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) {
"insert into gctest_1.t_3(id, cfg) values (3, '{\"id\": 3}')",
},
[]string{
"REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"REPLACE INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
"REPLACE INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
"REPLACE INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
"INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_1` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_2` (`id`,`age`,`cfg`) VALUES (?,?,?);",
"INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
"INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
"INSERT INTO `gctest_1`.`t_3` (`id`,`cfg`) VALUES (?,?);",
},
[][]interface{}{
{int32(1), int32(18), "{}"},
Expand Down Expand Up @@ -934,7 +934,7 @@ func (s *testSyncerSuite) TestGeneratedColumn(c *C) {
for _, testCase := range testCases {
for _, sql := range testCase.sqls {
_, err := s.db.Exec(sql)
c.Assert(err, IsNil)
c.Assert(err, IsNil, Commentf("sql: %s", sql))
}
idx := 0
for {
Expand Down
10 changes: 8 additions & 2 deletions tests/_dmctl_tools/check_safe_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"os"
"sort"
"strings"

Expand Down Expand Up @@ -96,9 +97,11 @@ func main() {
limit int = 10
count int
)
checkInstance := fmt.Sprintf("mysql-replica-0%s", os.Args[1])
updateEvents := make(map[string][]*pb.SyncerBinlogEvent)
ddlEvents := make(map[string][]*pb.SyncerBinlogEvent)
latestDDL := make(map[string][]*pb.SyncerBinlogEvent)
fmt.Printf("check safe mode for instance %s\n", checkInstance)
for {
raw, err = scan_events(offset, limit)
if err != nil {
Expand Down Expand Up @@ -145,8 +148,8 @@ func main() {
for worker, events := range ddlEvents {
sort.Sort(BinlogEventSlice(events))
latestDDL[worker] = make([]*pb.SyncerBinlogEvent, 2)
// we have run two round of sharding DDL
if len(events) < 4 {
// we check the second round of sharding DDL only
if len(events) < 2 {
panic(fmt.Sprintf("invalid ddl events count, events: %v", events))
}
latestDDL[worker][1] = events[len(events)-1]
Expand All @@ -165,6 +168,9 @@ func main() {

// check safe mode of each `update` binlog event is reasonable
for worker, evs := range latestDDL {
if worker != checkInstance {
continue
}
startPos := binlogPosition(evs[0])
endPos := binlogPosition(evs[1])
for _, ev := range updateEvents[worker] {
Expand Down
8 changes: 6 additions & 2 deletions tests/safe_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function run() {
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

export GO_FAILPOINTS='github.com/pingcap/dm/syncer/ShardSyncedExecutionExit=return(true);github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/ShardSyncedExecutionExit=return(true);github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(300)'

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
Expand All @@ -44,6 +44,8 @@ function run() {
run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1
run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2

curl -d "a=a" "http://127.0.0.1:$TRACER_PORT/events/truncate"

i=0
while [ $i -lt 10 ]; do
# we can't determine which DM-worker is the sharding lock owner, so we try both of them
Expand All @@ -53,6 +55,7 @@ function run() {
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_instance_id="1"
break
fi
# DM-worker2 is sharding lock owner and exits
Expand All @@ -61,6 +64,7 @@ function run() {
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
check_instance_id="2"
break
fi

Expand All @@ -74,7 +78,7 @@ function run() {

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

$cur/../bin/check_safe_mode
$cur/../bin/check_safe_mode $check_instance_id
}

cleanup1 safe_mode_target
Expand Down
8 changes: 6 additions & 2 deletions tests/sequence_safe_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ function run() {
pkill -hup dm-worker.test 2>/dev/null || true
wait_process_exit dm-worker.test

export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SequenceShardSyncedExecutionExit=return(true);github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SequenceShardSyncedExecutionExit=return(true);github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(300)'

run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
Expand All @@ -53,6 +53,8 @@ function run() {
run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1
run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2

curl -d "a=a" "http://127.0.0.1:$TRACER_PORT/events/truncate"

i=0
while [ $i -lt 10 ]; do
# we can't determine which DM-worker is the sharding lock owner, so we try both of them
Expand All @@ -62,6 +64,7 @@ function run() {
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_instance_id="1"
break
fi
# DM-worker2 is sharding lock owner and exits
Expand All @@ -70,6 +73,7 @@ function run() {
export GO_FAILPOINTS='github.com/pingcap/dm/syncer/SafeModeInitPhaseSeconds=return(0)'
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
check_instance_id="2"
break
fi

Expand All @@ -83,7 +87,7 @@ function run() {

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

$cur/../bin/check_safe_mode
$cur/../bin/check_safe_mode $check_instance_id
}

cleanup1 sequence_safe_mode_target
Expand Down