Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/kafka: 5.3-manual-cherry-pick-for-kafka-configuration #4641

Merged
merged 11 commits into the base branch from the source branch
Feb 21, 2022
2 changes: 1 addition & 1 deletion cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode

ti.findHandleIndex()
ti.initColumnsFlag()
log.Debug("warpped table info", zap.Reflect("tableInfo", ti))
log.Debug("warped table info", zap.Reflect("tableInfo", ti))
return ti
}

Expand Down
5 changes: 4 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
})
if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl))
log.Info("Execute DDL succeeded",
zap.String("changefeed", ctx.ChangefeedVars().ID),
zap.Bool("ignored", err != nil),
zap.Reflect("ddl", ddl))
atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs)
continue
}
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {

stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID)
stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor)

p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue)
p.wg.Add(1)
Expand Down
30 changes: 25 additions & 5 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -60,6 +61,9 @@ type mqSink struct {
resolvedReceiver *notify.Receiver

statistics *Statistics

role util.Role
id model.ChangeFeedID
}

func newMqSink(
Expand Down Expand Up @@ -100,6 +104,9 @@ func newMqSink(
return nil, errors.Trace(err)
}

changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)

k := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
Expand All @@ -115,6 +122,9 @@ func newMqSink(
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
}

go func() {
Expand All @@ -124,7 +134,8 @@ func newMqSink(
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
log.Error("error channel is full", zap.Error(err),
zap.String("changefeed", changefeedID), zap.Any("role", k.role))
}
}
}()
Expand All @@ -135,7 +146,10 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
rowsCount := 0
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
log.Info("Row changed event ignored",
zap.Uint64("start-ts", row.StartTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
continue
}
partition := k.dispatcher.Dispatch(row)
Expand Down Expand Up @@ -216,6 +230,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
zap.String("query", ddl.Query),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role),
)
return cerror.ErrDDLEventIgnored.GenWithStackByArgs()
}
Expand All @@ -233,7 +249,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
}

k.statistics.AddDDLCount()
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs))
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commitTs", ddl.CommitTs),
zap.String("changefeed", k.id), zap.Any("role", k.role))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
return errors.Trace(err)
}
Expand Down Expand Up @@ -294,7 +311,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
return 0, err
}
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize))
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
})
}
Expand Down Expand Up @@ -364,7 +382,9 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
log.Warn("writeToProducer called with no-op",
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition))
zap.Int32("partition", partition),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
}

Expand Down
Loading