Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

worker: rename to source worker and provide source-level status #2076

Merged
merged 16 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/conn"
fr "github.com/pingcap/dm/pkg/func-rollback"
"github.com/pingcap/dm/pkg/log"
Expand Down Expand Up @@ -420,7 +421,7 @@ func (c *Checker) IsFreshTask() (bool, error) {
}

// Status implements Unit interface.
func (c *Checker) Status() interface{} {
func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
c.result.RLock()
res := c.result.detail
c.result.RUnlock()
Expand Down
6 changes: 4 additions & 2 deletions dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/terror"
)

Expand Down Expand Up @@ -55,8 +56,9 @@ type Unit interface {
// Update updates the configuration
Update(cfg *config.SubTaskConfig) error

// Status returns the unit's current status
Status() interface{}
// Status returns the unit's current status. The result may need calculation with source status, like estimated time
// to catch up. If sourceStatus is nil, the calculation should be skipped.
Status(sourceStatus *binlog.SourceStatus) interface{}
// Type returns the unit's type
Type() pb.UnitType
// IsFreshTask return whether is a fresh task (not processed before)
Expand Down
4 changes: 2 additions & 2 deletions dm/worker/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ var conditionHub *ConditionHub

// ConditionHub holds a DM-worker and it is used for wait condition detection.
type ConditionHub struct {
w *Worker
w *SourceWorker
}

// InitConditionHub inits the singleton instance of ConditionHub.
func InitConditionHub(w *Worker) {
func InitConditionHub(w *SourceWorker) {
conditionHub = &ConditionHub{
w: w,
}
Expand Down
9 changes: 5 additions & 4 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -39,7 +40,7 @@ type RelayHolder interface {
// Close closes the holder
Close()
// Status returns relay unit's status
Status(ctx context.Context) *pb.RelayStatus
Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus
// Stage returns the stage of the relay
Stage() pb.Stage
// Error returns relay unit's status
Expand Down Expand Up @@ -148,14 +149,14 @@ func (h *realRelayHolder) run() {
}

// Status returns relay unit's status.
func (h *realRelayHolder) Status(ctx context.Context) *pb.RelayStatus {
func (h *realRelayHolder) Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus {
if h.closed.Load() || h.relay.IsClosed() {
return &pb.RelayStatus{
Stage: pb.Stage_Stopped,
}
}

s := h.relay.Status(ctx).(*pb.RelayStatus)
s := h.relay.Status(sourceStatus).(*pb.RelayStatus)
s.Stage = h.Stage()
s.Result = h.Result()

Expand Down Expand Up @@ -374,7 +375,7 @@ func (d *dummyRelayHolder) Close() {
}

// Status implements interface of RelayHolder.
func (d *dummyRelayHolder) Status(ctx context.Context) *pb.RelayStatus {
func (d *dummyRelayHolder) Status(sourceStatus *binlog.SourceStatus) *pb.RelayStatus {
d.Lock()
defer d.Unlock()
return &pb.RelayStatus{
Expand Down
11 changes: 6 additions & 5 deletions dm/worker/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
pkgstreamer "github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/utils"
Expand Down Expand Up @@ -104,7 +105,7 @@ func (d *DummyRelay) Error() interface{} {
}

// Status implements Process interface.
func (d *DummyRelay) Status(ctx context.Context) interface{} {
func (d *DummyRelay) Status(sourceStatus *binlog.SourceStatus) interface{} {
return &pb.RelayStatus{
Stage: pb.Stage_New,
}
Expand Down Expand Up @@ -185,7 +186,7 @@ func (t *testRelay) testStart(c *C, holder *realRelayHolder) {
c.Assert(holder.closed.Load(), IsFalse)

// test status
status := holder.Status(context.Background())
status := holder.Status(nil)
c.Assert(status.Stage, Equals, pb.Stage_Running)
c.Assert(status.Result, IsNil)

Expand Down Expand Up @@ -226,7 +227,7 @@ func (t *testRelay) testClose(c *C, holder *realRelayHolder) {
c.Assert(holder.closed.Load(), IsTrue)

// todo: very strange, and can't resume
status := holder.Status(context.Background())
status := holder.Status(nil)
c.Assert(status.Stage, Equals, pb.Stage_Stopped)
c.Assert(status.Result, IsNil)

Expand All @@ -244,7 +245,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) {
c.Assert(err, ErrorMatches, ".*current stage is Paused.*")

// test status
status := holder.Status(context.Background())
status := holder.Status(nil)
c.Assert(status.Stage, Equals, pb.Stage_Paused)

// test update
Expand All @@ -260,7 +261,7 @@ func (t *testRelay) testPauseAndResume(c *C, holder *realRelayHolder) {
c.Assert(err, ErrorMatches, ".*current stage is Running.*")

// test status
status = holder.Status(context.Background())
status = holder.Status(nil)
c.Assert(status.Stage, Equals, pb.Stage_Running)
c.Assert(status.Result, IsNil)

Expand Down
10 changes: 5 additions & 5 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Server struct {

rootLis net.Listener
svr *grpc.Server
worker *Worker
worker *SourceWorker
etcdClient *clientv3.Client

// relay status will never be put in server.sourceStatus
Expand Down Expand Up @@ -497,7 +497,7 @@ func (s *Server) Close() {
}

// if needLock is false, we should make sure Server has been locked in caller.
func (s *Server) getWorker(needLock bool) *Worker {
func (s *Server) getWorker(needLock bool) *SourceWorker {
if needLock {
s.Lock()
defer s.Unlock()
Expand All @@ -506,7 +506,7 @@ func (s *Server) getWorker(needLock bool) *Worker {
}

// if needLock is false, we should make sure Server has been locked in caller.
func (s *Server) setWorker(worker *Worker, needLock bool) {
func (s *Server) setWorker(worker *SourceWorker, needLock bool) {
if needLock {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -819,7 +819,7 @@ func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
}, nil
}

func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Worker, error) {
func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*SourceWorker, error) {
if needLock {
s.Lock()
defer s.Unlock()
Expand All @@ -834,7 +834,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Wor
}

log.L().Info("will start a new worker", zap.String("sourceID", cfg.SourceID))
w, err := NewWorker(cfg, s.etcdClient, s.cfg.Name)
w, err := NewSourceWorker(cfg, s.etcdClient, s.cfg.Name)
if err != nil {
return nil, err
}
Expand Down
Loading