diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index 6018b5957eb..32e96556000 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -52,6 +52,7 @@ type ddlPullerImpl struct { mu sync.Mutex resolvedTS uint64 pendingDDLJobs []*timodel.Job + lastDDLJobID int64 cancel context.CancelFunc } @@ -117,9 +118,14 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error { log.Info("discard the ddl job", zap.Int64("jobID", job.ID), zap.String("query", job.Query)) return nil } + if job.ID == h.lastDDLJobID { + log.Warn("ignore duplicated DDL job", zap.Any("job", job)) + return nil + } h.mu.Lock() defer h.mu.Unlock() h.pendingDDLJobs = append(h.pendingDDLJobs, job) + h.lastDDLJobID = job.ID return nil } diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go index 371c0f5232a..acb9e301642 100644 --- a/cdc/owner/ddl_puller_test.go +++ b/cdc/owner/ddl_puller_test.go @@ -187,9 +187,8 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) { resolvedTs, ddl = p.PopFrontDDL() c.Assert(resolvedTs, check.Equals, uint64(25)) c.Assert(ddl.ID, check.Equals, int64(3)) - resolvedTs, ddl = p.PopFrontDDL() - c.Assert(resolvedTs, check.Equals, uint64(25)) - c.Assert(ddl.ID, check.Equals, int64(3)) + _, ddl = p.PopFrontDDL() + c.Assert(ddl, check.IsNil) waitResolvedTsGrowing(c, p, 30) resolvedTs, ddl = p.PopFrontDDL()