diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go
index c5dcacc76ba..28a49d60a03 100644
--- a/cdc/entry/schema_storage.go
+++ b/cdc/entry/schema_storage.go
@@ -463,7 +463,9 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
 	for _, partition := range newPi.Definitions {
 		// update table info.
 		if _, ok := s.partitionTable[partition.ID]; ok {
-			log.Debug("add table partition success", zap.String("name", tbl.Name.O), zap.Int64("tid", id), zap.Reflect("add partition id", partition.ID))
+			log.Debug("add table partition success",
+				zap.String("name", tbl.Name.O), zap.Int64("tid", id),
+				zap.Int64("add partition id", partition.ID))
 		}
 		s.partitionTable[partition.ID] = tbl
 		if !tbl.IsEligible(s.explicitTables) {
@@ -477,7 +479,9 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
 		s.truncateTableID[pid] = struct{}{}
 		delete(s.partitionTable, pid)
 		delete(s.ineligibleTableID, pid)
-		log.Debug("drop table partition success", zap.String("name", tbl.Name.O), zap.Int64("tid", id), zap.Reflect("truncated partition id", pid))
+		log.Debug("drop table partition success",
+			zap.String("name", tbl.Name.O), zap.Int64("tid", id),
+			zap.Int64("truncated partition id", pid))
 	}
 
 	return nil
@@ -545,7 +549,7 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
 	if err := s.FillSchemaName(job); err != nil {
 		return errors.Trace(err)
 	}
-	log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job))
+	log.Info("handle DDL", zap.String("DDL", job.Query), zap.Stringer("job", job))
 	getWrapTableInfo := func(job *timodel.Job) *model.TableInfo {
 		return model.WrapTableInfo(job.SchemaID, job.SchemaName,
 			job.BinlogInfo.FinishedTS,
@@ -719,11 +723,15 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema
 	// The infinite retry here is a temporary solution to the `ErrSchemaStorageUnresolved` caused by
 	// DDL puller lagging too much.
 	startTime := time.Now()
+	logTime := startTime
 	err := retry.Do(ctx, func() error {
 		var err error
 		snap, err = s.getSnapshot(ts)
-		if time.Since(startTime) >= 5*time.Minute && isRetryable(err) {
-			log.Warn("GetSnapshot is taking too long, DDL puller stuck?", zap.Uint64("ts", ts))
+		now := time.Now()
+		if now.Sub(logTime) >= 30*time.Second && isRetryable(err) {
+			log.Warn("GetSnapshot is taking too long, DDL puller stuck?",
+				zap.Uint64("ts", ts), zap.Duration("duration", now.Sub(startTime)))
+			logTime = now
 		}
 		return err
 	}, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable))
@@ -754,7 +762,8 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
 	if len(s.snaps) > 0 {
 		lastSnap := s.snaps[len(s.snaps)-1]
 		if job.BinlogInfo.FinishedTS <= lastSnap.currentTs {
-			log.Debug("ignore foregone DDL job", zap.Reflect("job", job))
+			log.Info("ignore foregone DDL",
+				zap.Int64("jobID", job.ID), zap.String("DDL", job.Query))
 			return nil
 		}
 		snap = lastSnap.Clone()
@@ -818,7 +827,7 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
 // At state *done*, it will be always and only changed to *synced*.
 func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
 	if s.filter != nil && s.filter.ShouldDiscardDDL(job.Type) {
-		log.Info("discard the ddl job", zap.Int64("jobID", job.ID), zap.String("query", job.Query))
+		log.Info("discard DDL", zap.Int64("jobID", job.ID), zap.String("DDL", job.Query))
 		return true
 	}
 	return !job.IsSynced() && !job.IsDone()
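
The `GetSnapshot` hunk replaces a warn-once-after-five-minutes check with a throttled warning: `logTime` records the last emission, so the warning repeats every 30 seconds while the retry loop keeps spinning, and each line now carries the total elapsed `duration`. Below is a minimal standalone sketch of that throttling pattern; the names (`retryForever`, `warnEvery`) and the fixed sleep standing in for the `retry.Do` backoff options are illustrative, not part of the patch:

```go
package main

import (
	"errors"
	"time"

	"go.uber.org/zap"
)

// retryForever is a hypothetical, simplified version of the retry loop in
// GetSnapshot: it retries op indefinitely, but throttles the "stuck" warning
// so it fires at most once per warnEvery rather than once after a deadline.
func retryForever(op func() error, warnEvery time.Duration, log *zap.Logger) {
	startTime := time.Now()
	logTime := startTime // last time the warning was emitted
	for {
		if err := op(); err == nil {
			return
		}
		now := time.Now()
		if now.Sub(logTime) >= warnEvery {
			// Include the total elapsed time so each repeated warning
			// still tells the reader how long we have been waiting.
			log.Warn("operation is taking too long",
				zap.Duration("duration", now.Sub(startTime)))
			logTime = now
		}
		time.Sleep(10 * time.Millisecond) // stand-in for the real backoff
	}
}

func main() {
	logger := zap.NewExample()
	deadline := time.Now().Add(250 * time.Millisecond)
	// Fails for ~250ms, then succeeds; warnings appear roughly every 100ms.
	retryForever(func() error {
		if time.Now().Before(deadline) {
			return errors.New("not ready")
		}
		return nil
	}, 100*time.Millisecond, logger)
}
```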
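
Several hunks also swap `zap.Reflect` for typed constructors such as `zap.Int64`. `zap.Reflect` accepts an `interface{}`, so the value is boxed and later serialized through zap's reflection-based encoder, while a typed field is stored inline and encoded directly. A small sketch contrasting the two forms (the `partitionID` value is made up for illustration):

```go
package main

import "go.uber.org/zap"

func main() {
	log := zap.NewExample() // logs at Debug level to stdout
	partitionID := int64(42) // hypothetical partition ID

	// Old form: zap.Reflect boxes the int64 into an interface{} and
	// serializes it via reflection when the entry is encoded.
	log.Debug("add table partition success",
		zap.Reflect("add partition id", partitionID))

	// New form: zap.Int64 stores the value as a strongly typed field,
	// avoiding the boxing and reflection entirely.
	log.Debug("add table partition success",
		zap.Int64("add partition id", partitionID))
}
```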