Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb-binlog into …
Browse files Browse the repository at this point in the history
…czli/drainer/reduceMemoryUsage
  • Loading branch information
lichunzhu committed Sep 9, 2019
2 parents 4520757 + 10ded8f commit 04d71a1
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 53 deletions.
8 changes: 8 additions & 0 deletions cmd/reparo/reparo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ log-level = "info"
# for print, it just prints decoded value.
dest-type = "mysql"

# number of binlog events in a transaction batch
txn-batch = 20

# work count to execute binlogs
# if the latency between reparo and downstream(mysql or tidb) are too high, you might want to increase this
# to get higher throughput by higher concurrent write to the downstream
worker-count = 16

# Enable safe mode to make reparo reentrant, which value can be "true", "false". If the value is "true", reparo will change the "update" command into "delete+replace".
# The default value of safe-mode is false.
# safe-mode = false
Expand Down
7 changes: 6 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Server struct {
wg sync.WaitGroup
gcDuration time.Duration
triggerGC chan time.Time
pullClose chan struct{}
metrics *util.MetricClient
// save the last time we write binlog to Storage
// if long time not write, we can write a fake binlog
Expand Down Expand Up @@ -181,6 +182,7 @@ func NewServer(cfg *Config) (*Server, error) {
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
pullClose: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -278,12 +280,14 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
log.Error("drainer request a purged binlog TS, some binlog events may be loss", zap.Int64("gc TS", gcTS), zap.Reflect("request", in))
}

ctx, cancel := context.WithCancel(s.ctx)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)

