cdc/sink: Kafka support user set configuration (#4512) (#4525)
close #4385
ti-chi-bot authored Feb 24, 2022
1 parent 19b9177 commit 4d7609c
Showing 12 changed files with 741 additions and 449 deletions.
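For context, the "user set configuration" in the title refers to Kafka producer options that users pass through the changefeed sink URI and opts map, which the new kafka.CompleteConfigsAndOpts call in cdc/sink/mq.go below takes over from config.Initialize. The following is a minimal, self-contained sketch of that idea only: the parameter names (max-message-bytes, kafka-version) and the Shopify/sarama import are assumptions for illustration, not the implementation added by this commit.

// Illustrative sketch: apply user-supplied sink-URI query parameters to a
// Sarama producer config. Parameter names and the sarama import path are
// assumptions; this is not the CompleteConfigsAndOpts implementation.
package main

import (
	"fmt"
	"net/url"
	"strconv"

	"github.com/Shopify/sarama"
)

func saramaConfigFromSinkURI(rawURI string) (*sarama.Config, error) {
	sinkURI, err := url.Parse(rawURI)
	if err != nil {
		return nil, err
	}
	cfg := sarama.NewConfig()
	query := sinkURI.Query()
	// Hypothetical parameter: maximum size of a produced message.
	if s := query.Get("max-message-bytes"); s != "" {
		maxBytes, err := strconv.Atoi(s)
		if err != nil {
			return nil, err
		}
		cfg.Producer.MaxMessageBytes = maxBytes
	}
	// Hypothetical parameter: target Kafka broker version.
	if s := query.Get("kafka-version"); s != "" {
		version, err := sarama.ParseKafkaVersion(s)
		if err != nil {
			return nil, err
		}
		cfg.Version = version
	}
	return cfg, nil
}

func main() {
	cfg, err := saramaConfigFromSinkURI(
		"kafka://127.0.0.1:9092/cdc-test?max-message-bytes=1048576&kafka-version=2.4.0")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Producer.MaxMessageBytes, cfg.Version)
}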
6 changes: 2 additions & 4 deletions cdc/model/schema_storage.go
@@ -17,14 +17,12 @@ import (
"fmt"

"github.com/pingcap/log"

"go.uber.org/zap"

"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/types"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/rowcodec"
"go.uber.org/zap"
)

const (
@@ -139,7 +137,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode

ti.findHandleIndex()
ti.initColumnsFlag()
log.Debug("warpped table info", zap.Reflect("tableInfo", ti))
log.Debug("warped table info", zap.Reflect("tableInfo", ti))
return ti
}

13 changes: 9 additions & 4 deletions cdc/owner/ddl_sink.go
@@ -121,11 +121,13 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m

start := time.Now()
if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
log.Warn("ddl sink initialize failed", zap.Duration("elapsed", time.Since(start)))
log.Warn("ddl sink initialize failed",
zap.Duration("duration", time.Since(start)))
ctx.Throw(err)
return
}
log.Info("ddl sink initialized, start processing...", zap.Duration("elapsed", time.Since(start)))
log.Info("ddl sink initialized, start processing...",
zap.Duration("duration", time.Since(start)))

// TODO make the tick duration configurable
ticker := time.NewTicker(time.Second)
@@ -154,13 +156,16 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
})
if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl))
log.Info("Execute DDL succeeded",
zap.String("changefeed", ctx.ChangefeedVars().ID),
zap.Bool("ignored", err != nil),
zap.Reflect("ddl", ddl))
atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs)
continue
}
// If DDL executing failed, and the error can not be ignored, throw an error and pause the changefeed
log.Error("Execute DDL failed",
zap.String("ChangeFeedID", ctx.ChangefeedVars().ID),
zap.String("changefeed", ctx.ChangefeedVars().ID),
zap.Error(err),
zap.Reflect("ddl", ddl))
ctx.Throw(errors.Trace(err))
24 changes: 15 additions & 9 deletions cdc/processor/processor.go
@@ -265,6 +265,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}

stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID)
stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor)

p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue)
p.wg.Add(1)
@@ -804,15 +806,8 @@ func (p *processor) Close() error {
}
p.cancel()
p.wg.Wait()
// mark tables share the same cdcContext with its original table, don't need to cancel
failpoint.Inject("processorStopDelay", nil)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)

// sink close might be time-consuming, do it the last.
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
@@ -830,6 +825,17 @@ func (p *processor) Close() error {
zap.String("changefeed", p.changefeedID),
zap.Duration("duration", time.Since(start)))
}

// mark tables share the same cdcContext with its original table, don't need to cancel
failpoint.Inject("processorStopDelay", nil)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)

return nil
}

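The processor change above tags the context with the capture address and the processor role, alongside the changefeed ID that was already set, so that downstream components such as the MQ sink in the next file can read those values back for logging. Below is a self-contained sketch of that context-tagging pattern; the helper names and key types are stand-ins, not the actual cdc util package functions.

// Illustrative sketch of the context-tagging pattern the diff relies on
// (PutCaptureAddrInCtx / PutRoleInCtx on the processor side, ChangefeedIDFromCtx /
// RoleFromCtx in the MQ sink). All identifiers here are stand-ins.
package main

import (
	"context"
	"fmt"
)

type ctxKey string

const (
	ctxKeyChangefeedID ctxKey = "changefeedID"
	ctxKeyRole         ctxKey = "role"
)

// putChangefeedIDInCtx returns a child context carrying the changefeed ID.
func putChangefeedIDInCtx(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, ctxKeyChangefeedID, id)
}

// putRoleInCtx returns a child context carrying the component role.
func putRoleInCtx(ctx context.Context, role string) context.Context {
	return context.WithValue(ctx, ctxKeyRole, role)
}

// changefeedIDFromCtx returns the stored ID, or "" if it was never set,
// so callers can log a best-effort identifier.
func changefeedIDFromCtx(ctx context.Context) string {
	id, _ := ctx.Value(ctxKeyChangefeedID).(string)
	return id
}

func roleFromCtx(ctx context.Context) string {
	role, _ := ctx.Value(ctxKeyRole).(string)
	return role
}

func main() {
	ctx := context.Background()
	ctx = putChangefeedIDInCtx(ctx, "changefeed-test")
	ctx = putRoleInCtx(ctx, "processor")
	// The sink side reads the tags back purely for logging, as mq.go now does.
	fmt.Printf("changefeed=%s role=%s\n", changefeedIDFromCtx(ctx), roleFromCtx(ctx))
}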
65 changes: 42 additions & 23 deletions cdc/sink/mq.go
@@ -57,11 +57,15 @@ type mqSink struct {
resolvedReceiver *notify.Receiver

statistics *Statistics

role util.Role
id model.ChangeFeedID
}

func newMqSink(
ctx context.Context, credential *security.Credential, mqProducer producer.Producer,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string,
errCh chan error,
) (*mqSink, error) {
partitionNum := mqProducer.GetPartitionNum()
partitionInput := make([]chan struct {
@@ -74,13 +78,13 @@ func newMqSink(
resolvedTs uint64
}, 12800)
}
d, err := dispatcher.NewDispatcher(config, mqProducer.GetPartitionNum())
d, err := dispatcher.NewDispatcher(replicaConfig, mqProducer.GetPartitionNum())
if err != nil {
return nil, errors.Trace(err)
}
notifier := new(notify.Notifier)
var protocol codec.Protocol
protocol.FromString(config.Sink.Protocol)
protocol.FromString(replicaConfig.Sink.Protocol)

newEncoder := codec.NewEventBatchEncoder(protocol)
if protocol == codec.ProtocolAvro {
@@ -108,7 +112,7 @@ func newMqSink(
avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx))
return avroEncoder
}
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue {
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !replicaConfig.EnableOldValue {
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", protocol.String()))
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String())))
@@ -133,7 +137,11 @@ func newMqSink(
if err != nil {
return nil, err
}
k := &mqSink{

changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)

s := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
newEncoder: newEncoder,
@@ -148,27 +156,34 @@ func newMqSink(
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
}

go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
if err := s.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
log.Error("error channel is full", zap.Error(err),
zap.String("changefeed", changefeedID), zap.Any("role", s.role))
}
}
}()
return k, nil
return s, nil
}

func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
rowsCount := 0
for _, row := range rows {
if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) {
log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs))
log.Info("Row changed event ignored",
zap.Uint64("start-ts", row.StartTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
continue
}
partition := k.dispatcher.Dispatch(row)
@@ -246,6 +261,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
zap.String("query", ddl.Query),
zap.Uint64("startTs", ddl.StartTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("changefeed", k.id),
zap.Any("role", k.role),
)
return cerror.ErrDDLEventIgnored.GenWithStackByArgs()
}
@@ -260,7 +277,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
}

k.statistics.AddDDLCount()
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs))
log.Debug("emit ddl event", zap.String("query", ddl.Query),
zap.Uint64("commitTs", ddl.CommitTs), zap.String("changefeed", k.id), zap.Any("role", k.role))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
return errors.Trace(err)
}
@@ -318,7 +336,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
return 0, err
}
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize))
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
})
}
@@ -391,18 +410,17 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
log.Warn("writeToProducer called with no-op",
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition))
zap.Int32("partition", partition),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
scheme := strings.ToLower(sinkURI.Scheme)
if scheme != "kafka" && scheme != "kafka+ssl" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
}

config := kafka.NewConfig()
if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
filter *filter.Filter, replicaConfig *config.ReplicaConfig,
opts map[string]string, errCh chan error) (*mqSink, error) {
producerConfig := kafka.NewConfig()
if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

@@ -412,18 +430,19 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, opts, errCh)
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh)
if err != nil {
return nil, errors.Trace(err)
}
sink, err := newMqSink(ctx, config.Credential, producer, filter, replicaConfig, opts, errCh)
sink, err := newMqSink(ctx, producerConfig.Credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
return nil, errors.Trace(err)
}
return sink, nil
}

func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
producer, err := pulsar.NewProducer(sinkURI, errCh)
if err != nil {
return nil, errors.Trace(err)
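Aside from the extra logging fields, the functional change in mq.go is that newKafkaSaramaSink now delegates sink-URI and opts handling to kafka.CompleteConfigsAndOpts instead of config.Initialize, while keeping the topic check locally; the kafka/kafka+ssl scheme check visible in the removed lines presumably moves into that helper. Below is a stdlib-only sketch of the surrounding URI handling; deriving the topic from the URI path is an assumption made for illustration, consistent with the "no topic is specified" error that the diff keeps.

// Illustrative sketch of the sink-URI checks around the producer setup.
// The scheme check mirrors the removed lines above; the path-to-topic
// derivation is an assumption, not code from this commit.
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func topicFromSinkURI(rawURI string) (string, error) {
	sinkURI, err := url.Parse(rawURI)
	if err != nil {
		return "", err
	}
	scheme := strings.ToLower(sinkURI.Scheme)
	if scheme != "kafka" && scheme != "kafka+ssl" {
		return "", fmt.Errorf("can't create MQ sink with unsupported scheme: %s", scheme)
	}
	// Assumed: the topic is the URI path with surrounding slashes trimmed.
	topic := strings.Trim(sinkURI.Path, "/")
	if topic == "" {
		return "", fmt.Errorf("no topic is specified in sink-uri")
	}
	return topic, nil
}

func main() {
	topic, err := topicFromSinkURI("kafka://127.0.0.1:9092/cdc-test")
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // cdc-test
}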