Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: use Reader interface read binlog events #92

Merged
merged 9 commits into from
Mar 28, 2019
65 changes: 65 additions & 0 deletions pkg/binlog/reader/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package reader

import (
"context"

gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"

"github.com/pingcap/dm/pkg/gtid"
)

type readerStage int32

const (
stageNew readerStage = iota
stagePrepared
stageClosed
)

// String implements Stringer.String.
func (s readerStage) String() string {
switch s {
case stageNew:
return "new"
case stagePrepared:
return "prepared"
case stageClosed:
return "closed"
default:
return "unknown"
}
}

// Reader is a binlog event reader, it may read binlog events from a TCP stream, binlog files or any other in-memory buffer.
// One reader should read binlog events either through position mode or GTID mode.
type Reader interface {
// StartSyncByPos prepares the reader for reading binlog from the specified position.
StartSyncByPos(pos gmysql.Position) error

// StartSyncByGTID prepares the reader for reading binlog from the specified GTID set.
StartSyncByGTID(gSet gtid.Set) error

// Close closes the reader and release the resource.
Close() error

// GetEvent gets the binlog event one by one, it will block if no event can be read.
// You can pass a context (like Cancel or Timeout) to break the block.
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)

// Status returns the status of the reader.
Status() interface{}
}
157 changes: 157 additions & 0 deletions pkg/binlog/reader/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package reader

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"

"github.com/pingcap/errors"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go/sync2"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

// TCPReader is a binlog event reader which read binlog events from a TCP stream.
type TCPReader struct {
mu sync.Mutex

stage sync2.AtomicInt32
syncerCfg replication.BinlogSyncerConfig
syncer *replication.BinlogSyncer
streamer *replication.BinlogStreamer
}

// TCPReaderStatus represents the status of a TCPReader.
type TCPReaderStatus struct {
Stage string `json:"stage"`
ConnID uint32 `json:"connection"`
}

// String implements Stringer.String.
func (s *TCPReaderStatus) String() string {
data, err := json.Marshal(s)
if err != nil {
log.Errorf("[TCPReaderStatus] marshal status to json error %v", err)
}
return string(data)
}

// NewTCPReader creates a TCPReader instance.
func NewTCPReader(syncerCfg replication.BinlogSyncerConfig) Reader {
return &TCPReader{
syncerCfg: syncerCfg,
syncer: replication.NewBinlogSyncer(syncerCfg),
}
}

// StartSyncByPos implements Reader.StartSyncByPos.
func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.stage.Get() != int32(stageNew) {
return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew)
}

streamer, err := r.syncer.StartSync(pos)
if err != nil {
return errors.Annotatef(err, "start sync from position %s", pos)
}

r.streamer = streamer
r.stage.Set(int32(stagePrepared))
return nil
}

// StartSyncByGTID implements Reader.StartSyncByGTID.
func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.stage.Get() != int32(stageNew) {
return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew)
}

if gSet == nil {
return errors.NotValidf("nil GTID set")
}

streamer, err := r.syncer.StartSyncGTID(gSet.Origin())
if err != nil {
return errors.Annotatef(err, "start sync from GTID set %s", gSet)
}

r.streamer = streamer
r.stage.Set(int32(stagePrepared))
return nil
}

// Close implements Reader.Close.
func (r *TCPReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()

if r.stage.Get() == int32(stageClosed) {
return errors.New("already closed")
}

connID := r.syncer.LastConnectionID()
if connID > 0 {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4",
r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port)
db, err := sql.Open("mysql", dsn)
if err != nil {
return errors.Annotatef(err, "open connection to the master %s:%d", r.syncerCfg.Host, r.syncerCfg.Port)
}
defer db.Close()
err = utils.KillConn(db, connID)
if err != nil {
return errors.Annotatef(err, "kill connection %d for master %s:%d", connID, r.syncerCfg.Host, r.syncerCfg.Port)
}
}

r.stage.Set(int32(stageClosed))
return nil
}

// GetEvent implements Reader.GetEvent.
func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) {
if r.stage.Get() != int32(stagePrepared) {
return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared)
}

return r.streamer.GetEvent(ctx)
}

// Status implements Reader.Status.
func (r *TCPReader) Status() interface{} {
stage := r.stage.Get()

var connID uint32
if stage != int32(stageNew) {
connID = r.syncer.LastConnectionID()
}
return &TCPReaderStatus{
Stage: readerStage(stage).String(),
ConnID: connID,
}
}
Loading