Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

.*/: fix dm unit tests and integration tests #578

Merged
merged 12 commits into from
Apr 9, 2020
10 changes: 5 additions & 5 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
c.Assert(ha.KeepAlive(ctx1, etcdTestCli, workerName1, keepAliveTTL), IsNil)
}()
// wait for source1 bound to worker1.
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
Expand Down Expand Up @@ -304,7 +304,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
c.Assert(ha.KeepAlive(ctx1, etcdTestCli, workerName1, keepAliveTTL), IsNil)
}()
// wait for source1 bound to worker1.
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
Expand Down Expand Up @@ -349,7 +349,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
c.Assert(ha.KeepAlive(ctx2, etcdTestCli, workerName2, keepAliveTTL), IsNil)
}()
// wait for worker2 become Free.
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := s.GetWorkerByName(workerName2)
return w.Stage() == WorkerFree
}), IsTrue)
Expand Down Expand Up @@ -710,7 +710,7 @@ func (t *testScheduler) TestRestartScheduler(c *C) {
}()
// step 2.3: scheduler should bound source to worker
// wait for source1 bound to worker1.
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
Expand Down Expand Up @@ -756,7 +756,7 @@ func (t *testScheduler) TestRestartScheduler(c *C) {
wg.Wait()
// check whether keepalive lease is out of date
time.Sleep(time.Duration(keepAliveTTL) * time.Second)
c.Assert(utils.WaitSomething(30, 10*time.Millisecond, func() bool {
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
kam, _, err := ha.GetKeepAliveWorkers(etcdTestCli)
return err == nil && len(kam) == 0
}), IsTrue)
Expand Down
12 changes: 10 additions & 2 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ type Loader struct {

// for every worker goroutine, not for every data file
workerWg *sync.WaitGroup
// for other goroutines
wg sync.WaitGroup

fileJobQueue chan *fileJob
fileJobQueueClosed sync2.AtomicBool
Expand Down Expand Up @@ -580,7 +582,11 @@ func (l *Loader) Restore(ctx context.Context) error {
return err2
}

go l.PrintStatus(ctx)
l.wg.Add(1)
go func() {
defer l.wg.Done()
l.PrintStatus(ctx)
}()

begin := time.Now()
err = l.restoreData(ctx)
Expand Down Expand Up @@ -638,8 +644,10 @@ func (l *Loader) stopLoad() {

l.closeFileJobQueue()
l.workerWg.Wait()

l.logCtx.L().Debug("all workers have been closed")

l.wg.Wait()
l.logCtx.L().Debug("all loader's go-routines have been closed")
}

// Pause pauses the process, and it can be resumed later
Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (t *testForEtcd) TestKeepAliveRevokeLease(c *C) {
}(ctx1, cancel1)
}

ctx1, cancel1 := context.WithTimeout(ctx, 4*time.Second)
ctx1, cancel1 := context.WithTimeout(ctx, 6*time.Second)
WatchWorkerEvent(ctx1, etcdTestCli, rev, evCh, errCh)
cancel1()
c.Assert(evCh, HasLen, 100)
Expand Down
11 changes: 10 additions & 1 deletion pkg/metricsproxy/countervec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package metricsproxy

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// CounterVecProxy to proxy prometheus.CounterVec
type CounterVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
*prometheus.CounterVec
Expand All @@ -44,7 +48,9 @@ func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}
return c.CounterVec.WithLabelValues(lvs...)
}
Expand All @@ -54,7 +60,9 @@ func (c *CounterVecProxy) WithLabelValues(lvs ...string) prometheus.Counter {
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *CounterVecProxy) With(labels prometheus.Labels) prometheus.Counter {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}

return c.CounterVec.With(labels)
Expand All @@ -65,7 +73,8 @@ func (c *CounterVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}

c.mu.Lock()
defer c.mu.Unlock()
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/metricsproxy/gaugevec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package metricsproxy

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// GaugeVecProxy to proxy prometheus.GaugeVec
type GaugeVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
*prometheus.GaugeVec
Expand All @@ -44,7 +48,9 @@ func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge {
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}
return c.GaugeVec.WithLabelValues(lvs...)
}
Expand All @@ -54,7 +60,9 @@ func (c *GaugeVecProxy) WithLabelValues(lvs ...string) prometheus.Gauge {
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42)
func (c *GaugeVecProxy) With(labels prometheus.Labels) prometheus.Gauge {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}

return c.GaugeVec.With(labels)
Expand All @@ -65,7 +73,8 @@ func (c *GaugeVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}

c.mu.Lock()
defer c.mu.Unlock()
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/metricsproxy/histogramvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package metricsproxy

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// HistogramVecProxy to proxy prometheus.HistogramVec
type HistogramVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
*prometheus.HistogramVec
Expand All @@ -44,7 +48,9 @@ func (c *HistogramVecProxy) WithLabelValues(lvs ...string) prometheus.Observer {
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}
return c.HistogramVec.WithLabelValues(lvs...)
}
Expand All @@ -54,7 +60,9 @@ func (c *HistogramVecProxy) WithLabelValues(lvs ...string) prometheus.Observer {
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21)
func (c *HistogramVecProxy) With(labels prometheus.Labels) prometheus.Observer {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}

