From ca04f5196343d910cd1e5245a3e2fdaabc656438 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 25 Mar 2019 20:47:42 +0800 Subject: [PATCH 1/8] reader: add a Reader interface for binlog events; add a TCPReader implementation. --- pkg/binlog/reader/reader.go | 47 ++++++ pkg/binlog/reader/tcp.go | 130 +++++++++++++++ pkg/binlog/reader/tcp_test.go | 305 ++++++++++++++++++++++++++++++++++ 3 files changed, 482 insertions(+) create mode 100644 pkg/binlog/reader/reader.go create mode 100644 pkg/binlog/reader/tcp.go create mode 100644 pkg/binlog/reader/tcp_test.go diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go new file mode 100644 index 0000000000..25fb1130e4 --- /dev/null +++ b/pkg/binlog/reader/reader.go @@ -0,0 +1,47 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" +) + +const ( + stageNew = iota + stagePrepared + stageClosed +) + +// Reader is a binlog event reader, it may read binlog events from a TCP stream, binlog files or any other in-memory buffer. +// One reader should read binlog events either through position mode or GTID mode. +type Reader interface { + // StartSyncByPos prepares the reader for reading binlog from the specified position. + StartSyncByPos(pos gmysql.Position) error + + // StartSyncByGTID prepares the reader for reading binlog from the specified GTID set. 
+ StartSyncByGTID(gSet gtid.Set) error + + // Close closes the reader and releases the resource. + Close() error + + // GetEvent gets the binlog event one by one, it will block if no event can be read. + // You can pass a context (like Cancel or Timeout) to break the block. + // If you do not want to check the stage (for reducing the lock operation), you can set `checkStage` to false. + GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) +} diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go new file mode 100644 index 0000000000..d636458a91 --- /dev/null +++ b/pkg/binlog/reader/tcp.go @@ -0,0 +1,130 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "database/sql" + "fmt" + "sync" + + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/utils" +) + +// TCPReader is a binlog event reader which read binlog events from a TCP stream. +type TCPReader struct { + stageMu sync.Mutex + stage int + + syncerCfg replication.BinlogSyncerConfig + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer +} + +// NewTCPReader creates a TCPReader instance. +func NewTCPReader(syncerCfg replication.BinlogSyncerConfig) Reader { + return &TCPReader{ + syncerCfg: syncerCfg, + syncer: replication.NewBinlogSyncer(syncerCfg), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos.
+func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { + r.stageMu.Lock() + defer r.stageMu.Unlock() + + if r.stage != stageNew { + return errors.NotValidf("stage %d, expect %d", r.stage, stageNew) + } + + streamer, err := r.syncer.StartSync(pos) + if err != nil { + return errors.Annotatef(err, "start sync from position %s", pos) + } + + r.streamer = streamer + r.stage = stagePrepared + return nil +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { + r.stageMu.Lock() + defer r.stageMu.Unlock() + + if r.stage != stageNew { + return errors.NotValidf("stage %d, expect %d", r.stage, stageNew) + } + + if gSet == nil { + return errors.NotValidf("nil GTID set") + } + + streamer, err := r.syncer.StartSyncGTID(gSet.Origin()) + if err != nil { + return errors.Annotatef(err, "start sync from GTID set %s", gSet) + } + + r.streamer = streamer + r.stage = stagePrepared + return nil +} + +// Close implements Reader.Close. +func (r *TCPReader) Close() error { + r.stageMu.Lock() + defer r.stageMu.Unlock() + + if r.stage == stageClosed { + return errors.NotValidf("already closed") + } + + connID := r.syncer.LastConnectionID() + if connID > 0 { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", + r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port) + db, err := sql.Open("mysql", dsn) + if err != nil { + return errors.Annotate(err, "open connection to the master") + } + defer db.Close() + err = utils.KillConn(db, connID) + if err != nil { + return errors.Annotatef(err, "kill connection %d", connID) + } + } + + r.stage = stageClosed + return nil +} + +// GetEvent implements Reader.GetEvent. 
+func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) { + if checkStage { + r.stageMu.Lock() + if r.stage != stagePrepared { + r.stageMu.Unlock() + return nil, errors.NotValidf("stage %d, expect %d", r.stage, stagePrepared) + } + r.stageMu.Unlock() + } + + return r.streamer.GetEvent(ctx) +} diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go new file mode 100644 index 0000000000..3d5c18a295 --- /dev/null +++ b/pkg/binlog/reader/tcp_test.go @@ -0,0 +1,305 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "database/sql" + "fmt" + "os" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + . 
"github.com/pingcap/check" + "github.com/pingcap/parser/ast" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/utils" +) + +var ( + _ = Suite(&testTCPReaderSuite{}) + dbName = "test_tcp_reader_db" + tableName = "test_tcp_reader_table" + columnValue = 123 + flavor = gmysql.MySQLFlavor + serverIDs = []uint32{3251, 3252} +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testTCPReaderSuite struct { + host string + port int + user string + password string + db *sql.DB +} + +func (t *testTCPReaderSuite) SetUpSuite(c *C) { + t.setUpConn(c) + t.setUpData(c) +} + +func (t *testTCPReaderSuite) setUpConn(c *C) { + t.host = os.Getenv("MYSQL_HOST") + if t.host == "" { + t.host = "127.0.0.1" + } + t.port, _ = strconv.Atoi(os.Getenv("MYSQL_PORT")) + if t.port == 0 { + t.port = 3306 + } + t.user = os.Getenv("MYSQL_USER") + if t.user == "" { + t.user = "root" + } + t.password = os.Getenv("MYSQL_PSWD") + + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", t.user, t.password, t.host, t.port) + db, err := sql.Open("mysql", dsn) + c.Assert(err, IsNil) + t.db = db +} + +func (t *testTCPReaderSuite) setUpData(c *C) { + // drop database first. + query := fmt.Sprintf("DROP DATABASE `%s`", dbName) + _, err := t.db.Exec(query) + + // delete previous binlog files/events. + query = "RESET MASTER" + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // execute some SQL statements to generate binlog events. + query = fmt.Sprintf("CREATE DATABASE `%s`", dbName) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + query = fmt.Sprintf("CREATE TABLE `%s`.`%s` (c1 INT)", dbName, tableName) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // we assume `binlog_format` already set to `ROW`. 
+ query = fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) TestSyncPos(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[0], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + pos gmysql.Position // empty position + ) + + // the first reader + r := NewTCPReader(cfg) + c.Assert(r, NotNil) + + // not prepared + e, err := r.GetEvent(context.Background(), true) + c.Assert(err, NotNil) + c.Assert(e, IsNil) + + // prepare + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + // verify + t.verifyInitialEvents(c, r) + + // re-prepare is invalid + err = r.StartSyncByPos(pos) + c.Assert(err, NotNil) + + // close the reader + err = r.Close() + c.Assert(err, IsNil) + + // already closed + err = r.Close() + c.Assert(err, NotNil) + + // invalid startup position + pos = gmysql.Position{ + Name: "mysql-bin.888888", + Pos: 666666, + } + + // create a new one. + r = NewTCPReader(cfg) + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + _, err = r.GetEvent(context.Background(), true) + // ERROR 1236 (HY000): Could not find first log file name in binary log index file + // close connection automatically. 
+ c.Assert(err, NotNil) + + // get current position for master + pos, _, err = utils.GetMasterStatus(t.db, flavor) + + // execute another DML again + query := fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue+1) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // create a new one again + r = NewTCPReader(cfg) + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + t.verifyOneDML(c, r) + + err = r.Close() + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) TestSyncGTID(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[1], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + gSet gtid.Set // nil GTID set + ) + + // the first reader + r := NewTCPReader(cfg) + c.Assert(r, NotNil) + + // not prepared + e, err := r.GetEvent(context.Background(), true) + c.Assert(err, NotNil) + c.Assert(e, IsNil) + + // nil GTID set + err = r.StartSyncByGTID(gSet) + c.Assert(err, NotNil) + + // empty GTID set + gSet, err = gtid.ParserGTID(flavor, "") + c.Assert(err, IsNil) + + // prepare + err = r.StartSyncByGTID(gSet) + c.Assert(err, IsNil) + + // verify + t.verifyInitialEvents(c, r) + + // re-prepare is invalid + err = r.StartSyncByGTID(gSet) + c.Assert(err, NotNil) + + // close the reader + err = r.Close() + c.Assert(err, IsNil) + + // already closed + err = r.Close() + c.Assert(err, NotNil) + + // get current GTID set for master + _, gSet, err = utils.GetMasterStatus(t.db, flavor) + + // execute another DML again + query := fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue+2) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // create a new one + r = NewTCPReader(cfg) + err = r.StartSyncByGTID(gSet) + c.Assert(err, IsNil) + + t.verifyOneDML(c, r) + + err = r.Close() + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) { + // if timeout, we
think the test case failed. + timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + parser2, err := utils.GetParser(t.db, false) + c.Assert(err, IsNil) + +forLoop: + for { + e, err := reader.GetEvent(timeoutCtx, true) + c.Assert(err, IsNil) + switch ev := e.Event.(type) { + case *replication.QueryEvent: + stmt, err := parser2.ParseOneStmt(string(ev.Query), "", "") + c.Assert(err, IsNil) + switch ddl := stmt.(type) { + case *ast.CreateDatabaseStmt: + c.Assert(ddl.Name, Equals, dbName) + case *ast.CreateTableStmt: + c.Assert(ddl.Table.Name.O, Equals, tableName) + } + case *replication.RowsEvent: + c.Assert(string(ev.Table.Schema), Equals, dbName) + c.Assert(string(ev.Table.Table), Equals, tableName) + c.Assert(ev.Rows, HasLen, 1) // `INSERT INTO` + c.Assert(ev.ColumnCount, Equals, uint64(1)) // `c1` + break forLoop // if we got the DML, then we think our test case passed. + } + } +} + +func (t *testTCPReaderSuite) verifyOneDML(c *C, reader Reader) { + // if timeout, we think the test case failed. 
+ timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + +forLoop: + for { + e, err := reader.GetEvent(timeoutCtx, false) + c.Assert(err, IsNil) + switch ev := e.Event.(type) { + case *replication.RowsEvent: + c.Assert(string(ev.Table.Schema), Equals, dbName) + c.Assert(string(ev.Table.Table), Equals, tableName) + c.Assert(ev.Rows, HasLen, 1) // `INSERT INTO` + c.Assert(ev.ColumnCount, Equals, uint64(1)) // `c1` + break forLoop + } + } +} From 7f2ecd3271cd80c95290b7d37701651b1e8c1d5b Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 26 Mar 2019 11:38:31 +0800 Subject: [PATCH 2/8] reader: add Status for reader --- pkg/binlog/reader/reader.go | 21 ++++++++++++++++++++- pkg/binlog/reader/tcp.go | 35 ++++++++++++++++++++++++++++++++++- pkg/binlog/reader/tcp_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go index 25fb1130e4..bccc16831e 100644 --- a/pkg/binlog/reader/reader.go +++ b/pkg/binlog/reader/reader.go @@ -22,12 +22,28 @@ import ( "github.com/pingcap/dm/pkg/gtid" ) +type readerStage int + const ( - stageNew = iota + stageNew readerStage = iota stagePrepared stageClosed ) +// String implements Stringer.String. +func (s readerStage) String() string { + switch s { + case stageNew: + return "new" + case stagePrepared: + return "prepared" + case stageClosed: + return "closed" + default: + return "unknown" + } +} + // Reader is a binlog event reader, it may read binlog events from a TCP stream, binlog files or any other in-memory buffer. // One reader should read binlog events either through position mode or GTID mode. type Reader interface { @@ -44,4 +60,7 @@ type Reader interface { // You can pass a context (like Cancel or Timeout) to break the block. // If you do not want to check the stage (for reducing the lock operation), you can set `checkStage` to false. 
GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) + + // Status returns the status of the reader. + Status() interface{} } diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index d636458a91..c802cad599 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -16,6 +16,7 @@ package reader import ( "context" "database/sql" + "encoding/json" "fmt" "sync" @@ -24,19 +25,35 @@ import ( "github.com/siddontang/go-mysql/replication" "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/utils" ) // TCPReader is a binlog event reader which read binlog events from a TCP stream. type TCPReader struct { stageMu sync.Mutex - stage int + stage readerStage syncerCfg replication.BinlogSyncerConfig syncer *replication.BinlogSyncer streamer *replication.BinlogStreamer } +// TCPReaderStatus represents the status of a TCPReader. +type TCPReaderStatus struct { + Stage string `json:"stage"` + Connection uint32 `json:"connection"` +} + +// String implements Stringer.String. +func (s *TCPReaderStatus) String() string { + data, err := json.Marshal(s) + if err != nil { + log.Errorf("[TCPReaderStatus] marshal status to json error %v", err) + } + return string(data) +} + // NewTCPReader creates a TCPReader instance. func NewTCPReader(syncerCfg replication.BinlogSyncerConfig) Reader { return &TCPReader{ @@ -128,3 +145,19 @@ func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication return r.streamer.GetEvent(ctx) } + +// Status implements Reader.Status. 
+func (r *TCPReader) Status() interface{} { + r.stageMu.Lock() + stage := r.stage + r.stageMu.Unlock() + + var connID uint32 + if stage != stageNew { + connID = r.syncer.LastConnectionID() + } + return &TCPReaderStatus{ + Stage: stage.String(), + Connection: connID, + } +} diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index 3d5c18a295..cdae094777 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "strconv" + "strings" "testing" "time" @@ -135,6 +136,15 @@ func (t *testTCPReaderSuite) TestSyncPos(c *C) { // verify t.verifyInitialEvents(c, r) + // check status, stagePrepared + status := r.Status() + trStatus, ok := status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stagePrepared.String()) + c.Assert(trStatus.Connection, Greater, uint32(0)) + trStatusStr := trStatus.String() + c.Assert(strings.Contains(trStatusStr, stagePrepared.String()), IsTrue) + // re-prepare is invalid err = r.StartSyncByPos(pos) c.Assert(err, NotNil) @@ -201,6 +211,13 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { r := NewTCPReader(cfg) c.Assert(r, NotNil) + // check status, stageNew + status := r.Status() + trStatus, ok := status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stageNew.String()) + c.Assert(trStatus.Connection, Equals, uint32(0)) + // not prepared e, err := r.GetEvent(context.Background(), true) c.Assert(err, NotNil) @@ -229,6 +246,13 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { err = r.Close() c.Assert(err, IsNil) + // check status, stageClosed + status = r.Status() + trStatus, ok = status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stageClosed.String()) + c.Assert(trStatus.Connection, Greater, uint32(0)) + // already closed err = r.Close() c.Assert(err, NotNil) From 1dd5052e9d69a3a0af30d6f41ba38b362666f50b Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 26 Mar 2019 
13:38:14 +0800 Subject: [PATCH 3/8] relay: use Reader interface for normal stream --- relay/relay.go | 85 ++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 47 deletions(-) diff --git a/relay/relay.go b/relay/relay.go index 4ffebb4b42..dd59e66ab0 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + binlogreader "github.com/pingcap/dm/pkg/binlog/reader" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -69,10 +70,11 @@ const ( // Relay relays mysql binlog to local file. type Relay struct { - db *sql.DB - cfg *Config - syncer *replication.BinlogSyncer - syncerCfg replication.BinlogSyncerConfig + db *sql.DB + cfg *Config + syncerCfg replication.BinlogSyncerConfig + reader binlogreader.Reader + gapSyncerCfg replication.BinlogSyncerConfig meta Meta lastSlaveConnectionID uint32 @@ -165,7 +167,7 @@ func (r *Relay) Init() (err error) { // Process implements the dm.Unit interface. 
func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) { - r.syncer = replication.NewBinlogSyncer(r.syncerCfg) + r.reader = binlogreader.NewTCPReader(r.syncerCfg) errs := make([]*pb.ProcessError, 0, 1) err := r.process(ctx) @@ -229,7 +231,7 @@ func (r *Relay) process(parentCtx context.Context) error { r.updateMetricsRelaySubDirIndex() } - streamer, err := r.getBinlogStreamer() + err = r.setUpReader() if err != nil { return errors.Trace(err) } @@ -300,7 +302,7 @@ func (r *Relay) process(parentCtx context.Context) error { if gapStreamer != nil { e, err = gapStreamer.GetEvent(ctx) } else { - e, err = streamer.GetEvent(ctx) + e, err = r.reader.GetEvent(ctx, false) } cancel() binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds()) @@ -344,7 +346,7 @@ func (r *Relay) process(parentCtx context.Context) error { return errors.Annotatef(err, "gap streamer") } if tryReSync && r.cfg.EnableGTID && r.cfg.AutoFixGTID { - streamer, err = r.reSyncBinlog(r.syncerCfg) + err = r.reSetUpReader(r.syncerCfg) if err != nil { return errors.Annotatef(err, "try auto switch with GTID") } @@ -751,51 +753,44 @@ func (r *Relay) checkFormatDescriptionEventExists(filename string) (exists bool, return false, nil } -// NOTE: now, no online master-slave switching supported -// when switching, user must Pause relay, update config, then Resume -// so, will call `getBinlogStreamer` again on new master -func (r *Relay) getBinlogStreamer() (*replication.BinlogStreamer, error) { +func (r *Relay) setUpReader() error { defer func() { - r.lastSlaveConnectionID = r.syncer.LastConnectionID() - log.Infof("[relay] last slave connection id %d", r.lastSlaveConnectionID) + status := r.reader.Status() + log.Infof("[relay] set up binlog reader with status %s", status) }() + if r.cfg.EnableGTID { - return r.startSyncByGTID() + return r.setUpReaderByGTID() } - return r.startSyncByPos() + return r.setUpReaderByPos() } -func (r *Relay) startSyncByGTID() (*replication.BinlogStreamer, error) { 
+func (r *Relay) setUpReaderByGTID() error { uuid, gs := r.meta.GTID() log.Infof("[relay] start sync for master(%s, %s) from GTID set %s", r.masterNode(), uuid, gs) - streamer, err := r.syncer.StartSyncGTID(gs.Origin()) + err := r.reader.StartSyncByGTID(gs) if err != nil { log.Errorf("[relay] start sync in GTID mode from %s error %v", gs.String(), err) - return r.startSyncByPos() + return r.setUpReaderByPos() } - return streamer, errors.Trace(err) + return nil } -// TODO: exception handling. -// e.g. -// 1.relay connects to a difference MySQL -// 2. upstream MySQL does a pure restart (removes all its' data, and then restart) - -func (r *Relay) startSyncByPos() (*replication.BinlogStreamer, error) { +func (r *Relay) setUpReaderByPos() error { // if the first binlog not exists in local, we should fetch from the first position, whatever the specific position is. uuid, pos := r.meta.Pos() log.Infof("[relay] start sync for master (%s, %s) from %s", r.masterNode(), uuid, pos.String()) if pos.Name == "" { // let mysql decides - return r.syncer.StartSync(pos) + return r.reader.StartSyncByPos(pos) } if stat, err := os.Stat(filepath.Join(r.meta.Dir(), pos.Name)); os.IsNotExist(err) { log.Infof("[relay] should sync from %s:4 instead of %s:%d because the binlog file not exists in local before and should sync from the very beginning", pos.Name, pos.Name, pos.Pos) pos.Pos = 4 } else if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } else { if stat.Size() > int64(pos.Pos) { // it means binlog file already exists, and the local binlog file already contains the specific position @@ -806,25 +801,24 @@ func (r *Relay) startSyncByPos() (*replication.BinlogStreamer, error) { pos.Pos = uint32(stat.Size()) err := r.meta.Save(pos, nil) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } } else if stat.Size() < int64(pos.Pos) { // in such case, we should stop immediately and check - return nil, 
errors.Annotatef(ErrBinlogPosGreaterThanFileSize, "%s size=%d, specific pos=%d", pos.Name, stat.Size(), pos.Pos) + return errors.Annotatef(ErrBinlogPosGreaterThanFileSize, "%s size=%d, specific pos=%d", pos.Name, stat.Size(), pos.Pos) } } - streamer, err := r.syncer.StartSync(pos) - return streamer, errors.Trace(err) + return r.reader.StartSyncByPos(pos) } // reSyncBinlog re-tries sync binlog when master-slave switched -func (r *Relay) reSyncBinlog(cfg replication.BinlogSyncerConfig) (*replication.BinlogStreamer, error) { +func (r *Relay) reSyncBinlog(cfg replication.BinlogSyncerConfig) error { err := r.retrySyncGTIDs() if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - return r.reopenStreamer(cfg) + return r.reSetUpReader(cfg) } // retrySyncGTIDs try to auto fix GTID set @@ -864,18 +858,15 @@ func (r *Relay) retrySyncGTIDs() error { return nil } -// reopenStreamer reopen a new streamer -func (r *Relay) reopenStreamer(cfg replication.BinlogSyncerConfig) (*replication.BinlogStreamer, error) { - if r.syncer != nil { - err := r.closeBinlogSyncer(r.syncer) - r.syncer = nil +func (r *Relay) reSetUpReader(cfg replication.BinlogSyncerConfig) error { + if r.reader != nil { + err := r.reader.Close() + r.reader = nil if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } } - - r.syncer = replication.NewBinlogSyncer(cfg) - return r.getBinlogStreamer() + return r.setUpReader() } func (r *Relay) masterNode() string { @@ -889,9 +880,9 @@ func (r *Relay) IsClosed() bool { // stopSync stops syncing, now it used by Close and Pause func (r *Relay) stopSync() { - if r.syncer != nil { - r.closeBinlogSyncer(r.syncer) - r.syncer = nil + if r.reader != nil { + r.reader.Close() + r.reader = nil } if r.fd != nil { r.fd.Close() From da5817f9909646dd20709c52298799df97e60a2d Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 26 Mar 2019 14:03:30 +0800 Subject: [PATCH 4/8] relay: use Reader interface for gap stream --- relay/relay.go | 24 
+++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/relay/relay.go b/relay/relay.go index dd59e66ab0..ffecac506a 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -247,8 +247,7 @@ func (r *Relay) process(parentCtx context.Context) error { // 2. create a special streamer to fill the gap // 3. catchup pos after the gap // 4. close the special streamer - gapSyncer *replication.BinlogSyncer // syncer used to fill the gap in relay log file - gapStreamer *replication.BinlogStreamer // streamer used to fill the gap in relay log file + gapReader binlogreader.Reader gapSyncStartPos *mysql.Position // the pos of the event starting to fill the gap gapSyncEndPos *mysql.Position // the pos of the event after the gap eventFormat *replication.FormatDescriptionEvent // latest FormatDescriptionEvent, used when re-calculate checksum @@ -256,11 +255,10 @@ func (r *Relay) process(parentCtx context.Context) error { closeGapSyncer := func() { addRelayFlag = false // reset to false when closing the gap syncer - if gapSyncer != nil { - gapSyncer.Close() - gapSyncer = nil + if gapReader != nil { + gapReader.Close() + gapReader = nil } - gapStreamer = nil gapSyncStartPos = nil gapSyncEndPos = nil } @@ -275,9 +273,9 @@ func (r *Relay) process(parentCtx context.Context) error { go r.doIntervalOps(parentCtx) for { - if gapStreamer == nil && gapSyncStartPos != nil && gapSyncEndPos != nil { - gapSyncer = replication.NewBinlogSyncer(r.gapSyncerCfg) - gapStreamer, err = gapSyncer.StartSync(*gapSyncStartPos) + if gapReader == nil && gapSyncStartPos != nil && gapSyncEndPos != nil { + gapReader = binlogreader.NewTCPReader(r.gapSyncerCfg) + err = gapReader.StartSyncByPos(*gapSyncStartPos) if err != nil { return errors.Annotatef(err, "start to fill gap in relay log file from %v", gapSyncStartPos) } @@ -299,8 +297,8 @@ func (r *Relay) process(parentCtx context.Context) error { ctx, cancel := context.WithTimeout(parentCtx, eventTimeout) readTimer := time.Now() var e 
*replication.BinlogEvent - if gapStreamer != nil { - e, err = gapStreamer.GetEvent(ctx) + if gapReader != nil { + e, err = gapReader.GetEvent(ctx, false) } else { e, err = r.reader.GetEvent(ctx, false) } @@ -320,7 +318,7 @@ func (r *Relay) process(parentCtx context.Context) error { // do nothing default: if utils.IsErrBinlogPurged(err) { - if gapStreamer != nil { + if gapReader != nil { pos, err2 := tryFindGapStartPos(err, r.fd.Name()) if err2 == nil { log.Errorf("[relay] %s", err.Error()) // log the error @@ -420,7 +418,7 @@ func (r *Relay) process(parentCtx context.Context) error { } } - if gapStreamer == nil { + if gapReader == nil { gapDetected, fSize, err := r.detectGap(e) if err != nil { relayLogWriteErrorCounter.Inc() From f58f0c63d74a163ba51964ffe3fc81afcfeb312f Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 27 Mar 2019 19:16:43 +0800 Subject: [PATCH 5/8] reader: address comments --- pkg/binlog/reader/tcp.go | 46 +++++++++++++++++------------------ pkg/binlog/reader/tcp_test.go | 6 ++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index c802cad599..f942fa8bf0 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -31,9 +31,9 @@ import ( // TCPReader is a binlog event reader which read binlog events from a TCP stream. type TCPReader struct { - stageMu sync.Mutex - stage readerStage + mu sync.Mutex + stage readerStage syncerCfg replication.BinlogSyncerConfig syncer *replication.BinlogSyncer streamer *replication.BinlogStreamer @@ -41,8 +41,8 @@ type TCPReader struct { // TCPReaderStatus represents the status of a TCPReader. type TCPReaderStatus struct { - Stage string `json:"stage"` - Connection uint32 `json:"connection"` + Stage string `json:"stage"` + ConnID uint32 `json:"connection"` } // String implements Stringer.String. @@ -64,11 +64,11 @@ func NewTCPReader(syncerCfg replication.BinlogSyncerConfig) Reader { // StartSyncByPos implements Reader.StartSyncByPos. 
func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { - r.stageMu.Lock() - defer r.stageMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() if r.stage != stageNew { - return errors.NotValidf("stage %d, expect %d", r.stage, stageNew) + return errors.Errorf("stage %d, expect %d", r.stage, stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -83,11 +83,11 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { // StartSyncByGTID implements Reader.StartSyncByGTID. func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { - r.stageMu.Lock() - defer r.stageMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() if r.stage != stageNew { - return errors.NotValidf("stage %d, expect %d", r.stage, stageNew) + return errors.Errorf("stage %d, expect %d", r.stage, stageNew) } if gSet == nil { @@ -106,11 +106,11 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { // Close implements Reader.Close. func (r *TCPReader) Close() error { - r.stageMu.Lock() - defer r.stageMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() if r.stage == stageClosed { - return errors.NotValidf("already closed") + return errors.New("already closed") } connID := r.syncer.LastConnectionID() @@ -119,12 +119,12 @@ func (r *TCPReader) Close() error { r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port) db, err := sql.Open("mysql", dsn) if err != nil { - return errors.Annotate(err, "open connection to the master") + return errors.Annotatef(err, "open connection to the master %s:%d", r.syncerCfg.Host, r.syncerCfg.Port) } defer db.Close() err = utils.KillConn(db, connID) if err != nil { - return errors.Annotatef(err, "kill connection %d", connID) + return errors.Annotatef(err, "kill connection %d for master %s:%d", connID, r.syncerCfg.Host, r.syncerCfg.Port) } } @@ -135,12 +135,12 @@ func (r *TCPReader) Close() error { // GetEvent implements Reader.GetEvent. 
func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) { if checkStage { - r.stageMu.Lock() + r.mu.Lock() if r.stage != stagePrepared { - r.stageMu.Unlock() - return nil, errors.NotValidf("stage %d, expect %d", r.stage, stagePrepared) + r.mu.Unlock() + return nil, errors.Errorf("stage %d, expect %d", r.stage, stagePrepared) } - r.stageMu.Unlock() + r.mu.Unlock() } return r.streamer.GetEvent(ctx) @@ -148,16 +148,16 @@ func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication // Status implements Reader.Status. func (r *TCPReader) Status() interface{} { - r.stageMu.Lock() + r.mu.Lock() stage := r.stage - r.stageMu.Unlock() + r.mu.Unlock() var connID uint32 if stage != stageNew { connID = r.syncer.LastConnectionID() } return &TCPReaderStatus{ - Stage: stage.String(), - Connection: connID, + Stage: stage.String(), + ConnID: connID, } } diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index cdae094777..4adb93830d 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -141,7 +141,7 @@ func (t *testTCPReaderSuite) TestSyncPos(c *C) { trStatus, ok := status.(*TCPReaderStatus) c.Assert(ok, IsTrue) c.Assert(trStatus.Stage, Equals, stagePrepared.String()) - c.Assert(trStatus.Connection, Greater, uint32(0)) + c.Assert(trStatus.ConnID, Greater, uint32(0)) trStatusStr := trStatus.String() c.Assert(strings.Contains(trStatusStr, stagePrepared.String()), IsTrue) @@ -216,7 +216,7 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { trStatus, ok := status.(*TCPReaderStatus) c.Assert(ok, IsTrue) c.Assert(trStatus.Stage, Equals, stageNew.String()) - c.Assert(trStatus.Connection, Equals, uint32(0)) + c.Assert(trStatus.ConnID, Equals, uint32(0)) // not prepared e, err := r.GetEvent(context.Background(), true) @@ -251,7 +251,7 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { trStatus, ok = status.(*TCPReaderStatus) c.Assert(ok, IsTrue) c.Assert(trStatus.Stage, 
Equals, stageClosed.String()) - c.Assert(trStatus.Connection, Greater, uint32(0)) + c.Assert(trStatus.ConnID, Greater, uint32(0)) // already closed err = r.Close() From 0fe7fede8a633605fc8e8a5ed154dc7e047beb54 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 27 Mar 2019 19:32:20 +0800 Subject: [PATCH 6/8] *: address comments --- pkg/binlog/reader/reader.go | 5 ++--- pkg/binlog/reader/tcp.go | 34 +++++++++++++++------------------- pkg/binlog/reader/tcp_test.go | 10 +++++----- relay/relay.go | 4 ++-- 4 files changed, 24 insertions(+), 29 deletions(-) diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go index bccc16831e..ddf91b8680 100644 --- a/pkg/binlog/reader/reader.go +++ b/pkg/binlog/reader/reader.go @@ -22,7 +22,7 @@ import ( "github.com/pingcap/dm/pkg/gtid" ) -type readerStage int +type readerStage int32 const ( stageNew readerStage = iota @@ -58,8 +58,7 @@ type Reader interface { // GetEvent gets the binlog event one by one, it will block if no event can be read. // You can pass a context (like Cancel or Timeout) to break the block. - // If you do not want to check the stage (for reducing the lock operation), you can set `checkStage` to false. - GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) + GetEvent(ctx context.Context) (*replication.BinlogEvent, error) // Status returns the status of the reader. 
Status() interface{} diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index f942fa8bf0..bfa5328fdf 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" + "github.com/siddontang/go/sync2" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -33,7 +34,7 @@ import ( type TCPReader struct { mu sync.Mutex - stage readerStage + stage sync2.AtomicInt32 syncerCfg replication.BinlogSyncerConfig syncer *replication.BinlogSyncer streamer *replication.BinlogStreamer @@ -67,8 +68,8 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != stageNew { - return errors.Errorf("stage %d, expect %d", r.stage, stageNew) + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -77,7 +78,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { } r.streamer = streamer - r.stage = stagePrepared + r.stage.Set(int32(stagePrepared)) return nil } @@ -86,8 +87,8 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != stageNew { - return errors.Errorf("stage %d, expect %d", r.stage, stageNew) + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) } if gSet == nil { @@ -100,7 +101,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { } r.streamer = streamer - r.stage = stagePrepared + r.stage.Set(int32(stagePrepared)) return nil } @@ -109,7 +110,7 @@ func (r *TCPReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage == stageClosed { + if r.stage.Get() == int32(stageClosed) { return errors.New("already closed") } @@ -128,19 +129,14 @@ func (r *TCPReader) Close() error { } } - r.stage = 
stageClosed + r.stage.Set(int32(stageClosed)) return nil } // GetEvent implements Reader.GetEvent. -func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) { - if checkStage { - r.mu.Lock() - if r.stage != stagePrepared { - r.mu.Unlock() - return nil, errors.Errorf("stage %d, expect %d", r.stage, stagePrepared) - } - r.mu.Unlock() +func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if r.stage.Get() != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) } return r.streamer.GetEvent(ctx) @@ -153,11 +149,11 @@ func (r *TCPReader) Status() interface{} { r.mu.Unlock() var connID uint32 - if stage != stageNew { + if stage.Get() != int32(stageNew) { connID = r.syncer.LastConnectionID() } return &TCPReaderStatus{ - Stage: stage.String(), + Stage: readerStage(stage).String(), ConnID: connID, } } diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index 4adb93830d..8fd3467989 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -125,7 +125,7 @@ func (t *testTCPReaderSuite) TestSyncPos(c *C) { c.Assert(r, NotNil) // not prepared - e, err := r.GetEvent(context.Background(), true) + e, err := r.GetEvent(context.Background()) c.Assert(err, NotNil) c.Assert(e, IsNil) @@ -168,7 +168,7 @@ func (t *testTCPReaderSuite) TestSyncPos(c *C) { err = r.StartSyncByPos(pos) c.Assert(err, IsNil) - _, err = r.GetEvent(context.Background(), true) + _, err = r.GetEvent(context.Background()) // ERROR 1236 (HY000): Could not find first log file name in binary log index file // close connection automatically. 
c.Assert(err, NotNil) @@ -219,7 +219,7 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { c.Assert(trStatus.ConnID, Equals, uint32(0)) // not prepared - e, err := r.GetEvent(context.Background(), true) + e, err := r.GetEvent(context.Background()) c.Assert(err, NotNil) c.Assert(e, IsNil) @@ -286,7 +286,7 @@ func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) { forLoop: for { - e, err := reader.GetEvent(timeoutCtx, true) + e, err := reader.GetEvent(timeoutCtx) c.Assert(err, IsNil) switch ev := e.Event.(type) { case *replication.QueryEvent: @@ -315,7 +315,7 @@ func (t *testTCPReaderSuite) verifyOneDML(c *C, reader Reader) { forLoop: for { - e, err := reader.GetEvent(timeoutCtx, false) + e, err := reader.GetEvent(timeoutCtx) c.Assert(err, IsNil) switch ev := e.Event.(type) { case *replication.RowsEvent: diff --git a/relay/relay.go b/relay/relay.go index ffecac506a..8457b27c26 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -298,9 +298,9 @@ func (r *Relay) process(parentCtx context.Context) error { readTimer := time.Now() var e *replication.BinlogEvent if gapReader != nil { - e, err = gapReader.GetEvent(ctx, false) + e, err = gapReader.GetEvent(ctx) } else { - e, err = r.reader.GetEvent(ctx, false) + e, err = r.reader.GetEvent(ctx) } cancel() binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds()) From 570266abbf78c7c1129c53a898edc733aac2b768 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 27 Mar 2019 21:05:07 +0800 Subject: [PATCH 7/8] *: address comments --- pkg/binlog/reader/tcp.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index bfa5328fdf..9981fe1808 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -145,11 +145,11 @@ func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, err // Status implements Reader.Status. 
func (r *TCPReader) Status() interface{} { r.mu.Lock() - stage := r.stage + stage := r.stage.Get() r.mu.Unlock() var connID uint32 - if stage.Get() != int32(stageNew) { + if stage != int32(stageNew) { connID = r.syncer.LastConnectionID() } return &TCPReaderStatus{ From f7e461ca3954296b191399e134876bf9e25d8b50 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 27 Mar 2019 21:06:24 +0800 Subject: [PATCH 8/8] *: address comments --- pkg/binlog/reader/tcp.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index 9981fe1808..0f554663a2 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -144,9 +144,7 @@ func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, err // Status implements Reader.Status. func (r *TCPReader) Status() interface{} { - r.mu.Lock() stage := r.stage.Get() - r.mu.Unlock() var connID uint32 if stage != int32(stageNew) {