Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support syncer plugin #946

Open
wants to merge 5 commits into
base: plugin
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type SyncerConfig struct {
EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"`
DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"`

PluginPath string `toml:"plugin-path" json:"plugin-path"`
PluginName string `toml:"plugin-name" json:"plugin-name"`
}

// EnableDispatch return true if enable dispatch.
Expand Down Expand Up @@ -216,6 +219,8 @@ func NewConfig() *Config {
fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
fs.StringVar(&cfg.SyncerCfg.PluginName, "plugin-name", "", "syncer plugin name")
fs.StringVar(&cfg.SyncerCfg.PluginPath, "plugin-path", "", "syncer plugin path")

return cfg
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type KafkaSyncer struct {
lastSuccessTime time.Time

shutdown chan struct{}
*baseSyncer
*BaseSyncer
}

// newAsyncProducer will only be changed in unit test for mock
Expand All @@ -69,7 +69,7 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka
topic: topic,
toBeAckCommitTS: make(map[int64]int),
shutdown: make(chan struct{}),
baseSyncer: newBaseSyncer(tableInfoGetter),
BaseSyncer: newBaseSyncer(tableInfoGetter),
}

config, err := util.NewSaramaConfig(cfg.KafkaVersion, "kafka.")
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type MysqlSyncer struct {
db *sql.DB
loader loader.Loader
relayer relay.Relayer
*baseSyncer
*BaseSyncer
}

// should only be used for unit test to create mock db
Expand Down Expand Up @@ -130,7 +130,7 @@ func NewMysqlSyncer(
db: db,
loader: loader,
relayer: relayer,
baseSyncer: newBaseSyncer(tableInfoGetter),
BaseSyncer: newBaseSyncer(tableInfoGetter),
}

go s.run()
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
var _ Syncer = &pbSyncer{}

type pbSyncer struct {
*baseSyncer
*BaseSyncer

binlogger binlogfile.Binlogger
cancel func()
Expand All @@ -45,7 +45,7 @@ func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.Table

s := &pbSyncer{
binlogger: binlogger,
baseSyncer: newBaseSyncer(tableInfoGetter),
BaseSyncer: newBaseSyncer(tableInfoGetter),
cancel: cancel,
}

Expand Down
11 changes: 6 additions & 5 deletions drainer/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,27 @@ type Syncer interface {
Close() error
}

type baseSyncer struct {
//BaseSyncer is basic implementation of syncer
type BaseSyncer struct {
*baseError
success chan *Item
tableInfoGetter translator.TableInfoGetter
}

func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer {
return &baseSyncer{
func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *BaseSyncer {
return &BaseSyncer{
baseError: newBaseError(),
success: make(chan *Item, 8),
tableInfoGetter: tableInfoGetter,
}
}

// Successes implements Syncer interface
func (s *baseSyncer) Successes() <-chan *Item {
func (s *BaseSyncer) Successes() <-chan *Item {
return s.success
}

// Error implements Syncer interface
func (s *baseSyncer) Error() <-chan error {
func (s *BaseSyncer) Error() <-chan error {
return s.error()
}
21 changes: 21 additions & 0 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/syncplg"
"github.com/pingcap/tidb-binlog/pkg/loader"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -120,6 +121,26 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac
// only use for test
case "_intercept":
dsyncer = newInterceptSyncer()
case "plugin":
if len(cfg.PluginName) == 0 || len(cfg.PluginPath) == 0 {
return nil, errors.Errorf("plugin-name or plugin-path is incorrect")
}
newSyncer, err := syncplg.LoadPlugin(cfg.PluginPath, cfg.PluginName)
if err != nil {
return nil, errors.Annotate(err, "fail to load plugin dsyncer")
}

var relayer relay.Relayer
if cfg.Relay.IsEnabled() {
if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil {
return nil, errors.Annotate(err, "fail to create relayer")
}
}

dsyncer, err = newSyncer(cfg.To, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, relayer, info, cfg.EnableDispatch(), cfg.EnableCausality())
if err != nil {
return nil, errors.Annotate(err, "fail to create plugin dsyncer")
}
default:
return nil, errors.Errorf("unknown DestDBType: %s", cfg.DestDBType)
}
Expand Down
2 changes: 2 additions & 0 deletions drainer/syncplg/demo/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
plugin:
go build -o syncerdemo.so -buildmode=plugin demo.go
44 changes: 44 additions & 0 deletions drainer/syncplg/demo/demo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"errors"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/prometheus/client_golang/prometheus"
)

//DemoSyncer is a syncer demo
type DemoSyncer struct {
sync.BaseSyncer
}

//Sync should be implemented
func (ds *DemoSyncer) Sync(item *sync.Item) error {
return nil
}

//Close should be implemented
func (ds *DemoSyncer) Close() error {
return nil
}

//NewSyncerPlugin return A syncer instance which implemented interface of sync.Syncer
func NewSyncerPlugin(
cfg *sync.DBConfig,
tableInfoGetter translator.TableInfoGetter,
worker int,
batchSize int,
queryHistogramVec *prometheus.HistogramVec,
sqlMode *string,
destDBType string,
relayer relay.Relayer,
info *loopbacksync.LoopBackSync,
enableDispatch bool,
enableCausility bool) (dsyncer sync.Syncer, err error) {
return nil, errors.New("test error")
}

var _ DemoSyncer
62 changes: 62 additions & 0 deletions drainer/syncplg/syncer_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package syncplg

import (
"errors"
"fmt"
"plugin"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/prometheus/client_golang/prometheus"
)

const (
//NewSyncerPlugin is the name of exported function by syncer plugin
NewSyncerPlugin = "NewSyncerPlugin"
)

//NewSyncerFunc is a function type which syncer plugin must implement
type NewSyncerFunc func(
cfg *sync.DBConfig,
tableInfoGetter translator.TableInfoGetter,
worker int,
batchSize int,
queryHistogramVec *prometheus.HistogramVec,
sqlMode *string,
destDBType string,
relayer relay.Relayer,
info *loopbacksync.LoopBackSync,
enableDispatch bool,
enableCausility bool) (dsyncer sync.Syncer, err error)

//LoadPlugin load syncer plugin
func LoadPlugin(path, name string) (NewSyncerFunc, error) {
fp := path + "/" + name
p, err := plugin.Open(fp)
if err != nil {
return nil, fmt.Errorf("faile to Open %s . err: %s", fp, err.Error())
}

sym, err := p.Lookup(NewSyncerPlugin)
if err != nil {
return nil, err
}
newSyncer, ok := sym.(func(
cfg *sync.DBConfig,
tableInfoGetter translator.TableInfoGetter,
worker int,
batchSize int,
queryHistogramVec *prometheus.HistogramVec,
sqlMode *string,
destDBType string,
relayer relay.Relayer,
info *loopbacksync.LoopBackSync,
enableDispatch bool,
enableCausility bool) (dsyncer sync.Syncer, err error))
if !ok {
return nil, errors.New("function type is incorrect")
}
return newSyncer, nil
}