drainer/: Reduce memory usage (#735) #737

Merged · 23 commits · Sep 25, 2019
Changes from 14 commits

Commits:
- 0dfc864: Change unreasonable buff size and reduce memory usage (lichunzhu, Sep 2, 2019)
- 212d716: Merge branch 'master' of https://github.com/pingcap/tidb-binlog into … (lichunzhu, Sep 2, 2019)
- 4520757: wait for 0.01s for goroutines to update status (lichunzhu, Sep 2, 2019)
- 04d71a1: Merge branch 'master' of https://github.com/pingcap/tidb-binlog into … (lichunzhu, Sep 9, 2019)
- 8a6124a: change defaultBinlogItemCount to 8; revise /tests/status/run.sh to sk… (lichunzhu, Sep 9, 2019)
- b860497: fix check error (lichunzhu, Sep 9, 2019)
- dafce05: Merge branch 'master' of https://github.com/pingcap/tidb-binlog into … (lichunzhu, Sep 11, 2019)
- f9fc0e0: Merge branch 'master' of https://github.com/pingcap/tidb-binlog into … (lichunzhu, Sep 11, 2019)
- e21f41c: Merge branch 'czli/drainer/reduceMemoryUsage' of https://github.com/l… (lichunzhu, Sep 11, 2019)
- 9bd6252: eliminate loader & syncer buffer usage (lichunzhu, Sep 11, 2019)
- e513e0d: Merge branch 'czli/drainer/reduceMemoryUsage' of https://github.com/l… (lichunzhu, Sep 12, 2019)
- 4b74231: reduce memory usage for success channel (lichunzhu, Sep 12, 2019)
- 1e51514: add txn manager (lichunzhu, Sep 16, 2019)
- 60bd55e: remove info clear and reduce success channel buffer (lichunzhu, Sep 18, 2019)
- be6146d: Merge branch 'master' into czli/drainer/reduceMemoryUsage (july2993, Sep 23, 2019)
- 9b4a2e5: Add more comments and logs for txnManager. Refine txnManager's code. (lichunzhu, Sep 24, 2019)
- e6aded0: simplify atomic operations (lichunzhu, Sep 24, 2019)
- 6ddab70: Merge branch 'master' into czli/drainer/reduceMemoryUsage (IANTHEREAL, Sep 24, 2019)
- 03427a7: add two unit tests for txnManager to test whether txnManager can quit… (lichunzhu, Sep 25, 2019)
- 21d4efc: reduce wait time (lichunzhu, Sep 25, 2019)
- 6789bdf: simplify load_test (lichunzhu, Sep 25, 2019)
- 81e586c: simplify failMsg (lichunzhu, Sep 25, 2019)
- 64aa654: Update pkg/loader/load_test.go (lichunzhu, Sep 25, 2019)
2 changes: 1 addition & 1 deletion drainer/config.go
@@ -52,7 +52,7 @@ const (

var (
maxBinlogItemCount int
defaultBinlogItemCount = 512
defaultBinlogItemCount = 8
supportedCompressors = [...]string{"gzip"}
newZKFromConnectionString = zk.NewFromConnectionString
)
2 changes: 1 addition & 1 deletion drainer/merge.go
@@ -192,7 +192,7 @@ func NewMerger(ts int64, strategy string, sources ...MergeSource) *Merger {
m := &Merger{
latestTS: ts,
sources: make(map[string]MergeSource),
output: make(chan MergeItem, 10),
output: make(chan MergeItem),
strategy: mergeStrategy,
}

2 changes: 1 addition & 1 deletion drainer/metrics.go
@@ -109,7 +109,7 @@ var (
Subsystem: "drainer",
Name: "read_binlog_size",
Help: "Bucketed histogram of size of a binlog.",
Buckets: prometheus.ExponentialBuckets(16, 2, 20),
Buckets: prometheus.ExponentialBuckets(16, 2, 25),
}, []string{"nodeID"})

queueSizeGauge = prometheus.NewGaugeVec(
2 changes: 1 addition & 1 deletion drainer/pump.go
@@ -32,7 +32,7 @@ import (
)

const (
binlogChanSize = 10
binlogChanSize = 0
)

// Pump holds the connection to a pump node, and keeps the savepoint of binlog last read
2 changes: 2 additions & 0 deletions drainer/pump_test.go
@@ -80,10 +80,12 @@ func (s *pumpSuite) TestPullBinlog(c *C) {

// ascending commitTs order
pullBinlogCommitTSChecker(commitTsArray, ret, binlogBytesChan, c)
time.Sleep(10 * time.Microsecond)
c.Assert(p.latestTS, Equals, commitTsArray[len(commitTsArray)-1])

// should omit disorder binlog item, latestTs should be 29
pullBinlogCommitTSChecker(wrongCommitTsArray, ret, binlogBytesChan, c)
time.Sleep(10 * time.Microsecond)
c.Assert(p.latestTS, Equals, wrongCommitTsArray[len(wrongCommitTsArray)-2])
}

2 changes: 1 addition & 1 deletion drainer/sync/syncer.go
@@ -50,7 +50,7 @@ type baseSyncer struct {
func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer {
return &baseSyncer{
baseError: newBaseError(),
success: make(chan *Item, 1024),
success: make(chan *Item, 8),
tableInfoGetter: tableInfoGetter,
}
}
15 changes: 8 additions & 7 deletions drainer/sync/syncer_test.go
@@ -120,13 +120,6 @@ func (s *syncerSuite) TestOpenAndClose(c *check.C) {

func (s *syncerSuite) TestGetFromSuccesses(c *check.C) {
gen := translator.BinlogGenrator{}
gen.SetDDL()
item := &Item{
Binlog: gen.TiBinlog,
PrewriteValue: gen.PV,
Schema: gen.Schema,
Table: gen.Table,
}

// set up mysql db mock expect
s.mysqlMock.ExpectBegin()
@@ -144,6 +137,14 @@ func (s *syncerSuite) TestGetFromSuccesses(c *check.C) {

var successCount = make([]int64, len(s.syncers))
for idx, syncer := range s.syncers {
gen.SetDDL()
item := &Item{
Binlog: gen.TiBinlog,
PrewriteValue: gen.PV,
Schema: gen.Schema,
Table: gen.Table,
}

go func(idx int) {
for range syncer.Successes() {
atomic.AddInt64(&successCount[idx], 1)
89 changes: 85 additions & 4 deletions pkg/loader/load.go
@@ -159,8 +159,8 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
workerCount: opts.workerCount,
batchSize: opts.batchSize,
metrics: opts.metrics,
input: make(chan *Txn, 1024),
successTxn: make(chan *Txn, 1024),
input: make(chan *Txn),
successTxn: make(chan *Txn),
merge: true,
saveAppliedTS: opts.saveAppliedTS,

@@ -430,16 +430,19 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error {

// Run will quit when meet any error, or all the txn are drained
func (s *loaderImpl) Run() error {
txnManager := newTxnManager(1024)
Contributor:
  1. We can't control the memory consumed by the buffers here even when the capacity is limited by the number of DMLs.
  2. If we do want to limit memory consumption by the number of DMLs, we can do that by replacing chan *Txn with chan X where X contains only a single DML or DDL; we don't need to implement something similar to a buffered channel.

Contributor Author:
  1. It's not easy to measure the memory consumed by a single DML. The main memory usage in a single DML seems to be the OldValues and Values maps, so maybe we can count the size of these two maps?
  2. If we use an X that contains only a single DML or DDL, we still need to add the whole txn to batchManager rather than a single DML or DDL. A successfully executed single DML doesn't mean the whole txn is successful.

Contributor: We can't avoid OOM by adding checks everywhere we might use memory (and making the program more complex), so the problem we need to solve is to lower the probability, which I think can be done by simply setting smaller buffer sizes.

Contributor Author (lichunzhu, Sep 23, 2019): The problem is that setting smaller buffer sizes causes performance degradation. If we just set the buffer size of loader.input to 8, the Update Event rate drops from 7k+ to 3k+.
[screenshot: Update Event rate graph]
If we change the loader's default case to case <-time.After(10 * time.Microsecond), the situation gets better, but there is still some degradation: the Update Event rate drops from around 7.2k to around 6.7k. However, when there are very few binlogs, it takes longer for the loader to execute the cached binlogs.
[screenshot: Update Event rate graph]

Collaborator: Any better suggestions for @lichunzhu? @suzaku

Contributor: Could you please change output: make(chan MergeItem) back to output: make(chan MergeItem, 10) and try again?

Contributor Author: @suzaku It still shows performance degradation. This is with the loader's default changed to case <-time.After(10 * time.Microsecond).
[screenshot: Update Event rate graph]
defer func() {
log.Info("Run()... in Loader quit")
close(s.successTxn)
txnManager.Close()
}()

batch := fNewBatchManager(s)
input := txnManager.put(s.input)

for {
select {
case txn, ok := <-s.input:
Contributor: Will it be slow if we only change s.input to be unbuffered, without using txnManager here?

Contributor Author: Yes. From 10:27 on I set loader.input to a buffer of size 8 and did not use txnManager here; the Update Event rate dropped from 7k+ to 3k+.
[screenshot: Update Event rate graph]
case txn, ok := <-input:
if !ok {
log.Info("Loader closed, quit running")
if err := batch.execAccumulatedDMLs(); err != nil {
Expand All @@ -449,6 +452,7 @@ func (s *loaderImpl) Run() error {
}

s.metricsInputTxn(txn)
txnManager.pop(txn)
if err := batch.put(txn); err != nil {
return errors.Trace(err)
}
Expand All @@ -464,12 +468,13 @@ func (s *loaderImpl) Run() error {
}

// get first
txn, ok := <-s.input
txn, ok := <-input
if !ok {
return nil
}

s.metricsInputTxn(txn)
txnManager.pop(txn)
if err := batch.put(txn); err != nil {
return errors.Trace(err)
}
@@ -624,6 +629,82 @@ func (b *batchManager) put(txn *Txn) error {
return nil
}

type txnManager struct {
Collaborator (IANTHEREAL, Sep 23, 2019): Any tests for txnManager?

Contributor Author: If in the end we decide to use txnManager, I will add some tests.
cacheChan chan *Txn
shutdown chan struct{}
cachedSize int
maxCacheSize int
cond *sync.Cond
closed bool
}

func newTxnManager(maxCacheSize int) *txnManager {
return &txnManager{
cacheChan: make(chan *Txn, 1024),
maxCacheSize: maxCacheSize,
cond: sync.NewCond(new(sync.Mutex)),
shutdown: make(chan struct{}),
}
}

func (t *txnManager) put(input chan *Txn) chan *Txn {
ret := t.cacheChan
go func() {
for !t.closed {
var txn *Txn
var ok bool
select {
case txn, ok = <-input:
if !ok {
close(ret)
Collaborator (IANTHEREAL, Sep 23, 2019): Only close ret when input is closed? That is also an implied assumption: the caller must close both the input channel and the txnManager itself in its closing procedure.

Contributor Author: Function Close() can close ret now.
return
}
case <-t.shutdown:
return
}
txnSize := len(txn.DMLs)

t.cond.L.Lock()
if txnSize < t.maxCacheSize {
for !t.closed && txnSize+t.cachedSize > t.maxCacheSize {
t.cond.Wait()
}
} else {
for !t.closed && t.cachedSize != 0 {
t.cond.Wait()
}
}
t.cond.L.Unlock()

select {
case ret <- txn:
t.cond.L.Lock()
t.cachedSize += txnSize
t.cond.L.Unlock()
case <-t.shutdown:
return
}
}
}()
return ret
}

func (t *txnManager) pop(txn *Txn) {
t.cond.L.Lock()
t.cachedSize -= len(txn.DMLs)
t.cond.Signal()
t.cond.L.Unlock()
}

func (t *txnManager) Close() {
if t.closed {
Collaborator (IANTHEREAL, Sep 23, 2019): This should use an atomic operation to avoid a data race.

Contributor Author: There won't be concurrent operations changing t.closed, so I think we don't need an atomic operation here.

Collaborator (IANTHEREAL, Sep 23, 2019): YOU NEED TO ADD A COMMENT THAT IT'S NOT THREAD SAFE, and I really don't like making any assumptions.
return
}
close(t.shutdown)
t.closed = true
t.cond.Signal()
}

func getAppliedTS(db *gosql.DB) int64 {
appliedTS, err := pkgsql.GetTidbPosition(db)
if err != nil {
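On the thread above about t.closed and data races: one conventional way to make shutdown race-free without relying on the single-caller assumption is to guard Close with sync.Once and flip closed under the cond's lock, then Broadcast to wake all waiters. A minimal sketch with a hypothetical type name, not the code merged in this PR:

package main

import "sync"

// txnManagerSketch keeps only the fields relevant to shutdown.
type txnManagerSketch struct {
    shutdown  chan struct{}
    closeOnce sync.Once
    cond      *sync.Cond
    closed    bool
}

func (t *txnManagerSketch) Close() {
    t.closeOnce.Do(func() {
        close(t.shutdown) // safe: Do guarantees this runs at most once
        t.cond.L.Lock()
        t.closed = true // written under the same lock the waiters hold around cond.Wait()
        t.cond.L.Unlock()
        t.cond.Broadcast() // wake every goroutine blocked in cond.Wait(), not just one
    })
}

func main() {
    m := &txnManagerSketch{
        shutdown: make(chan struct{}),
        cond:     sync.NewCond(new(sync.Mutex)),
    }
    m.Close()
    m.Close() // calling twice is safe
}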
25 changes: 13 additions & 12 deletions tests/kafka/kafka.go
@@ -84,19 +84,20 @@ func main() {

go func() {
defer ld.Close()
for {
select {
case msg := <-breader.Messages():
str := msg.Binlog.String()
log.S().Debugf("recv: %.2000s", str)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.S().Fatal(err)
}
ld.Input() <- txn
case txn := <-ld.Successes():
log.S().Debug("succ: ", txn)
for msg := range breader.Messages() {
str := msg.Binlog.String()
log.S().Debugf("recv: %.2000s", str)
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.S().Fatal(err)
}
ld.Input() <- txn
}
}()

go func() {
for txn := range ld.Successes() {
log.S().Debug("succ: ", txn)
}
}()

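The kafka.go change above splits one select loop into two goroutines: one feeds ld.Input(), the other drains ld.Successes(). With the loader's channels now unbuffered, a single loop that both sends and receives can stall once it is blocked sending a txn while the loader is blocked reporting a success that nobody is reading. A toy illustration of the two-goroutine pattern, using stand-in channels rather than the real loader API:

package main

import (
    "fmt"
    "sync"
)

func main() {
    in := make(chan int)  // unbuffered, like loader.input after this PR
    out := make(chan int) // unbuffered, like loader.successTxn after this PR

    // Worker: reads work from in, reports every finished item on out.
    go func() {
        for v := range in {
            out <- v
        }
        close(out)
    }()

    var wg sync.WaitGroup
    wg.Add(1)
    // Drain results in a dedicated goroutine, mirroring the second goroutine
    // added to tests/kafka/kafka.go; folding this into the feeding loop would
    // risk both sides blocking on unbuffered sends at the same time.
    go func() {
        defer wg.Done()
        for v := range out {
            fmt.Println("succ:", v)
        }
    }()

    for i := 0; i < 3; i++ {
        in <- i // feeding loop, analogous to ld.Input() <- txn
    }
    close(in)
    wg.Wait()
}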
11 changes: 10 additions & 1 deletion tests/status/run.sh
@@ -8,8 +8,17 @@ cd "$(dirname "$0")"
OUT_DIR=/tmp/tidb_binlog_test
STATUS_LOG="${OUT_DIR}/status.log"

# use latest ts as initial-commit-ts, so we can skip binlog by previous test case
ms=$(date +'%s')
ts=$(($ms*1000<<18))
args="-initial-commit-ts=$ts"
down_run_sql "DROP DATABASE IF EXISTS tidb_binlog"
rm -rf /tmp/tidb_binlog_test/data.drainer


# run drainer, and drainer's status should be online
run_drainer &
run_drainer "$args" &

sleep 2
echo "check drainer's status, should be online"
check_status drainers online
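The new lines in run.sh derive the initial-commit-ts from the wall clock: a TiDB TSO keeps the physical time in milliseconds in the bits above the lowest 18, with a logical counter in the low 18 bits, so seconds*1000<<18 yields a timestamp newer than anything written by earlier test cases. The same computation in Go, as a rough sketch under that assumption:

package main

import (
    "fmt"
    "time"
)

// composeTS packs a physical timestamp (milliseconds) and a logical counter
// into a TSO the same way the shell script does: physical<<18 | logical.
func composeTS(physicalMs, logical int64) int64 {
    return physicalMs<<18 | logical
}

func main() {
    nowMs := time.Now().UnixNano() / int64(time.Millisecond)
    fmt.Println("initial-commit-ts =", composeTS(nowMs, 0))
}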