return c.HistogramVec.With(labels)
Expand All @@ -65,7 +73,8 @@ func (c *HistogramVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool
if len(labels) == 0 {
return false
}

c.mu.Lock()
defer c.mu.Unlock()
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/metricsproxy/summaryvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package metricsproxy

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// SummaryVecProxy to proxy prometheus.SummaryVec
type SummaryVecProxy struct {
mu sync.Mutex

LabelNames []string
Labels map[string]map[string]string
*prometheus.SummaryVec
Expand Down Expand Up @@ -48,7 +52,9 @@ func (c *SummaryVecProxy) WithLabelValues(lvs ...string) prometheus.Observer {
for index, label := range lvs {
labels[c.LabelNames[index]] = label
}
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}
return c.SummaryVec.WithLabelValues(lvs...)
}
Expand All @@ -58,7 +64,9 @@ func (c *SummaryVecProxy) WithLabelValues(lvs ...string) prometheus.Observer {
// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Observe(42.21)
func (c *SummaryVecProxy) With(labels prometheus.Labels) prometheus.Observer {
if len(labels) > 0 {
c.mu.Lock()
noteLabelsInMetricsProxy(c, labels)
c.mu.Unlock()
}

return c.SummaryVec.With(labels)
Expand All @@ -69,7 +77,8 @@ func (c *SummaryVecProxy) DeleteAllAboutLabels(labels prometheus.Labels) bool {
if len(labels) == 0 {
return false
}

c.mu.Lock()
defer c.mu.Unlock()
return findAndDeleteLabelsInMetricsProxy(c, labels)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/shardddl/optimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (t *testForEtcd) TestInfoEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
watchTimeout = 500 * time.Millisecond
watchTimeout = 2 * time.Second
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
task1 = "task-1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/shardddl/optimism/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (t *testForEtcd) TestSourceTablesEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
watchTimeout = 500 * time.Millisecond
watchTimeout = 2 * time.Second
task = "task"
source1 = "mysql-replica-1"
source2 = "mysql-replica-2"
Expand Down
14 changes: 8 additions & 6 deletions pkg/streamer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/pingcap/dm/pkg/terror"
)

var parseFileTimeout = 3 * time.Second

var _ = Suite(&testReaderSuite{})

type testReaderSuite struct {
Expand Down Expand Up @@ -269,7 +271,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) {
_, err2 := f.Write(extraEvents[0].RawData)
c.Assert(err2, IsNil)
}()
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel2()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(
ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand All @@ -290,7 +292,7 @@ func (t *testReaderSuite) TestParseFileRelaySubDirUpdated(c *C) {
err2 := ioutil.WriteFile(nextPath, replication.BinLogFileHeader, 0600)
c.Assert(err2, IsNil)
}()
ctx3, cancel3 := context.WithTimeout(context.Background(), 2*time.Second)
ctx3, cancel3 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel3()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(
ctx3, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand Down Expand Up @@ -340,7 +342,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) {

// invalid UUID in UUID list, error
r.uuids = []string{currentUUID, "invalid.uuid"}
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel1()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile(
ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand All @@ -360,7 +362,7 @@ func (t *testReaderSuite) TestParseFileRelayNeedSwitchSubDir(c *C) {
c.Assert(err, IsNil)

// has relay log file in next sub directory, need to switch
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel2()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(
ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand Down Expand Up @@ -401,7 +403,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) {
defer f.Close()

// file has no data, meet io.EOF error (when reading file header) and ignore it. but will get `context deadline exceeded` error
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
ctx1, cancel1 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel1()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err := r.parseFile(
ctx1, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand All @@ -422,7 +424,7 @@ func (t *testReaderSuite) TestParseFileRelayWithIgnorableError(c *C) {
c.Assert(err, IsNil)

// meet `err EOF` error (when parsing binlog event) ignored
ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second)
ctx2, cancel2 := context.WithTimeout(context.Background(), parseFileTimeout)
defer cancel2()
needSwitch, needReParse, latestPos, nextUUID, nextBinlogName, err = r.parseFile(
ctx2, s, filename, offset, relayDir, firstParse, currentUUID, possibleLast)
Expand Down
4 changes: 3 additions & 1 deletion tests/_dmctl_tools/check_master_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func main() {
}
cli := pb.NewMasterClient(conn)
req := &pb.ShowDDLLocksRequest{}
_, err = cli.ShowDDLLocks(context.Background(), req)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err = cli.ShowDDLLocks(ctx, req)
cancel()
if err != nil {
utils.ExitWithError(err)
}
Expand Down
4 changes: 3 additions & 1 deletion tests/_dmctl_tools/check_worker_online.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func main() {
}
cli := pb.NewWorkerClient(conn)
req := &pb.QueryStatusRequest{}
_, err = cli.QueryStatus(context.Background(), req)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err = cli.QueryStatus(ctx, req)
cancel()
if err != nil {
utils.ExitWithError(err)
}
Expand Down
3 changes: 2 additions & 1 deletion tests/dm_syncer/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ function run() {
sed -i "s/binlog-pos-placeholder-1/$pos1/g" $WORK_DIR/dm-syncer-1.toml
sed -i "s/binlog-name-placeholder-2/\"$name2\"/g" $WORK_DIR/old_meta_file
sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/old_meta_file
sleep 2
run_dm_syncer $WORK_DIR/syncer1 $WORK_DIR/dm-syncer-1.toml
meta_file=$WORK_DIR/old_meta_file
run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --syncer-config-format syncer2

# wait for dm_syncer to init and start
sleep 5
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

Expand Down
Loading