From 81d115ab0ff336073be36b5869dfd21137132844 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 15 Nov 2021 16:46:36 +0800 Subject: [PATCH 1/2] dm/load: fix concurent call Loader.Status --- dm/loader/loader.go | 8 +++--- dm/loader/status.go | 10 +++++-- dm/loader/status_test.go | 56 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 dm/loader/status_test.go diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 35239edf0b4..38ce623b424 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -441,7 +441,7 @@ type Loader struct { // to calculate remainingTimeGauge metric, map will be init in `l.prepare.prepareDataFiles` dbTableDataTotalSize map[string]map[string]*atomic.Int64 dbTableDataFinishedSize map[string]map[string]*atomic.Int64 - dbTableDataLastFinishedSize map[string]map[string]int64 + dbTableDataLastFinishedSize map[string]map[string]*atomic.Int64 dbTableDataLastUpdatedTime time.Time metaBinlog atomic.String @@ -1045,12 +1045,12 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error { if _, ok := l.dbTableDataTotalSize[db]; !ok { l.dbTableDataTotalSize[db] = make(map[string]*atomic.Int64) l.dbTableDataFinishedSize[db] = make(map[string]*atomic.Int64) - l.dbTableDataLastFinishedSize[db] = make(map[string]int64) + l.dbTableDataLastFinishedSize[db] = make(map[string]*atomic.Int64) } if _, ok := l.dbTableDataTotalSize[db][table]; !ok { l.dbTableDataTotalSize[db][table] = atomic.NewInt64(0) l.dbTableDataFinishedSize[db][table] = atomic.NewInt64(0) - l.dbTableDataLastFinishedSize[db][table] = 0 + l.dbTableDataLastFinishedSize[db][table] = atomic.NewInt64(0) } l.dbTableDataTotalSize[db][table].Add(size) @@ -1075,7 +1075,7 @@ func (l *Loader) prepare() error { l.finishedDataSize.Store(0) // reset before load from checkpoint l.dbTableDataTotalSize = make(map[string]map[string]*atomic.Int64) l.dbTableDataFinishedSize = make(map[string]map[string]*atomic.Int64) - l.dbTableDataLastFinishedSize = make(map[string]map[string]int64) + l.dbTableDataLastFinishedSize = make(map[string]map[string]*atomic.Int64) // check if mydumper dir data exists. if !utils.IsDirExists(l.cfg.Dir) { diff --git a/dm/loader/status.go b/dm/loader/status.go index fe525c6022d..4c47e1fd738 100644 --- a/dm/loader/status.go +++ b/dm/loader/status.go @@ -45,11 +45,17 @@ func (l *Loader) printStatus() { totalFileCount := l.totalFileCount.Load() interval := time.Since(l.dbTableDataLastUpdatedTime) + intervalSecond := interval.Seconds() + if intervalSecond == 0 { + return + } + for db, tables := range l.dbTableDataFinishedSize { for table, size := range tables { curFinished := size.Load() - speed := float64(curFinished-l.dbTableDataLastFinishedSize[db][table]) / interval.Seconds() - l.dbTableDataLastFinishedSize[db][table] = curFinished + lastFinished := l.dbTableDataFinishedSize[db][table].Load() + speed := float64(curFinished-lastFinished) / intervalSecond + l.dbTableDataLastFinishedSize[db][table].Store(curFinished) if speed > 0 { remainingSeconds := float64(l.dbTableDataTotalSize[db][table].Load()-curFinished) / speed remainingTimeGauge.WithLabelValues(l.cfg.Name, l.cfg.WorkerName, l.cfg.SourceID, db, table).Set(remainingSeconds) diff --git a/dm/loader/status_test.go b/dm/loader/status_test.go new file mode 100644 index 00000000000..f756258dcc4 --- /dev/null +++ b/dm/loader/status_test.go @@ -0,0 +1,56 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package loader + +import ( + "sync" + + . "github.com/pingcap/check" + "go.uber.org/atomic" + + "github.com/pingcap/ticdc/dm/dm/config" + "github.com/pingcap/ticdc/dm/pkg/log" +) + +func (*testLoaderSuite) TestConcurrentStatus(c *C) { + l := &Loader{} + l.cfg = &config.SubTaskConfig{} + l.logger = log.L() + l.finishedDataSize.Store(100) + l.totalDataSize.Store(200) + l.totalFileCount.Store(10) + l.dbTableDataFinishedSize = map[string]map[string]*atomic.Int64{ + "db1": { + "table1": atomic.NewInt64(10), + "table2": atomic.NewInt64(20), + }, + } + l.dbTableDataLastFinishedSize = map[string]map[string]*atomic.Int64{ + "db1": { + "table1": atomic.NewInt64(0), + "table2": atomic.NewInt64(0), + }, + } + + // test won't race or panic + wg := sync.WaitGroup{} + wg.Add(20) + for i := 0; i < 20; i++ { + go func() { + l.Status(nil) + wg.Done() + }() + } + wg.Wait() +} From f45de3d94a9d74e8d3ed0e1c2220a65202b9258c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 15 Nov 2021 17:19:21 +0800 Subject: [PATCH 2/2] fix CI --- dm/loader/loader.go | 2 +- dm/loader/status.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 38ce623b424..917efcfd638 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -442,7 +442,7 @@ type Loader struct { dbTableDataTotalSize map[string]map[string]*atomic.Int64 dbTableDataFinishedSize map[string]map[string]*atomic.Int64 dbTableDataLastFinishedSize map[string]map[string]*atomic.Int64 - dbTableDataLastUpdatedTime time.Time + dbTableDataLastUpdatedTime atomic.Time metaBinlog atomic.String metaBinlogGTID atomic.String diff --git a/dm/loader/status.go b/dm/loader/status.go index 4c47e1fd738..33a619ea30f 100644 --- a/dm/loader/status.go +++ b/dm/loader/status.go @@ -44,7 +44,7 @@ func (l *Loader) printStatus() { totalSize := l.totalDataSize.Load() totalFileCount := l.totalFileCount.Load() - interval := time.Since(l.dbTableDataLastUpdatedTime) + interval := time.Since(l.dbTableDataLastUpdatedTime.Load()) intervalSecond := interval.Seconds() if intervalSecond == 0 { return @@ -62,7 +62,7 @@ func (l *Loader) printStatus() { } } } - l.dbTableDataLastUpdatedTime = time.Now() + l.dbTableDataLastUpdatedTime.Store(time.Now()) l.logger.Info("progress status of load", zap.Int64("finished_bytes", finishedSize),