diff --git a/drainer/config.go b/drainer/config.go
index 4a3ea29da..4200e2757 100644
--- a/drainer/config.go
+++ b/drainer/config.go
@@ -52,7 +52,7 @@ const (
 var (
 	maxBinlogItemCount        int
-	defaultBinlogItemCount    = 512
+	defaultBinlogItemCount    = 8
 	supportedCompressors      = [...]string{"gzip"}
 	newZKFromConnectionString = zk.NewFromConnectionString
 )
diff --git a/drainer/merge.go b/drainer/merge.go
index fe29322f4..920e1804e 100644
--- a/drainer/merge.go
+++ b/drainer/merge.go
@@ -192,7 +192,7 @@ func NewMerger(ts int64, strategy string, sources ...MergeSource) *Merger {
 	m := &Merger{
 		latestTS: ts,
 		sources:  make(map[string]MergeSource),
-		output:   make(chan MergeItem, 10),
+		output:   make(chan MergeItem),
 		strategy: mergeStrategy,
 	}
diff --git a/drainer/metrics.go b/drainer/metrics.go
index 99dcb9365..978c8e3d5 100644
--- a/drainer/metrics.go
+++ b/drainer/metrics.go
@@ -109,7 +109,7 @@ var (
 			Subsystem: "drainer",
 			Name:      "read_binlog_size",
 			Help:      "Bucketed histogram of size of a binlog.",
-			Buckets:   prometheus.ExponentialBuckets(16, 2, 20),
+			Buckets:   prometheus.ExponentialBuckets(16, 2, 25),
 		}, []string{"nodeID"})

 	queueSizeGauge = prometheus.NewGaugeVec(
diff --git a/drainer/pump.go b/drainer/pump.go
index 6daf2cdb8..6d548aa46 100644
--- a/drainer/pump.go
+++ b/drainer/pump.go
@@ -32,7 +32,7 @@ import (
 )

 const (
-	binlogChanSize = 10
+	binlogChanSize = 0
 )

 // Pump holds the connection to a pump node, and keeps the savepoint of binlog last read
diff --git a/drainer/pump_test.go b/drainer/pump_test.go
index d1e379ae4..56e56eeb7 100644
--- a/drainer/pump_test.go
+++ b/drainer/pump_test.go
@@ -80,10 +80,12 @@ func (s *pumpSuite) TestPullBinlog(c *C) {
 	// ascending commitTs order
 	pullBinlogCommitTSChecker(commitTsArray, ret, binlogBytesChan, c)
+	time.Sleep(10 * time.Microsecond)
 	c.Assert(p.latestTS, Equals, commitTsArray[len(commitTsArray)-1])

 	// should omit disorder binlog item, latestTs should be 29
 	pullBinlogCommitTSChecker(wrongCommitTsArray, ret, binlogBytesChan, c)
+	time.Sleep(10 * time.Microsecond)
 	c.Assert(p.latestTS, Equals, wrongCommitTsArray[len(wrongCommitTsArray)-2])
 }
diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go
index c0a13751f..1dad58bda 100644
--- a/drainer/sync/syncer.go
+++ b/drainer/sync/syncer.go
@@ -50,7 +50,7 @@ type baseSyncer struct {
 func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer {
 	return &baseSyncer{
 		baseError:       newBaseError(),
-		success:         make(chan *Item, 1024),
+		success:         make(chan *Item, 8),
 		tableInfoGetter: tableInfoGetter,
 	}
 }
diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go
index 39cf8ebee..a153eb132 100644
--- a/drainer/sync/syncer_test.go
+++ b/drainer/sync/syncer_test.go
@@ -120,13 +120,6 @@ func (s *syncerSuite) TestOpenAndClose(c *check.C) {
 func (s *syncerSuite) TestGetFromSuccesses(c *check.C) {
 	gen := translator.BinlogGenrator{}
-	gen.SetDDL()
-	item := &Item{
-		Binlog:        gen.TiBinlog,
-		PrewriteValue: gen.PV,
-		Schema:        gen.Schema,
-		Table:         gen.Table,
-	}

 	// set up mysql db mock expect
 	s.mysqlMock.ExpectBegin()
@@ -144,6 +137,14 @@
 	var successCount = make([]int64, len(s.syncers))

 	for idx, syncer := range s.syncers {
+		gen.SetDDL()
+		item := &Item{
+			Binlog:        gen.TiBinlog,
+			PrewriteValue: gen.PV,
+			Schema:        gen.Schema,
+			Table:         gen.Table,
+		}
+
 		go func(idx int) {
 			for range syncer.Successes() {
 				atomic.AddInt64(&successCount[idx], 1)
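Editor's note: the drainer changes above all point one way: shrink or remove channel buffers (defaultBinlogItemCount 512 to 8, Merger.output and binlogChanSize made unbuffered, the success channel 1024 to 8) so memory stays bounded and backpressure reaches the upstream pumps instead of piling up in queues. As a minimal standalone sketch of that effect (not part of the patch; all names here are illustrative), an unbuffered channel forces the producer to run in lockstep with the consumer:

package main

import (
	"fmt"
	"time"
)

func main() {
	out := make(chan int) // unbuffered, like the new Merger.output
	go func() {
		for i := 0; i < 3; i++ {
			out <- i // blocks until the consumer is ready to receive
			fmt.Println("sent", i)
		}
		close(out)
	}()
	for v := range out {
		time.Sleep(10 * time.Millisecond) // a slow consumer throttles the producer
		fmt.Println("received", v)
	}
}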
diff --git a/pkg/loader/load.go b/pkg/loader/load.go
index 64d77bf94..402fe5766 100644
--- a/pkg/loader/load.go
+++ b/pkg/loader/load.go
@@ -159,8 +159,8 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
 		workerCount: opts.workerCount,
 		batchSize:   opts.batchSize,
 		metrics:     opts.metrics,
-		input:       make(chan *Txn, 1024),
-		successTxn:  make(chan *Txn, 1024),
+		input:       make(chan *Txn),
+		successTxn:  make(chan *Txn),
 		merge: true,

 		saveAppliedTS: opts.saveAppliedTS,
@@ -430,16 +430,19 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error {

 // Run will quit when meet any error, or all the txn are drained
 func (s *loaderImpl) Run() error {
+	txnManager := newTxnManager(1024, s.input)
 	defer func() {
 		log.Info("Run()... in Loader quit")
 		close(s.successTxn)
+		txnManager.Close()
 	}()

 	batch := fNewBatchManager(s)
+	input := txnManager.run()

 	for {
 		select {
-		case txn, ok := <-s.input:
+		case txn, ok := <-input:
 			if !ok {
 				log.Info("Loader closed, quit running")
 				if err := batch.execAccumulatedDMLs(); err != nil {
@@ -449,6 +452,7 @@
 			}

 			s.metricsInputTxn(txn)
+			txnManager.pop(txn)

 			if err := batch.put(txn); err != nil {
 				return errors.Trace(err)
 			}
@@ -464,12 +468,13 @@
 		}

 		// get first
-		txn, ok := <-s.input
+		txn, ok := <-input
 		if !ok {
 			return nil
 		}

 		s.metricsInputTxn(txn)
+		txnManager.pop(txn)

 		if err := batch.put(txn); err != nil {
 			return errors.Trace(err)
 		}
@@ -624,6 +629,92 @@ func (b *batchManager) put(txn *Txn) error {
 	return nil
 }

+// txnManager drains a single input channel and caps the total number of DMLs handed out but not yet popped
+type txnManager struct {
+	input        chan *Txn
+	cacheChan    chan *Txn
+	shutdown     chan struct{}
+	cachedSize   int
+	maxCacheSize int
+	cond         *sync.Cond
+	isClosed     int32
+}
+
+func newTxnManager(maxCacheSize int, input chan *Txn) *txnManager {
+	return &txnManager{
+		input:        input,
+		cacheChan:    make(chan *Txn, 1024),
+		maxCacheSize: maxCacheSize,
+		cond:         sync.NewCond(new(sync.Mutex)),
+		shutdown:     make(chan struct{}),
+	}
+}
+
+// run may only be called once per txnManager instance
+func (t *txnManager) run() chan *Txn {
+	ret := t.cacheChan
+	input := t.input
+	go func() {
+		defer func() {
+			log.Info("run()... in txnManager quit")
+			close(ret)
+		}()
+
+		for atomic.LoadInt32(&t.isClosed) == 0 {
+			var txn *Txn
+			var ok bool
+			select {
+			case txn, ok = <-input:
+				if !ok {
+					log.Info("Loader has been closed. Start quitting txnManager")
+					return
+				}
+			case <-t.shutdown:
+				return
+			}
+			txnSize := len(txn.DMLs)
+
+			t.cond.L.Lock()
+			if txnSize < t.maxCacheSize {
+				for atomic.LoadInt32(&t.isClosed) == 0 && txnSize+t.cachedSize > t.maxCacheSize {
+					t.cond.Wait()
+				}
+			} else {
+				for atomic.LoadInt32(&t.isClosed) == 0 && t.cachedSize != 0 {
+					t.cond.Wait()
+				}
+			}
+			t.cond.L.Unlock()
+
+			select {
+			case ret <- txn:
+				t.cond.L.Lock()
+				t.cachedSize += txnSize
+				t.cond.L.Unlock()
+			case <-t.shutdown:
+				return
+			}
+		}
+	}()
+	return ret
+}
+
+func (t *txnManager) pop(txn *Txn) {
+	t.cond.L.Lock()
+	t.cachedSize -= len(txn.DMLs)
+	t.cond.Signal()
+	t.cond.L.Unlock()
+}
+
+func (t *txnManager) Close() {
+	if !atomic.CompareAndSwapInt32(&t.isClosed, 0, 1) {
+		return
+	}
+	close(t.shutdown)
+	t.cond.Signal()
+	log.Info("txnManager has been closed")
+}
+
 func getAppliedTS(db *gosql.DB) int64 {
 	appliedTS, err := pkgsql.GetTidbPosition(db)
 	if err != nil {
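Editor's note: txnManager is the heart of the patch. Instead of a large buffered channel, admission is gated by a sync.Cond over the summed len(txn.DMLs) of transactions handed out but not yet pop()ed, and a transaction at least as large as the whole cache is only admitted once the cache fully drains. A self-contained sketch of that admission discipline (boundedQueue, push, and pop are illustrative names, not part of the patch; the real code also handles shutdown and updates the count only after the channel send):

package main

import (
	"fmt"
	"sync"
)

type boundedQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	cached int // summed size of items pushed but not yet popped
	max    int
	out    chan int // item sizes stand in for *Txn
}

func newBoundedQueue(max int) *boundedQueue {
	q := &boundedQueue{max: max, out: make(chan int, 16)}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push admits an item of the given size, waiting until it fits under the
// cap; an item at least as large as the cap waits for a complete drain.
func (q *boundedQueue) push(size int) {
	q.mu.Lock()
	if size < q.max {
		for q.cached+size > q.max {
			q.cond.Wait()
		}
	} else {
		for q.cached != 0 {
			q.cond.Wait()
		}
	}
	q.cached += size
	q.mu.Unlock()
	q.out <- size
}

// pop consumes one item and signals any producer waiting for room.
func (q *boundedQueue) pop() int {
	size := <-q.out
	q.mu.Lock()
	q.cached -= size
	q.cond.Signal()
	q.mu.Unlock()
	return size
}

func main() {
	q := newBoundedQueue(5)
	go func() {
		for _, s := range []int{2, 2, 2, 7} { // the 7 is "bigger than the cache"
			q.push(s)
			fmt.Println("pushed", s)
		}
	}()
	for i := 0; i < 4; i++ {
		fmt.Println("popped", q.pop())
	}
}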
Start quitting txnManager") + return + } + case <-t.shutdown: + return + } + txnSize := len(txn.DMLs) + + t.cond.L.Lock() + if txnSize < t.maxCacheSize { + for atomic.LoadInt32(&t.isClosed) == 0 && txnSize+t.cachedSize > t.maxCacheSize { + t.cond.Wait() + } + } else { + for atomic.LoadInt32(&t.isClosed) == 0 && t.cachedSize != 0 { + t.cond.Wait() + } + } + t.cond.L.Unlock() + + select { + case ret <- txn: + t.cond.L.Lock() + t.cachedSize += txnSize + t.cond.L.Unlock() + case <-t.shutdown: + return + } + } + }() + return ret +} + +func (t *txnManager) pop(txn *Txn) { + t.cond.L.Lock() + t.cachedSize -= len(txn.DMLs) + t.cond.Signal() + t.cond.L.Unlock() +} + +func (t *txnManager) Close() { + if !atomic.CompareAndSwapInt32(&t.isClosed, 0, 1) { + return + } + close(t.shutdown) + t.cond.Signal() + log.Info("txnManager has been closed") +} + func getAppliedTS(db *gosql.DB) int64 { appliedTS, err := pkgsql.GetTidbPosition(db) if err != nil { diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index fbdda7979..d6655cacf 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -378,6 +378,138 @@ func (s *batchManagerSuite) TestShouldExecAccumulatedDMLs(c *check.C) { c.Assert(bm.txns, check.HasLen, 1) } +type txnManagerSuite struct{} + +var _ = check.Suite(&txnManagerSuite{}) + +func inputTxnInTime(c *check.C, input chan *Txn, txn *Txn, timeLimit time.Duration) { + select { + case input <- txn: + case <-time.After(timeLimit): + c.Fatal("txnManager gets blocked while receiving txns") + } +} + +func outputTxnInTime(c *check.C, output chan *Txn, timeLimit time.Duration) *Txn { + if timeLimit != 0 { + select { + case t := <-output: + return t + case <-time.After(timeLimit): + c.Fatal("Fail to pick txn from txnManager") + } + } else { + select { + case t := <-output: + return t + default: + c.Fatal("Fail to pick txn from txnManager") + } + } + return nil +} + +func (s *txnManagerSuite) TestRunTxnManager(c *check.C) { + input := make(chan *Txn) + dmls := []*DML{ + {Tp: UpdateDMLType}, + {Tp: InsertDMLType}, + {Tp: UpdateDMLType}, + } + txn := &Txn{DMLs: dmls} + txnManager := newTxnManager(14, input) + output := txnManager.run() + // send 5 txns (size 3) to txnManager, the 5th txn should get blocked at cond.Wait() + for i := 0; i < 5; i++ { + inputTxnInTime(c, input, txn, 10*time.Microsecond) + } + c.Assert(output, check.HasLen, 4) + // Next txn should be blocked + select { + case input <- txn: + c.Fatal("txnManager doesn't block the txn when room is not enough") + default: + } + c.Assert(output, check.HasLen, 4) + // pick one txn from output channel + t := outputTxnInTime(c, output, 0) + txnManager.pop(t) + c.Assert(t, check.DeepEquals, txn) + // Now txn won't be blocked but txnManager should be blocked at cond.Wait() + inputTxnInTime(c, input, txn, 10*time.Microsecond) + // close txnManager and output should be closed when txnManager is closed + txnManager.Close() + outputClose := make(chan struct{}) + go func() { + for t := range output { + c.Assert(t, check.DeepEquals, txn) + } + close(outputClose) + }() + select { + case <-outputClose: + case <-time.After(time.Second): + c.Fatal("txnManager fails to end run()... 
diff --git a/tests/status/run.sh b/tests/status/run.sh
index c2d07e10d..466eba687 100755
--- a/tests/status/run.sh
+++ b/tests/status/run.sh
@@ -8,8 +8,17 @@ cd "$(dirname "$0")"
 OUT_DIR=/tmp/tidb_binlog_test
 STATUS_LOG="${OUT_DIR}/status.log"

+# use the latest ts as initial-commit-ts, so we skip binlogs left by previous test cases
+ms=$(date +'%s')
+ts=$(($ms*1000<<18))
+args="-initial-commit-ts=$ts"
+down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
+rm -rf /tmp/tidb_binlog_test/data.drainer
+
+
 # run drainer, and drainer's status should be online
-run_drainer &
+run_drainer "$args" &
+
 sleep 2
 echo "check drainer's status, should be online"
 check_status drainers online
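Editor's note: the shell arithmetic in run.sh builds a TiDB TSO. The low 18 bits of a commit ts are a logical counter, and the bits above carry the physical time in milliseconds, so seconds*1000<<18 yields a ts just past "now". A hedged Go sketch of the same packing (illustrative only, not code from this repository):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same arithmetic as run.sh: seconds -> milliseconds, then shift past
	// the 18-bit logical counter of a TiDB TSO.
	ms := time.Now().Unix() * 1000
	ts := ms << 18
	fmt.Printf("-initial-commit-ts=%d\n", ts)

	// Unpack it again as a sanity check: we should get the wall clock back.
	sec := (ts >> 18) / 1000
	fmt.Println(time.Unix(sec, 0).UTC())
}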