diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go new file mode 100644 index 0000000000..ddf91b8680 --- /dev/null +++ b/pkg/binlog/reader/reader.go @@ -0,0 +1,65 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" +) + +type readerStage int32 + +const ( + stageNew readerStage = iota + stagePrepared + stageClosed +) + +// String implements Stringer.String. +func (s readerStage) String() string { + switch s { + case stageNew: + return "new" + case stagePrepared: + return "prepared" + case stageClosed: + return "closed" + default: + return "unknown" + } +} + +// Reader is a binlog event reader, it may read binlog events from a TCP stream, binlog files or any other in-memory buffer. +// One reader should read binlog events either through position mode or GTID mode. +type Reader interface { + // StartSyncByPos prepares the reader for reading binlog from the specified position. + StartSyncByPos(pos gmysql.Position) error + + // StartSyncByGTID prepares the reader for reading binlog from the specified GTID set. + StartSyncByGTID(gSet gtid.Set) error + + // Close closes the reader and release the resource. + Close() error + + // GetEvent gets the binlog event one by one, it will block if no event can be read. + // You can pass a context (like Cancel or Timeout) to break the block. + GetEvent(ctx context.Context) (*replication.BinlogEvent, error) + + // Status returns the status of the reader. + Status() interface{} +} diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go new file mode 100644 index 0000000000..0f554663a2 --- /dev/null +++ b/pkg/binlog/reader/tcp.go @@ -0,0 +1,157 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "sync" + + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + "github.com/siddontang/go/sync2" + + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/utils" +) + +// TCPReader is a binlog event reader which read binlog events from a TCP stream. +type TCPReader struct { + mu sync.Mutex + + stage sync2.AtomicInt32 + syncerCfg replication.BinlogSyncerConfig + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer +} + +// TCPReaderStatus represents the status of a TCPReader. +type TCPReaderStatus struct { + Stage string `json:"stage"` + ConnID uint32 `json:"connection"` +} + +// String implements Stringer.String. +func (s *TCPReaderStatus) String() string { + data, err := json.Marshal(s) + if err != nil { + log.Errorf("[TCPReaderStatus] marshal status to json error %v", err) + } + return string(data) +} + +// NewTCPReader creates a TCPReader instance. +func NewTCPReader(syncerCfg replication.BinlogSyncerConfig) Reader { + return &TCPReader{ + syncerCfg: syncerCfg, + syncer: replication.NewBinlogSyncer(syncerCfg), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos. +func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + } + + streamer, err := r.syncer.StartSync(pos) + if err != nil { + return errors.Annotatef(err, "start sync from position %s", pos) + } + + r.streamer = streamer + r.stage.Set(int32(stagePrepared)) + return nil +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + } + + if gSet == nil { + return errors.NotValidf("nil GTID set") + } + + streamer, err := r.syncer.StartSyncGTID(gSet.Origin()) + if err != nil { + return errors.Annotatef(err, "start sync from GTID set %s", gSet) + } + + r.streamer = streamer + r.stage.Set(int32(stagePrepared)) + return nil +} + +// Close implements Reader.Close. +func (r *TCPReader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() == int32(stageClosed) { + return errors.New("already closed") + } + + connID := r.syncer.LastConnectionID() + if connID > 0 { + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", + r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port) + db, err := sql.Open("mysql", dsn) + if err != nil { + return errors.Annotatef(err, "open connection to the master %s:%d", r.syncerCfg.Host, r.syncerCfg.Port) + } + defer db.Close() + err = utils.KillConn(db, connID) + if err != nil { + return errors.Annotatef(err, "kill connection %d for master %s:%d", connID, r.syncerCfg.Host, r.syncerCfg.Port) + } + } + + r.stage.Set(int32(stageClosed)) + return nil +} + +// GetEvent implements Reader.GetEvent. +func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if r.stage.Get() != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) + } + + return r.streamer.GetEvent(ctx) +} + +// Status implements Reader.Status. +func (r *TCPReader) Status() interface{} { + stage := r.stage.Get() + + var connID uint32 + if stage != int32(stageNew) { + connID = r.syncer.LastConnectionID() + } + return &TCPReaderStatus{ + Stage: readerStage(stage).String(), + ConnID: connID, + } +} diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go new file mode 100644 index 0000000000..8fd3467989 --- /dev/null +++ b/pkg/binlog/reader/tcp_test.go @@ -0,0 +1,329 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "database/sql" + "fmt" + "os" + "strconv" + "strings" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + . "github.com/pingcap/check" + "github.com/pingcap/parser/ast" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/utils" +) + +var ( + _ = Suite(&testTCPReaderSuite{}) + dbName = "test_tcp_reader_db" + tableName = "test_tcp_reader_table" + columnValue = 123 + flavor = gmysql.MySQLFlavor + serverIDs = []uint32{3251, 3252} +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testTCPReaderSuite struct { + host string + port int + user string + password string + db *sql.DB +} + +func (t *testTCPReaderSuite) SetUpSuite(c *C) { + t.setUpConn(c) + t.setUpData(c) +} + +func (t *testTCPReaderSuite) setUpConn(c *C) { + t.host = os.Getenv("MYSQL_HOST") + if t.host == "" { + t.host = "127.0.0.1" + } + t.port, _ = strconv.Atoi(os.Getenv("MYSQL_PORT")) + if t.port == 0 { + t.port = 3306 + } + t.user = os.Getenv("MYSQL_USER") + if t.user == "" { + t.user = "root" + } + t.password = os.Getenv("MYSQL_PSWD") + + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", t.user, t.password, t.host, t.port) + db, err := sql.Open("mysql", dsn) + c.Assert(err, IsNil) + t.db = db +} + +func (t *testTCPReaderSuite) setUpData(c *C) { + // drop database first. + query := fmt.Sprintf("DROP DATABASE `%s`", dbName) + _, err := t.db.Exec(query) + + // delete previous binlog files/events. + query = "RESET MASTER" + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // execute some SQL statements to generate binlog events. + query = fmt.Sprintf("CREATE DATABASE `%s`", dbName) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + query = fmt.Sprintf("CREATE TABLE `%s`.`%s` (c1 INT)", dbName, tableName) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // we assume `binlog_format` already set to `ROW`. + query = fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) TestSyncPos(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[0], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + pos gmysql.Position // empty position + ) + + // the first reader + r := NewTCPReader(cfg) + c.Assert(r, NotNil) + + // not prepared + e, err := r.GetEvent(context.Background()) + c.Assert(err, NotNil) + c.Assert(e, IsNil) + + // prepare + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + // verify + t.verifyInitialEvents(c, r) + + // check status, stagePrepared + status := r.Status() + trStatus, ok := status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stagePrepared.String()) + c.Assert(trStatus.ConnID, Greater, uint32(0)) + trStatusStr := trStatus.String() + c.Assert(strings.Contains(trStatusStr, stagePrepared.String()), IsTrue) + + // re-prepare is invalid + err = r.StartSyncByPos(pos) + c.Assert(err, NotNil) + + // close the reader + err = r.Close() + c.Assert(err, IsNil) + + // already closed + err = r.Close() + c.Assert(err, NotNil) + + // invalid startup position + pos = gmysql.Position{ + Name: "mysql-bin.888888", + Pos: 666666, + } + + // create a new one. + r = NewTCPReader(cfg) + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + _, err = r.GetEvent(context.Background()) + // ERROR 1236 (HY000): Could not find first log file name in binary log index file + // close connection automatically. + c.Assert(err, NotNil) + + // get current position for master + pos, _, err = utils.GetMasterStatus(t.db, flavor) + + // execute another DML again + query := fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue+1) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // create a new one again + r = NewTCPReader(cfg) + err = r.StartSyncByPos(pos) + c.Assert(err, IsNil) + + t.verifyOneDML(c, r) + + err = r.Close() + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) TestSyncGTID(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[1], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + gSet gtid.Set // nit GTID set + ) + + // the first reader + r := NewTCPReader(cfg) + c.Assert(r, NotNil) + + // check status, stageNew + status := r.Status() + trStatus, ok := status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stageNew.String()) + c.Assert(trStatus.ConnID, Equals, uint32(0)) + + // not prepared + e, err := r.GetEvent(context.Background()) + c.Assert(err, NotNil) + c.Assert(e, IsNil) + + // nil GTID set + err = r.StartSyncByGTID(gSet) + c.Assert(err, NotNil) + + // empty GTID set + gSet, err = gtid.ParserGTID(flavor, "") + c.Assert(err, IsNil) + + // prepare + err = r.StartSyncByGTID(gSet) + c.Assert(err, IsNil) + + // verify + t.verifyInitialEvents(c, r) + + // re-prepare is invalid + err = r.StartSyncByGTID(gSet) + c.Assert(err, NotNil) + + // close the reader + err = r.Close() + c.Assert(err, IsNil) + + // check status, stageClosed + status = r.Status() + trStatus, ok = status.(*TCPReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(trStatus.Stage, Equals, stageClosed.String()) + c.Assert(trStatus.ConnID, Greater, uint32(0)) + + // already closed + err = r.Close() + c.Assert(err, NotNil) + + // get current GTID set for master + _, gSet, err = utils.GetMasterStatus(t.db, flavor) + + // execute another DML again + query := fmt.Sprintf("INSERT INTO `%s`.`%s` VALUES (%d)", dbName, tableName, columnValue+2) + _, err = t.db.Exec(query) + c.Assert(err, IsNil) + + // create a new one + r = NewTCPReader(cfg) + err = r.StartSyncByGTID(gSet) + c.Assert(err, IsNil) + + t.verifyOneDML(c, r) + + err = r.Close() + c.Assert(err, IsNil) +} + +func (t *testTCPReaderSuite) verifyInitialEvents(c *C, reader Reader) { + // if timeout, we think the test case failed. + timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + parser2, err := utils.GetParser(t.db, false) + c.Assert(err, IsNil) + +forLoop: + for { + e, err := reader.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + switch ev := e.Event.(type) { + case *replication.QueryEvent: + stmt, err := parser2.ParseOneStmt(string(ev.Query), "", "") + c.Assert(err, IsNil) + switch ddl := stmt.(type) { + case *ast.CreateDatabaseStmt: + c.Assert(ddl.Name, Equals, dbName) + case *ast.CreateTableStmt: + c.Assert(ddl.Table.Name.O, Equals, tableName) + } + case *replication.RowsEvent: + c.Assert(string(ev.Table.Schema), Equals, dbName) + c.Assert(string(ev.Table.Table), Equals, tableName) + c.Assert(ev.Rows, HasLen, 1) // `INSERT INTO` + c.Assert(ev.ColumnCount, Equals, uint64(1)) // `c1` + break forLoop // if we got the DML, then we think our test case passed. + } + } +} + +func (t *testTCPReaderSuite) verifyOneDML(c *C, reader Reader) { + // if timeout, we think the test case failed. + timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + +forLoop: + for { + e, err := reader.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + switch ev := e.Event.(type) { + case *replication.RowsEvent: + c.Assert(string(ev.Table.Schema), Equals, dbName) + c.Assert(string(ev.Table.Table), Equals, tableName) + c.Assert(ev.Rows, HasLen, 1) // `INSERT INTO` + c.Assert(ev.ColumnCount, Equals, uint64(1)) // `c1` + break forLoop + } + } +} diff --git a/relay/relay.go b/relay/relay.go index 4ffebb4b42..8457b27c26 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" "github.com/pingcap/dm/dm/unit" + binlogreader "github.com/pingcap/dm/pkg/binlog/reader" fr "github.com/pingcap/dm/pkg/func-rollback" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -69,10 +70,11 @@ const ( // Relay relays mysql binlog to local file. type Relay struct { - db *sql.DB - cfg *Config - syncer *replication.BinlogSyncer - syncerCfg replication.BinlogSyncerConfig + db *sql.DB + cfg *Config + syncerCfg replication.BinlogSyncerConfig + reader binlogreader.Reader + gapSyncerCfg replication.BinlogSyncerConfig meta Meta lastSlaveConnectionID uint32 @@ -165,7 +167,7 @@ func (r *Relay) Init() (err error) { // Process implements the dm.Unit interface. func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult) { - r.syncer = replication.NewBinlogSyncer(r.syncerCfg) + r.reader = binlogreader.NewTCPReader(r.syncerCfg) errs := make([]*pb.ProcessError, 0, 1) err := r.process(ctx) @@ -229,7 +231,7 @@ func (r *Relay) process(parentCtx context.Context) error { r.updateMetricsRelaySubDirIndex() } - streamer, err := r.getBinlogStreamer() + err = r.setUpReader() if err != nil { return errors.Trace(err) } @@ -245,8 +247,7 @@ func (r *Relay) process(parentCtx context.Context) error { // 2. create a special streamer to fill the gap // 3. catchup pos after the gap // 4. close the special streamer - gapSyncer *replication.BinlogSyncer // syncer used to fill the gap in relay log file - gapStreamer *replication.BinlogStreamer // streamer used to fill the gap in relay log file + gapReader binlogreader.Reader gapSyncStartPos *mysql.Position // the pos of the event starting to fill the gap gapSyncEndPos *mysql.Position // the pos of the event after the gap eventFormat *replication.FormatDescriptionEvent // latest FormatDescriptionEvent, used when re-calculate checksum @@ -254,11 +255,10 @@ func (r *Relay) process(parentCtx context.Context) error { closeGapSyncer := func() { addRelayFlag = false // reset to false when closing the gap syncer - if gapSyncer != nil { - gapSyncer.Close() - gapSyncer = nil + if gapReader != nil { + gapReader.Close() + gapReader = nil } - gapStreamer = nil gapSyncStartPos = nil gapSyncEndPos = nil } @@ -273,9 +273,9 @@ func (r *Relay) process(parentCtx context.Context) error { go r.doIntervalOps(parentCtx) for { - if gapStreamer == nil && gapSyncStartPos != nil && gapSyncEndPos != nil { - gapSyncer = replication.NewBinlogSyncer(r.gapSyncerCfg) - gapStreamer, err = gapSyncer.StartSync(*gapSyncStartPos) + if gapReader == nil && gapSyncStartPos != nil && gapSyncEndPos != nil { + gapReader = binlogreader.NewTCPReader(r.gapSyncerCfg) + err = gapReader.StartSyncByPos(*gapSyncStartPos) if err != nil { return errors.Annotatef(err, "start to fill gap in relay log file from %v", gapSyncStartPos) } @@ -297,10 +297,10 @@ func (r *Relay) process(parentCtx context.Context) error { ctx, cancel := context.WithTimeout(parentCtx, eventTimeout) readTimer := time.Now() var e *replication.BinlogEvent - if gapStreamer != nil { - e, err = gapStreamer.GetEvent(ctx) + if gapReader != nil { + e, err = gapReader.GetEvent(ctx) } else { - e, err = streamer.GetEvent(ctx) + e, err = r.reader.GetEvent(ctx) } cancel() binlogReadDurationHistogram.Observe(time.Since(readTimer).Seconds()) @@ -318,7 +318,7 @@ func (r *Relay) process(parentCtx context.Context) error { // do nothing default: if utils.IsErrBinlogPurged(err) { - if gapStreamer != nil { + if gapReader != nil { pos, err2 := tryFindGapStartPos(err, r.fd.Name()) if err2 == nil { log.Errorf("[relay] %s", err.Error()) // log the error @@ -344,7 +344,7 @@ func (r *Relay) process(parentCtx context.Context) error { return errors.Annotatef(err, "gap streamer") } if tryReSync && r.cfg.EnableGTID && r.cfg.AutoFixGTID { - streamer, err = r.reSyncBinlog(r.syncerCfg) + err = r.reSetUpReader(r.syncerCfg) if err != nil { return errors.Annotatef(err, "try auto switch with GTID") } @@ -418,7 +418,7 @@ func (r *Relay) process(parentCtx context.Context) error { } } - if gapStreamer == nil { + if gapReader == nil { gapDetected, fSize, err := r.detectGap(e) if err != nil { relayLogWriteErrorCounter.Inc() @@ -751,51 +751,44 @@ func (r *Relay) checkFormatDescriptionEventExists(filename string) (exists bool, return false, nil } -// NOTE: now, no online master-slave switching supported -// when switching, user must Pause relay, update config, then Resume -// so, will call `getBinlogStreamer` again on new master -func (r *Relay) getBinlogStreamer() (*replication.BinlogStreamer, error) { +func (r *Relay) setUpReader() error { defer func() { - r.lastSlaveConnectionID = r.syncer.LastConnectionID() - log.Infof("[relay] last slave connection id %d", r.lastSlaveConnectionID) + status := r.reader.Status() + log.Infof("[relay] set up binlog reader with status %s", status) }() + if r.cfg.EnableGTID { - return r.startSyncByGTID() + return r.setUpReaderByGTID() } - return r.startSyncByPos() + return r.setUpReaderByPos() } -func (r *Relay) startSyncByGTID() (*replication.BinlogStreamer, error) { +func (r *Relay) setUpReaderByGTID() error { uuid, gs := r.meta.GTID() log.Infof("[relay] start sync for master(%s, %s) from GTID set %s", r.masterNode(), uuid, gs) - streamer, err := r.syncer.StartSyncGTID(gs.Origin()) + err := r.reader.StartSyncByGTID(gs) if err != nil { log.Errorf("[relay] start sync in GTID mode from %s error %v", gs.String(), err) - return r.startSyncByPos() + return r.setUpReaderByPos() } - return streamer, errors.Trace(err) + return nil } -// TODO: exception handling. -// e.g. -// 1.relay connects to a difference MySQL -// 2. upstream MySQL does a pure restart (removes all its' data, and then restart) - -func (r *Relay) startSyncByPos() (*replication.BinlogStreamer, error) { +func (r *Relay) setUpReaderByPos() error { // if the first binlog not exists in local, we should fetch from the first position, whatever the specific position is. uuid, pos := r.meta.Pos() log.Infof("[relay] start sync for master (%s, %s) from %s", r.masterNode(), uuid, pos.String()) if pos.Name == "" { // let mysql decides - return r.syncer.StartSync(pos) + return r.reader.StartSyncByPos(pos) } if stat, err := os.Stat(filepath.Join(r.meta.Dir(), pos.Name)); os.IsNotExist(err) { log.Infof("[relay] should sync from %s:4 instead of %s:%d because the binlog file not exists in local before and should sync from the very beginning", pos.Name, pos.Name, pos.Pos) pos.Pos = 4 } else if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } else { if stat.Size() > int64(pos.Pos) { // it means binlog file already exists, and the local binlog file already contains the specific position @@ -806,25 +799,24 @@ func (r *Relay) startSyncByPos() (*replication.BinlogStreamer, error) { pos.Pos = uint32(stat.Size()) err := r.meta.Save(pos, nil) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } } else if stat.Size() < int64(pos.Pos) { // in such case, we should stop immediately and check - return nil, errors.Annotatef(ErrBinlogPosGreaterThanFileSize, "%s size=%d, specific pos=%d", pos.Name, stat.Size(), pos.Pos) + return errors.Annotatef(ErrBinlogPosGreaterThanFileSize, "%s size=%d, specific pos=%d", pos.Name, stat.Size(), pos.Pos) } } - streamer, err := r.syncer.StartSync(pos) - return streamer, errors.Trace(err) + return r.reader.StartSyncByPos(pos) } // reSyncBinlog re-tries sync binlog when master-slave switched -func (r *Relay) reSyncBinlog(cfg replication.BinlogSyncerConfig) (*replication.BinlogStreamer, error) { +func (r *Relay) reSyncBinlog(cfg replication.BinlogSyncerConfig) error { err := r.retrySyncGTIDs() if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - return r.reopenStreamer(cfg) + return r.reSetUpReader(cfg) } // retrySyncGTIDs try to auto fix GTID set @@ -864,18 +856,15 @@ func (r *Relay) retrySyncGTIDs() error { return nil } -// reopenStreamer reopen a new streamer -func (r *Relay) reopenStreamer(cfg replication.BinlogSyncerConfig) (*replication.BinlogStreamer, error) { - if r.syncer != nil { - err := r.closeBinlogSyncer(r.syncer) - r.syncer = nil +func (r *Relay) reSetUpReader(cfg replication.BinlogSyncerConfig) error { + if r.reader != nil { + err := r.reader.Close() + r.reader = nil if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } } - - r.syncer = replication.NewBinlogSyncer(cfg) - return r.getBinlogStreamer() + return r.setUpReader() } func (r *Relay) masterNode() string { @@ -889,9 +878,9 @@ func (r *Relay) IsClosed() bool { // stopSync stops syncing, now it used by Close and Pause func (r *Relay) stopSync() { - if r.syncer != nil { - r.closeBinlogSyncer(r.syncer) - r.syncer = nil + if r.reader != nil { + r.reader.Close() + r.reader = nil } if r.fd != nil { r.fd.Close()