diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index c7be3d1dfc6..edd472455dc 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -119,6 +119,26 @@ func newAgent( return result, nil } + var ownerCaptureInfo *model.CaptureInfo + _, captures, err := etcdClient.GetCaptures(ctx) + for _, captureInfo := range captures { + if captureInfo.ID == ownerCaptureID { + ownerCaptureInfo = captureInfo + break + } + } + if ownerCaptureInfo == nil { + log.Info("schedulerv3: no owner found. We will wait for an owner to contact us.", + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Error(err)) + return result, nil + } + + result.compat.UpdateCaptureInfo(map[model.CaptureID]*model.CaptureInfo{ + ownerCaptureID: ownerCaptureInfo, + }) + log.Info("schedulerv3: agent owner found", zap.String("ownerCaptureID", ownerCaptureID), zap.String("captureID", captureID), diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 4f950d95975..b3fd7399276 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -86,7 +86,9 @@ func TestNewAgent(t *testing.T) { tableExector := newMockTableExecutor() // owner and revision found successfully - me.EXPECT().GetOwnerID(gomock.Any()).Return("owneID", nil).Times(1) + me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()).Return(int64(2333), nil).Times(1) a, err := newAgent( context.Background(), "capture-test", &liveness, changefeed, me, tableExector, 0) @@ -110,6 +112,8 @@ func TestNewAgent(t *testing.T) { // owner found, get revision failed. me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). Return(int64(0), cerror.ErrPDEtcdAPIError).Times(1) a, err = newAgent( @@ -118,6 +122,8 @@ func TestNewAgent(t *testing.T) { require.Nil(t, a) me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). Return(int64(0), cerror.ErrOwnerNotFound).Times(1) a, err = newAgent(