for {
select {
case <-s.pullClose:
return nil
case data, ok := <-binlogs:
if !ok {
return nil
Expand Down Expand Up @@ -857,6 +861,7 @@ func (s *Server) Close() {
s.commitStatus()
log.Info("commit status done")

close(s.pullClose)
// stop the gRPC server
s.gs.GracefulStop()
log.Info("grpc is stopped")
Expand Down
3 changes: 2 additions & 1 deletion pump/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,8 @@ func (s *startServerSuite) TestStartPumpServer(c *C) {
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: nil,
cfg: cfg,
triggerGC: make(chan time.Time)}
triggerGC: make(chan time.Time),
pullClose: make(chan struct{})}
defer func() {
close(sig)
p.Close()
Expand Down
4 changes: 4 additions & 0 deletions reparo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Config struct {
StopDatetime string `toml:"stop-datetime" json:"stop-datetime"`
StartTSO int64 `toml:"start-tso" json:"start-tso"`
StopTSO int64 `toml:"stop-tso" json:"stop-tso"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`

DestType string `toml:"dest-type" json:"dest-type"`
DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"`
Expand Down Expand Up @@ -77,6 +79,8 @@ func NewConfig() *Config {
fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.")
fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format")
fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format")
fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.IntVar(&c.WorkerCount, "c", 16, "parallel worker count")
fs.StringVar(&c.LogFile, "log-file", "", "log file path")
fs.StringVar(&c.DestType, "dest-type", "print", "dest type, values can be [print,mysql]")
fs.StringVar(&c.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
Expand Down
2 changes: 1 addition & 1 deletion reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Info("New Reparo", zap.Stringer("config", cfg))

syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.SafeMode)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 5 additions & 7 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,23 @@ type mysqlSyncer struct {
}

var (
_ Syncer = &mysqlSyncer{}
defaultWorkerCount = 16
defaultBatchSize = 20
_ Syncer = &mysqlSyncer{}
)

// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig, safemode bool) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port)
if err != nil {
return nil, errors.Trace(err)
}

return newMysqlSyncerFromSQLDB(db, safemode)
return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(defaultWorkerCount), loader.BatchSize(defaultBatchSize))
func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize))
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}
Expand Down
7 changes: 1 addition & 6 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
var (
mock sqlmock.Sqlmock
)
originWorkerCount := defaultWorkerCount
defaultWorkerCount = 1
defer func() {
defaultWorkerCount = originWorkerCount
}()

oldCreateDB := createDB
createDB = func(string, string, string, int) (db *sql.DB, err error) {
Expand All @@ -37,7 +32,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, safemode)
syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
Expand Down
4 changes: 2 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig, safemode bool) (Syncer, error) {
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg, safemode)
return newMysqlSyncer(cfg, worker, batchSize, safemode)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
2 changes: 1 addition & 1 deletion reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, false)
syncer, err := New(testCase.typeStr, cfg, 16, 20, false)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
Expand Down
76 changes: 42 additions & 34 deletions tests/dailytest/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,34 +351,46 @@ func updatePKUK(db *sql.DB, opNum int) error {
maxKey := 20
mustExec(db, "create table pkuk(pk int primary key, uk int, v int, unique key uk(uk));")

var pks []int
addPK := func(pk int) {
pks = append(pks, pk)
pks := make(map[int]struct{})
freePks := rand.Perm(maxKey)

nextPk := func() int {
rand.Shuffle(len(freePks), func(i, j int) {
freePks[i], freePks[j] = freePks[j], freePks[i]
})
return freePks[0]
}
removePK := func(pk int) {
var tmp []int
for _, v := range pks {
if v != pk {
tmp = append(tmp, v)
addPK := func(pk int) {
pks[pk] = struct{}{}
var i, v int
for i, v = range freePks {
if v == pk {
break
}
}
pks = tmp
freePks = append(freePks[:i], freePks[i+1:]...)
}
hasPK := func(pk int) bool {
for _, v := range pks {
if v == pk {
return true
removePK := func(pk int) {
delete(pks, pk)
freePks = append(freePks, pk)
}
genOldPk := func() int {
n := rand.Intn(len(pks))
var i, pk int
for pk = range pks {
if i == n {
break
}
i++
}
return false
return pk
}

for i := 0; i < opNum; {
var sql string
pk := rand.Intn(maxKey)
uk := rand.Intn(maxKey)
v := rand.Intn(10000)
oldPK := rand.Intn(maxKey)
var (
sql string
pk, oldPK int
)

// try randomly insert&update&delete
op := rand.Intn(3)
Expand All @@ -387,29 +399,25 @@ func updatePKUK(db *sql.DB, opNum int) error {
if len(pks) == maxKey {
continue
}
for hasPK(pk) {
log.S().Info(pks)
pk = rand.Intn(maxKey)
}
pk = nextPk()
uk := rand.Intn(maxKey)
v := rand.Intn(10000)
sql = fmt.Sprintf("insert into pkuk(pk, uk, v) values(%d,%d,%d)", pk, uk, v)
case 1:
if len(pks) == 0 {
if len(pks) == 0 || len(pks) == maxKey {
continue
}
for !hasPK(oldPK) {
log.S().Info(pks)
oldPK = rand.Intn(maxKey)
}
pk = nextPk()
oldPK = genOldPk()
uk := rand.Intn(maxKey)
v := rand.Intn(10000)
sql = fmt.Sprintf("update pkuk set pk = %d, uk = %d, v = %d where pk = %d", pk, uk, v, oldPK)
case 2:
if len(pks) == 0 {
continue
}
for !hasPK(pk) {
log.S().Info(pks)
pk = rand.Intn(maxKey)
}
sql = fmt.Sprintf("delete from pkuk where pk = %d", pk)
oldPK = genOldPk()
sql = fmt.Sprintf("delete from pkuk where pk = %d", oldPK)
}

_, err := db.Exec(sql)
Expand All @@ -428,7 +436,7 @@ func updatePKUK(db *sql.DB, opNum int) error {
removePK(oldPK)
addPK(pk)
case 2:
removePK(pk)
removePK(oldPK)
}
i++
}
Expand Down

0 comments on commit 04d71a1

Please sign in to comment.