Skip to content

Commit

Permalink
schema_storage: verbose DDL log (#3187) (#3198)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 2, 2021
1 parent f8c1f36 commit 6c26fa6
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
for _, partition := range newPi.Definitions {
// update table info.
if _, ok := s.partitionTable[partition.ID]; ok {
log.Debug("add table partition success", zap.String("name", tbl.Name.O), zap.Int64("tid", id), zap.Reflect("add partition id", partition.ID))
log.Debug("add table partition success",
zap.String("name", tbl.Name.O), zap.Int64("tid", id),
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
Expand All @@ -477,7 +479,9 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
s.truncateTableID[pid] = struct{}{}
delete(s.partitionTable, pid)
delete(s.ineligibleTableID, pid)
log.Debug("drop table partition success", zap.String("name", tbl.Name.O), zap.Int64("tid", id), zap.Reflect("truncated partition id", pid))
log.Debug("drop table partition success",
zap.String("name", tbl.Name.O), zap.Int64("tid", id),
zap.Int64("truncated partition id", pid))
}

return nil
Expand Down Expand Up @@ -545,7 +549,7 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
if err := s.FillSchemaName(job); err != nil {
return errors.Trace(err)
}
log.Debug("handle job: ", zap.String("sql query", job.Query), zap.Stringer("job", job))
log.Info("handle DDL", zap.String("DDL", job.Query), zap.Stringer("job", job))
getWrapTableInfo := func(job *timodel.Job) *model.TableInfo {
return model.WrapTableInfo(job.SchemaID, job.SchemaName,
job.BinlogInfo.FinishedTS,
Expand Down Expand Up @@ -719,11 +723,15 @@ func (s *schemaStorageImpl) GetSnapshot(ctx context.Context, ts uint64) (*schema
// The infinite retry here is a temporary solution to the `ErrSchemaStorageUnresolved` caused by
// DDL puller lagging too much.
startTime := time.Now()
logTime := startTime
err := retry.Do(ctx, func() error {
var err error
snap, err = s.getSnapshot(ts)
if time.Since(startTime) >= 5*time.Minute && isRetryable(err) {
log.Warn("GetSnapshot is taking too long, DDL puller stuck?", zap.Uint64("ts", ts))
now := time.Now()
if now.Sub(logTime) >= 30*time.Second && isRetryable(err) {
log.Warn("GetSnapshot is taking too long, DDL puller stuck?",
zap.Uint64("ts", ts), zap.Duration("duration", now.Sub(startTime)))
logTime = now
}
return err
}, retry.WithBackoffBaseDelay(10), retry.WithInfiniteTries(), retry.WithIsRetryableErr(isRetryable))
Expand Down Expand Up @@ -754,7 +762,8 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
if len(s.snaps) > 0 {
lastSnap := s.snaps[len(s.snaps)-1]
if job.BinlogInfo.FinishedTS <= lastSnap.currentTs {
log.Debug("ignore foregone DDL job", zap.Reflect("job", job))
log.Info("ignore foregone DDL",
zap.Int64("jobID", job.ID), zap.String("DDL", job.Query))
return nil
}
snap = lastSnap.Clone()
Expand Down Expand Up @@ -818,7 +827,7 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
// At state *done*, it will be always and only changed to *synced*.
func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
if s.filter != nil && s.filter.ShouldDiscardDDL(job.Type) {
log.Info("discard the ddl job", zap.Int64("jobID", job.ID), zap.String("query", job.Query))
log.Info("discard DDL", zap.Int64("jobID", job.ID), zap.String("DDL", job.Query))
return true
}
return !job.IsSynced() && !job.IsDone()
Expand Down

0 comments on commit 6c26fa6

Please sign in to comment.