From 5268713fc0e0a3c8cdaad2187b1311aef4b9e5be Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 9 Feb 2022 21:03:36 +0800 Subject: [PATCH] This is an automated cherry-pick of #4359 Signed-off-by: ti-chi-bot --- cdc/model/schema_storage.go | 9 +- cdc/owner/ddl_sink.go | 223 +++++++++++++++++ cdc/processor/processor.go | 34 +++ cdc/sink/mq.go | 60 ++++- cdc/sink/producer/kafka/config.go | 337 ++++++++++++++++++++++++++ cdc/sink/producer/kafka/kafka.go | 191 +++++++++++++-- cdc/sink/producer/kafka/kafka_test.go | 40 ++- cdc/sink/sink.go | 31 +++ cmd/kafka-consumer/main.go | 1 + pkg/applier/redo.go | 229 +++++++++++++++++ pkg/util/ctx.go | 16 ++ pkg/util/identity.go | 47 ++++ 12 files changed, 1194 insertions(+), 24 deletions(-) create mode 100644 cdc/owner/ddl_sink.go create mode 100644 cdc/sink/producer/kafka/config.go create mode 100644 pkg/applier/redo.go create mode 100644 pkg/util/identity.go diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 2238dc3b331..8c4dc441956 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -17,14 +17,21 @@ import ( "fmt" "github.com/pingcap/log" +<<<<<<< HEAD "go.uber.org/zap" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/types" +======= + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/types" +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/rowcodec" + "go.uber.org/zap" ) const ( @@ -139,7 +146,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode ti.findHandleIndex() ti.initColumnsFlag() - log.Debug("warpped table info", zap.Reflect("tableInfo", ti)) + log.Debug("warped table info", zap.Reflect("tableInfo", ti)) return ti } diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go new file mode 100644 index 00000000000..ea89ff101de --- /dev/null +++ b/cdc/owner/ddl_sink.go @@ -0,0 +1,223 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink" + cdcContext "github.com/pingcap/tiflow/pkg/context" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "go.uber.org/zap" +) + +const ( + defaultErrChSize = 1024 +) + +// DDLSink is a wrapper of the `Sink` interface for the owner +// DDLSink should send `DDLEvent` and `CheckpointTs` to downstream sink, +// If `SyncPointEnabled`, also send `syncPoint` to downstream. 
+type DDLSink interface { + // run the DDLSink + run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo) + // emitCheckpointTs emits the checkpoint Ts to downstream data source + // this function will return after recording the checkpointTs specified in memory immediately + // and the recorded checkpointTs will be sent and updated to downstream data source every second + emitCheckpointTs(ctx cdcContext.Context, ts uint64) + // emitDDLEvent emits DDL event and return true if the DDL is executed + // the DDL event will be sent to another goroutine and execute to downstream + // the caller of this function can call again and again until a true returned + emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) + emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error + // close the sink, cancel running goroutine. + close(ctx context.Context) error +} + +type ddlSinkImpl struct { + lastSyncPoint model.Ts + syncPointStore sink.SyncpointStore + + checkpointTs model.Ts + ddlFinishedTs model.Ts + ddlSentTs model.Ts + + ddlCh chan *model.DDLEvent + errCh chan error + + sink sink.Sink + // `sinkInitHandler` can be helpful in unit testing. + sinkInitHandler ddlSinkInitHandler + + // cancel would be used to cancel the goroutine start by `run` + cancel context.CancelFunc + wg sync.WaitGroup +} + +func newDDLSink() DDLSink { + return &ddlSinkImpl{ + ddlCh: make(chan *model.DDLEvent, 1), + errCh: make(chan error, defaultErrChSize), + sinkInitHandler: ddlSinkInitializer, + cancel: func() {}, + } +} + +type ddlSinkInitHandler func(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error + +func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error { + filter, err := filter.NewFilter(info.Config) + if err != nil { + return errors.Trace(err) + } + + s, err := sink.New(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh) + if err != nil { + return errors.Trace(err) + } + a.sink = s + + if !info.SyncPointEnabled { + return nil + } + syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI) + if err != nil { + return errors.Trace(err) + } + a.syncPointStore = syncPointStore + + if err := a.syncPointStore.CreateSynctable(ctx); err != nil { + return errors.Trace(err) + } + return nil +} + +func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo) { + ctx, cancel := cdcContext.WithCancel(ctx) + s.cancel = cancel + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + start := time.Now() + if err := s.sinkInitHandler(ctx, s, id, info); err != nil { + log.Warn("ddl sink initialize failed", + zap.Duration("duration", time.Since(start))) + ctx.Throw(err) + return + } + log.Info("ddl sink initialized, start processing...", + zap.Duration("duration", time.Since(start))) + + // TODO make the tick duration configurable + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + var lastCheckpointTs model.Ts + for { + select { + case <-ctx.Done(): + return + case err := <-s.errCh: + ctx.Throw(err) + return + case <-ticker.C: + checkpointTs := atomic.LoadUint64(&s.checkpointTs) + if checkpointTs == 0 || checkpointTs <= lastCheckpointTs { + continue + } + lastCheckpointTs = checkpointTs + if err := s.sink.EmitCheckpointTs(ctx, checkpointTs); err != nil { + ctx.Throw(errors.Trace(err)) + return + } + case ddl := <-s.ddlCh: + err := s.sink.EmitDDLEvent(ctx, ddl) + failpoint.Inject("InjectChangefeedDDLError", func() 
{ + err = cerror.ErrExecDDLFailed.GenWithStackByArgs() + }) + if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) { + log.Info("Execute DDL succeeded", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Bool("ignored", err != nil), + zap.Reflect("ddl", ddl)) + atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs) + continue + } + // If DDL executing failed, and the error can not be ignored, throw an error and pause the changefeed + log.Error("Execute DDL failed", + zap.String("changefeed", ctx.ChangefeedVars().ID), + zap.Error(err), + zap.Reflect("ddl", ddl)) + ctx.Throw(errors.Trace(err)) + return + } + } + }() +} + +func (s *ddlSinkImpl) emitCheckpointTs(ctx cdcContext.Context, ts uint64) { + atomic.StoreUint64(&s.checkpointTs, ts) +} + +func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) { + ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs) + if ddl.CommitTs <= ddlFinishedTs { + // the DDL event is executed successfully, and done is true + return true, nil + } + if ddl.CommitTs <= s.ddlSentTs { + // the DDL event is executing and not finished yet, return false + return false, nil + } + select { + case <-ctx.Done(): + return false, errors.Trace(ctx.Err()) + case s.ddlCh <- ddl: + s.ddlSentTs = ddl.CommitTs + default: + // if this hit, we think that ddlCh is full, + // just return false and send the ddl in the next round. + } + return false, nil +} + +func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error { + if checkpointTs == s.lastSyncPoint { + return nil + } + s.lastSyncPoint = checkpointTs + // TODO implement async sink syncPoint + return s.syncPointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs) +} + +func (s *ddlSinkImpl) close(ctx context.Context) (err error) { + s.cancel() + if s.sink != nil { + err = s.sink.Close(ctx) + } + if s.syncPointStore != nil { + err = s.syncPointStore.Close() + } + s.wg.Wait() + return err +} diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 5ec1ccae5d3..70cfdbf0a24 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -263,6 +263,11 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error { } stdCtx := util.PutChangefeedIDInCtx(ctx, p.changefeed.ID) +<<<<<<< HEAD +======= + stdCtx = util.PutCaptureAddrInCtx(stdCtx, p.captureInfo.AdvertiseAddr) + stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) p.mounter = entry.NewMounter(p.schemaStorage, p.changefeed.Info.Config.Mounter.WorkerNum, p.changefeed.Info.Config.EnableOldValue) p.wg.Add(1) @@ -792,6 +797,7 @@ func (p *processor) Close() error { } p.cancel() p.wg.Wait() +<<<<<<< HEAD // mark tables share the same cdcContext with its original table, don't need to cancel failpoint.Inject("processorStopDelay", nil) resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) @@ -801,12 +807,40 @@ func (p *processor) Close() error { syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) +======= + + if p.newSchedulerEnabled { + if p.agent == nil { + return nil + } + if err := p.agent.Close(); err != nil { + return errors.Trace(err) + } + p.agent = nil + } + + // sink close might be time-consuming, do it the last. 
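For reference, a minimal sketch of how a caller is expected to drive the asynchronous DDL handshake introduced in cdc/owner/ddl_sink.go above: emitDDLEvent enqueues the event at most once (guarded by ddlSentTs) and keeps returning false until the background goroutine records ddl.CommitTs in ddlFinishedTs. The polling helper below is illustrative only (it assumes package owner and the same imports ddl_sink.go already uses); it is not part of this diff.

func pollDDLDone(ctx cdcContext.Context, s DDLSink, ddl *model.DDLEvent) error {
	// Retry until the sink reports the DDL as executed downstream.
	ticker := time.NewTicker(100 * time.Millisecond) // polling interval is an assumption
	defer ticker.Stop()
	for {
		done, err := s.emitDDLEvent(ctx, ddl)
		if err != nil {
			return errors.Trace(err)
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.Trace(ctx.Err())
		case <-ticker.C:
		}
	}
}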
+>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) if p.sinkManager != nil { // pass a canceled context is ok here, since we don't need to wait Close ctx, cancel := context.WithCancel(context.Background()) cancel() return p.sinkManager.Close(ctx) } +<<<<<<< HEAD +======= + + // mark tables share the same cdcContext with its original table, don't need to cancel + failpoint.Inject("processorStopDelay", nil) + resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr) + +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) return nil } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index deb13f32988..dec44481a54 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -57,11 +57,19 @@ type mqSink struct { resolvedReceiver *notify.Receiver statistics *Statistics + + role util.Role + id model.ChangeFeedID } func newMqSink( ctx context.Context, credential *security.Credential, mqProducer producer.Producer, +<<<<<<< HEAD filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, +======= + filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, + errCh chan error, +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) ) (*mqSink, error) { partitionNum := mqProducer.GetPartitionNum() partitionInput := make([]chan struct { @@ -133,12 +141,25 @@ func newMqSink( if err != nil { return nil, err } +<<<<<<< HEAD k := &mqSink{ mqProducer: mqProducer, dispatcher: d, newEncoder: newEncoder, filter: filter, protocol: protocol, +======= + + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + + s := &mqSink{ + mqProducer: mqProducer, + dispatcher: d, + encoderBuilder: encoderBuilder, + filter: filter, + protocol: protocol, +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) partitionNum: partitionNum, partitionInput: partitionInput, @@ -148,6 +169,9 @@ func newMqSink( resolvedReceiver: resolvedReceiver, statistics: NewStatistics(ctx, "MQ", opts), + + role: role, + id: changefeedID, } go func() { @@ -157,7 +181,8 @@ func newMqSink( return case errCh <- err: default: - log.Error("error channel is full", zap.Error(err)) + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", changefeedID), zap.Any("role", s.role)) } } }() @@ -168,7 +193,10 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha rowsCount := 0 for _, row := range rows { if k.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { - log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs)) + log.Info("Row changed event ignored", + zap.Uint64("start-ts", row.StartTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) continue } partition := k.dispatcher.Dispatch(row) @@ -246,6 +274,8 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { zap.String("query", ddl.Query), zap.Uint64("startTs", ddl.StartTs), zap.Uint64("commitTs", ddl.CommitTs), + zap.String("changefeed", k.id), + zap.Any("role", k.role), ) return cerror.ErrDDLEventIgnored.GenWithStackByArgs() } @@ -260,8 +290,15 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { } k.statistics.AddDDLCount() +<<<<<<< HEAD log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs)) err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1) +======= + log.Debug("emit ddl event", zap.String("query", ddl.Query), + zap.Uint64("commitTs", ddl.CommitTs), zap.Int32("partition", partition), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, partition) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) return errors.Trace(err) } @@ -318,7 +355,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { return 0, err } } - log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize)) + log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return thisBatchSize, nil }) } @@ -391,14 +429,25 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, log.Warn("writeToProducer called with no-op", zap.ByteString("key", message.Key), zap.ByteString("value", message.Value), - zap.Int32("partition", partition)) + zap.Int32("partition", partition), + zap.String("changefeed", k.id), + zap.Any("role", k.role)) return nil } +<<<<<<< HEAD func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { scheme := strings.ToLower(sinkURI.Scheme) if scheme != "kafka" && scheme != "kafka+ssl" { return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme) +======= +func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, + filter *filter.Filter, replicaConfig *config.ReplicaConfig, + opts map[string]string, errCh chan error) (*mqSink, error) { + producerConfig := kafka.NewConfig() + if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) } config := kafka.NewConfig() @@ -424,7 +473,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi return sink, nil } -func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { +func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { producer, err := pulsar.NewProducer(sinkURI, errCh) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go new file mode 100644 index 00000000000..9a3d1e029d1 --- /dev/null +++ b/cdc/sink/producer/kafka/config.go @@ -0,0 +1,337 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
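For reference, a minimal sketch of the configuration flow introduced in newKafkaSaramaSink above: defaults come from kafka.NewConfig() and are then overridden from the sink-URI query string by kafka.CompleteConfigsAndOpts(), which also fills opts for the encoder. The helper name and the example URI below are illustrative, not part of this diff.

func buildKafkaProducerConfig(replicaConfig *config.ReplicaConfig) (*kafka.Config, map[string]string, error) {
	// Example sink URI; every query parameter shown is optional.
	sinkURI, err := url.Parse("kafka://127.0.0.1:9092/cdc-test?" +
		"kafka-version=2.4.0&partition-num=3&replication-factor=1&" +
		"max-message-bytes=1048576&compression=lz4")
	if err != nil {
		return nil, nil, err
	}
	opts := make(map[string]string)
	producerConfig := kafka.NewConfig() // defaults: version 2.4.0, replication-factor 1, no compression, auto-create topic
	if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil {
		return nil, nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
	}
	// producerConfig now reflects the URI overrides; opts carries
	// "max-message-bytes" (and "max-batch-size" when present) for the encoder.
	return producerConfig, opts, nil
}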
+ +package kafka + +import ( + "context" + "net/url" + "strconv" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/security" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" +) + +// Config stores user specified Kafka producer configuration +type Config struct { + BrokerEndpoints []string + PartitionNum int32 + + // User should make sure that `replication-factor` not greater than the number of kafka brokers. + ReplicationFactor int16 + + Version string + MaxMessageBytes int + Compression string + ClientID string + Credential *security.Credential + SaslScram *security.SaslScram + // control whether to create topic + AutoCreate bool + + // Timeout for sarama `config.Net` configurations, default to `10s` + DialTimeout time.Duration + WriteTimeout time.Duration + ReadTimeout time.Duration +} + +// NewConfig returns a default Kafka configuration +func NewConfig() *Config { + return &Config{ + Version: "2.4.0", + // MaxMessageBytes will be used to initialize producer + MaxMessageBytes: config.DefaultMaxMessageBytes, + ReplicationFactor: 1, + Compression: "none", + Credential: &security.Credential{}, + SaslScram: &security.SaslScram{}, + AutoCreate: true, + DialTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + } +} + +// set the partition-num by the topic's partition count. +func (c *Config) setPartitionNum(realPartitionCount int32) error { + // user does not specify the `partition-num` in the sink-uri + if c.PartitionNum == 0 { + c.PartitionNum = realPartitionCount + return nil + } + + if c.PartitionNum < realPartitionCount { + log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+ + "Some partitions will not have messages dispatched to", + zap.Int32("sink-uri partitions", c.PartitionNum), + zap.Int32("topic partitions", realPartitionCount)) + return nil + } + + // Make sure that the user-specified `partition-num` is not greater than + // the real partition count, since messages would be dispatched to different + // partitions, this could prevent potential correctness problems. + if c.PartitionNum > realPartitionCount { + return cerror.ErrKafkaInvalidPartitionNum.GenWithStack( + "the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)", + c.PartitionNum, realPartitionCount) + } + return nil +} + +// CompleteConfigsAndOpts the kafka producer configuration, replication configuration and opts. 
+func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaConfig *config.ReplicaConfig, opts map[string]string) error { + producerConfig.BrokerEndpoints = strings.Split(sinkURI.Host, ",") + params := sinkURI.Query() + s := params.Get("partition-num") + if s != "" { + a, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return err + } + producerConfig.PartitionNum = int32(a) + if producerConfig.PartitionNum <= 0 { + return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(producerConfig.PartitionNum) + } + } + + s = params.Get("replication-factor") + if s != "" { + a, err := strconv.ParseInt(s, 10, 16) + if err != nil { + return err + } + producerConfig.ReplicationFactor = int16(a) + } + + s = params.Get("kafka-version") + if s != "" { + producerConfig.Version = s + } + + s = params.Get("max-message-bytes") + if s != "" { + a, err := strconv.Atoi(s) + if err != nil { + return err + } + producerConfig.MaxMessageBytes = a + opts["max-message-bytes"] = s + } + + s = params.Get("max-batch-size") + if s != "" { + opts["max-batch-size"] = s + } + + s = params.Get("compression") + if s != "" { + producerConfig.Compression = s + } + + producerConfig.ClientID = params.Get("kafka-client-id") + + s = params.Get("ca") + if s != "" { + producerConfig.Credential.CAPath = s + } + + s = params.Get("cert") + if s != "" { + producerConfig.Credential.CertPath = s + } + + s = params.Get("key") + if s != "" { + producerConfig.Credential.KeyPath = s + } + + s = params.Get("sasl-user") + if s != "" { + producerConfig.SaslScram.SaslUser = s + } + + s = params.Get("sasl-password") + if s != "" { + producerConfig.SaslScram.SaslPassword = s + } + + s = params.Get("sasl-mechanism") + if s != "" { + producerConfig.SaslScram.SaslMechanism = s + } + + s = params.Get("auto-create-topic") + if s != "" { + autoCreate, err := strconv.ParseBool(s) + if err != nil { + return err + } + producerConfig.AutoCreate = autoCreate + } + + s = params.Get(config.ProtocolKey) + if s != "" { + replicaConfig.Sink.Protocol = s + } + + s = params.Get("enable-tidb-extension") + if s != "" { + _, err := strconv.ParseBool(s) + if err != nil { + return err + } + if replicaConfig.Sink.Protocol != "canal-json" { + return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("enable-tidb-extension only support canal-json protocol")) + } + opts["enable-tidb-extension"] = s + } + + s = params.Get("dial-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.DialTimeout = a + } + + s = params.Get("write-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.WriteTimeout = a + } + + s = params.Get("read-timeout") + if s != "" { + a, err := time.ParseDuration(s) + if err != nil { + return err + } + producerConfig.ReadTimeout = a + } + + return nil +} + +// newSaramaConfig return the default config and set the according version and metrics +func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { + config := sarama.NewConfig() + + version, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaInvalidVersion, err) + } + var role string + if util.IsOwnerFromCtx(ctx) { + role = "owner" + } else { + role = "processor" + } + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + + config.ClientID, err = kafkaClientID(role, captureAddr, changefeedID, c.ClientID) + if err != nil { + return nil, 
errors.Trace(err) + } + config.Version = version + + // Producer fetch metadata from brokers frequently, if metadata cannot be + // refreshed easily, this would indicate the network condition between the + // capture server and kafka broker is not good. + // In the scenario that cannot get response from Kafka server, this default + // setting can help to get response more quickly. + config.Metadata.Retry.Max = 1 + config.Metadata.Retry.Backoff = 100 * time.Millisecond + // This Timeout is useless if the `RefreshMetadata` time cost is less than it. + config.Metadata.Timeout = 1 * time.Minute + + // Admin.Retry take effect on `ClusterAdmin` related operations, + // only `CreateTopic` for cdc now. set the `Timeout` to `1m` to make CI stable. + config.Admin.Retry.Max = 5 + config.Admin.Retry.Backoff = 100 * time.Millisecond + config.Admin.Timeout = 1 * time.Minute + + // Producer.Retry take effect when the producer try to send message to kafka + // brokers. If kafka cluster is healthy, just the default value should be enough. + // For kafka cluster with a bad network condition, producer should not try to + // waster too much time on sending a message, get response no matter success + // or fail as soon as possible is preferred. + config.Producer.Retry.Max = 3 + config.Producer.Retry.Backoff = 100 * time.Millisecond + + // make sure sarama producer flush messages as soon as possible. + config.Producer.Flush.Bytes = 0 + config.Producer.Flush.Messages = 0 + config.Producer.Flush.Frequency = time.Duration(0) + + config.Net.DialTimeout = c.DialTimeout + config.Net.WriteTimeout = c.WriteTimeout + config.Net.ReadTimeout = c.ReadTimeout + + config.Producer.Partitioner = sarama.NewManualPartitioner + config.Producer.MaxMessageBytes = c.MaxMessageBytes + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + config.Producer.RequiredAcks = sarama.WaitForAll + switch strings.ToLower(strings.TrimSpace(c.Compression)) { + case "none": + config.Producer.Compression = sarama.CompressionNone + case "gzip": + config.Producer.Compression = sarama.CompressionGZIP + case "snappy": + config.Producer.Compression = sarama.CompressionSnappy + case "lz4": + config.Producer.Compression = sarama.CompressionLZ4 + case "zstd": + config.Producer.Compression = sarama.CompressionZSTD + default: + log.Warn("Unsupported compression algorithm", zap.String("compression", c.Compression)) + config.Producer.Compression = sarama.CompressionNone + } + + if c.Credential != nil && len(c.Credential.CAPath) != 0 { + config.Net.TLS.Enable = true + config.Net.TLS.Config, err = c.Credential.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + } + if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 { + config.Net.SASL.Enable = true + config.Net.SASL.User = c.SaslScram.SaslUser + config.Net.SASL.Password = c.SaslScram.SaslPassword + config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SaslScram.SaslMechanism) + if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-256") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA256} } + } else if strings.EqualFold(c.SaslScram.SaslMechanism, "SCRAM-SHA-512") { + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &security.XDGSCRAMClient{HashGeneratorFcn: security.SHA512} } + } else { + return nil, errors.New("Unsupported sasl-mechanism, should be SCRAM-SHA-256 or SCRAM-SHA-512") + } + } + + return config, err +} diff --git 
a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index b275639a015..ef2b936d98f 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -28,11 +28,15 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/codec" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/notify" +<<<<<<< HEAD "github.com/pingcap/tiflow/pkg/security" +======= +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -171,13 +175,15 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi } type kafkaSaramaProducer struct { - // clientLock is used to protect concurrent access of asyncClient and syncClient. + // clientLock is used to protect concurrent access of asyncProducer and syncProducer. // Since we don't close these two clients (which have an input chan) from the // sender routine, data race or send on closed chan could happen. - clientLock sync.RWMutex - asyncClient sarama.AsyncProducer - syncClient sarama.SyncProducer - // producersReleased records whether asyncClient and syncClient have been closed properly + clientLock sync.RWMutex + client sarama.Client + asyncProducer sarama.AsyncProducer + syncProducer sarama.SyncProducer + + // producersReleased records whether asyncProducer and syncProducer have been closed properly producersReleased bool topic string partitionNum int32 @@ -194,6 +200,9 @@ type kafkaSaramaProducer struct { closeCh chan struct{} // atomic flag indicating whether the producer is closing closing kafkaProducerClosingFlag + + role util.Role + id model.ChangeFeedID } type kafkaProducerClosingFlag = int32 @@ -224,14 +233,15 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ failpoint.Inject("KafkaSinkAsyncSendError", func() { // simulate sending message to input channel successfully but flushing // message to Kafka meets error - log.Info("failpoint error injected") + log.Info("failpoint error injected", zap.String("changefeed", k.id), zap.Any("role", k.role)) k.failpointCh <- errors.New("kafka sink injected error") failpoint.Return(nil) }) failpoint.Inject("SinkFlushDMLPanic", func() { time.Sleep(time.Second) - log.Panic("SinkFlushDMLPanic") + log.Panic("SinkFlushDMLPanic", + zap.String("changefeed", k.id), zap.Any("role", k.role)) }) select { @@ -239,7 +249,7 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ return ctx.Err() case <-k.closeCh: return nil - case k.asyncClient.Input() <- msg: + case k.asyncProducer.Input() <- msg: } return nil } @@ -262,7 +272,7 @@ func (k *kafkaSaramaProducer) SyncBroadcastMessage(ctx context.Context, message case <-k.closeCh: return nil default: - err := k.syncClient.SendMessages(msgs) + err := k.syncProducer.SendMessages(msgs) return cerror.WrapError(cerror.ErrKafkaSendMessage, err) } } @@ -324,11 +334,16 @@ func (k *kafkaSaramaProducer) stop() { if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing { return } +<<<<<<< HEAD +======= + log.Info("kafka producer closing...", zap.String("changefeed", k.id), zap.Any("role", k.role)) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) close(k.closeCh) } // Close closes the sync and async clients. 
func (k *kafkaSaramaProducer) Close() error { + log.Info("stop the kafka producer", zap.String("changefeed", k.id), zap.Any("role", k.role)) k.stop() k.clientLock.Lock() @@ -340,6 +355,7 @@ func (k *kafkaSaramaProducer) Close() error { return nil } k.producersReleased = true +<<<<<<< HEAD // In fact close sarama sync client doesn't return any error. // But close async client returns error if error channel is not empty, we // don't populate this error to the upper caller, just add a log here. @@ -350,6 +366,44 @@ func (k *kafkaSaramaProducer) Close() error { } if err2 != nil { log.Error("close async client with error", zap.Error(err2)) +======= + + // `client` is mainly used by `asyncProducer` to fetch metadata and other related + // operations. When we close the `kafkaSaramaProducer`, TiCDC no need to make sure + // that buffered messages flushed. + // Consider the situation that the broker does not respond, If the client is not + // closed, `asyncProducer.Close()` would waste a mount of time to try flush all messages. + // To prevent the scenario mentioned above, close client first. + start := time.Now() + if err := k.client.Close(); err != nil { + log.Error("close sarama client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } else { + log.Info("sarama client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } + + start = time.Now() + err := k.asyncProducer.Close() + if err != nil { + log.Error("close async client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } else { + log.Info("async client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } + start = time.Now() + err = k.syncProducer.Close() + if err != nil { + log.Error("close sync client with error", zap.Error(err), + zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) + } else { + log.Info("sync client closed", zap.Duration("duration", time.Since(start)), + zap.String("changefeed", k.id), zap.Any("role", k.role)) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) } return nil } @@ -357,6 +411,8 @@ func (k *kafkaSaramaProducer) Close() error { func (k *kafkaSaramaProducer) run(ctx context.Context) error { defer func() { k.flushedReceiver.Stop() + log.Info("stop the kafka producer", + zap.String("changefeed", k.id), zap.Any("role", k.role)) k.stop() }() for { @@ -366,16 +422,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { case <-k.closeCh: return nil case err := <-k.failpointCh: - log.Warn("receive from failpoint chan", zap.Error(err)) + log.Warn("receive from failpoint chan", zap.Error(err), + zap.String("changefeed", k.id), zap.Any("role", k.role)) return err - case msg := <-k.asyncClient.Successes(): + case msg := <-k.asyncProducer.Successes(): if msg == nil || msg.Metadata == nil { continue } flushedOffset := msg.Metadata.(uint64) atomic.StoreUint64(&k.partitionOffset[msg.Partition].flushed, flushedOffset) k.flushedNotifier.Notify() - case err := <-k.asyncClient.Errors(): + case err := <-k.asyncProducer.Errors(): // We should not wrap a nil pointer if the pointer is of a subtype of `error` // because Go would store the type info and the resulted `error` variable would not be nil, // which will cause the pkg/error library to malfunction. @@ -387,21 +444,125 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } } +<<<<<<< HEAD func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error { // FIXME: find a way to remove this failpoint for workload the unit test failpoint.Inject("SkipTopicAutoCreate", func() { failpoint.Return(nil) }) admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig) +======= +var ( + newSaramaConfigImpl = newSaramaConfig + NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient +) + +// NewKafkaSaramaProducer creates a kafka sarama producer +func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, + opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { + changefeedID := util.ChangefeedIDFromCtx(ctx) + role := util.RoleFromCtx(ctx) + log.Info("Starting kafka sarama producer ...", zap.Any("config", config), + zap.String("changefeed", changefeedID), zap.Any("role", role)) + + cfg, err := newSaramaConfigImpl(ctx, config) + if err != nil { + return nil, err + } + + admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) if err != nil { return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } defer func() { if err := admin.Close(); err != nil { - log.Warn("close kafka cluster admin failed", zap.Error(err)) + log.Warn("close kafka cluster admin failed", zap.Error(err), + zap.String("changefeed", changefeedID), zap.Any("role", role)) + } + }() + +<<<<<<< HEAD +======= + if err := validateAndCreateTopic(admin, topic, config, cfg, opts); err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + client, err := sarama.NewClient(config.BrokerEndpoints, cfg) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + asyncProducer, err := sarama.NewAsyncProducerFromClient(client) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + syncProducer, err := sarama.NewSyncProducerFromClient(client) + if err != nil { + return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + + notifier := new(notify.Notifier) + flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond) + if err != nil { + return nil, err + } + k := &kafkaSaramaProducer{ + client: client, + asyncProducer: asyncProducer, + syncProducer: syncProducer, + topic: topic, + partitionNum: config.PartitionNum, + partitionOffset: make([]struct { + flushed uint64 + sent uint64 + }, config.PartitionNum), + flushedNotifier: notifier, + flushedReceiver: flushedReceiver, + closeCh: make(chan struct{}), + failpointCh: make(chan error, 1), + closing: kafkaProducerRunning, + + id: changefeedID, + role: role, + } + go func() { + if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled { + select { + case <-ctx.Done(): + return + case errCh <- err: + default: + log.Error("error channel is full", zap.Error(err), + zap.String("changefeed", k.id), zap.Any("role", role)) + } } }() + return k, nil +} + +var ( + validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) + commonInvalidChar = regexp.MustCompile(`[\?:,"]`) +) + +func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) { + if configuredClientID != "" { + clientID = configuredClientID + } else { + clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID) + clientID = commonInvalidChar.ReplaceAllString(clientID, "_") + } + if !validClientID.MatchString(clientID) { + return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID) + } + return +} +func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config, + opts map[string]string) error { +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) topics, err := admin.ListTopics() if err != nil { return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -417,7 +578,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) } if topicMaxMessageBytes < config.MaxMessageBytes { - log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+ "use topic's `max.message.bytes` to initialize the Kafka producer", zap.Int("max.message.bytes", topicMaxMessageBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) @@ -452,7 +613,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) // TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than // broker's `message.max.bytes`. if brokerMessageMaxBytes < config.MaxMessageBytes { - log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+ + log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+ "use broker's `message.max.bytes` to initialize the Kafka producer", zap.Int("message.max.bytes", brokerMessageMaxBytes), zap.Int("max-message-bytes", config.MaxMessageBytes)) diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index a413af791a5..5f674396517 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -27,9 +27,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/sink/codec" +<<<<<<< HEAD "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" +======= + "github.com/pingcap/tiflow/pkg/kafka" +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) "github.com/pingcap/tiflow/pkg/util" "github.com/pingcap/tiflow/pkg/util/testleak" ) @@ -117,7 +121,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) prodSuccess := new(sarama.ProduceResponse) @@ -152,7 +156,13 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() +<<<<<<< HEAD producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) +======= + opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) for i := 0; i < 100; i++ { @@ -355,8 +365,18 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { config.Version = "invalid" config.BrokerEndpoints = []string{"127.0.0.1:1111"} topic := "topic" +<<<<<<< HEAD c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) _, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) +======= + NewAdminClientImpl = kafka.NewMockAdminClient + defer func() { + NewAdminClientImpl = kafka.NewSaramaAdminClient + }() + opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) + _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") @@ -374,7 +394,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -402,7 +422,14 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) +<<<<<<< HEAD producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) +======= + opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") err := producer.Close() @@ -452,7 +479,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) - leader.Returns(metadataResponse) + // Response for `sarama.NewClient` leader.Returns(metadataResponse) config := NewConfig() @@ -467,7 +494,14 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) errCh := make(chan error, 1) +<<<<<<< HEAD producer, err := NewKafkaSaramaProducer(ctx, topic, config, map[string]string{}, errCh) +======= + opts := make(map[string]string) + ctx = util.PutRoleInCtx(ctx, util.RoleTester) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. 
(#4359)) defer func() { err := producer.Close() c.Assert(err, check.IsNil) diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index b48dd2fa4e1..27f41c4e202 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" ) // Sink options keys @@ -116,3 +117,33 @@ func NewSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr st } return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", sinkURI.Scheme) } +<<<<<<< HEAD +======= + +// Validate sink if given valid parameters. +func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, opts map[string]string) error { + sinkFilter, err := filter.NewFilter(cfg) + if err != nil { + return err + } + errCh := make(chan error) + ctx = util.PutRoleInCtx(ctx, util.RoleClient) + // TODO: find a better way to verify a sinkURI is valid + s, err := New(ctx, "sink-verify", sinkURI, sinkFilter, cfg, opts, errCh) + if err != nil { + return err + } + err = s.Close(ctx) + if err != nil { + return err + } + select { + case err = <-errCh: + if err != nil { + return err + } + default: + } + return nil +} +>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359)) diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index e1d954572a0..a655b178fcd 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -332,6 +332,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) { } c.sinks = make([]*partitionSink, kafkaPartitionNum) ctx, cancel := context.WithCancel(ctx) + ctx = util.PutRoleInCtx(ctx, util.RoleKafkaConsumer) errCh := make(chan error, 1) opts := map[string]string{} for i := 0; i < int(kafkaPartitionNum); i++ { diff --git a/pkg/applier/redo.go b/pkg/applier/redo.go new file mode 100644 index 00000000000..59de745105b --- /dev/null +++ b/pkg/applier/redo.go @@ -0,0 +1,229 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
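For reference, a minimal sketch of how the new sink.Validate helper above is meant to be used, e.g. to reject an invalid sink URI before a changefeed is created. The wrapper function and the example URI are illustrative, not part of this diff.

func checkSinkURI(ctx context.Context, sinkURI string) error {
	// Validate builds a throwaway "sink-verify" sink, closes it, and drains
	// any error reported asynchronously during construction.
	cfg := config.GetDefaultReplicaConfig()
	opts := make(map[string]string)
	return sink.Validate(ctx, sinkURI, cfg, opts)
}

// e.g. checkSinkURI(ctx, "kafka://127.0.0.1:9092/cdc-test?protocol=canal-json")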
+ +package applier + +import ( + "context" + "net/url" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/redo" + "github.com/pingcap/tiflow/cdc/redo/reader" + "github.com/pingcap/tiflow/cdc/sink" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/util" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + applierChangefeed = "redo-applier" + emitBatch = sink.DefaultMaxTxnRow + readBatch = sink.DefaultWorkerCount * emitBatch +) + +var errApplyFinished = errors.New("apply finished, can exit safely") + +// RedoApplierConfig is the configuration used by a redo log applier +type RedoApplierConfig struct { + SinkURI string + Storage string + Dir string +} + +// RedoApplier implements a redo log applier +type RedoApplier struct { + cfg *RedoApplierConfig + + rd reader.RedoLogReader + errCh chan error +} + +// NewRedoApplier creates a new RedoApplier instance +func NewRedoApplier(cfg *RedoApplierConfig) *RedoApplier { + return &RedoApplier{ + cfg: cfg, + } +} + +// toLogReaderConfig is an adapter to translate from applier config to redo reader config +// returns storageType, *reader.toLogReaderConfig and error +func (rac *RedoApplierConfig) toLogReaderConfig() (string, *reader.LogReaderConfig, error) { + uri, err := url.Parse(rac.Storage) + if err != nil { + return "", nil, cerror.WrapError(cerror.ErrConsistentStorage, err) + } + cfg := &reader.LogReaderConfig{ + Dir: uri.Path, + S3Storage: redo.IsS3StorageEnabled(uri.Scheme), + } + if cfg.S3Storage { + cfg.S3URI = *uri + // If use s3 as backend, applier will download redo logs to local dir. + cfg.Dir = rac.Dir + } + return uri.Scheme, cfg, nil +} + +func (ra *RedoApplier) catchError(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-ra.errCh: + return err + } + } +} + +func (ra *RedoApplier) consumeLogs(ctx context.Context) error { + checkpointTs, resolvedTs, err := ra.rd.ReadMeta(ctx) + if err != nil { + return err + } + err = ra.rd.ResetReader(ctx, checkpointTs, resolvedTs) + if err != nil { + return err + } + log.Info("apply redo log starts", zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", resolvedTs)) + + // MySQL sink will use the following replication config + // - EnableOldValue: default true + // - ForceReplicate: default false + // - filter: default []string{"*.*"} + replicaConfig := config.GetDefaultReplicaConfig() + ft, err := filter.NewFilter(replicaConfig) + if err != nil { + return err + } + opts := map[string]string{} + ctx = util.PutRoleInCtx(ctx, util.RoleRedoLogApplier) + s, err := sink.New(ctx, applierChangefeed, ra.cfg.SinkURI, ft, replicaConfig, opts, ra.errCh) + if err != nil { + return err + } + defer func() { + ra.rd.Close() //nolint:errcheck + s.Close(ctx) //nolint:errcheck + }() + + // TODO: split events for large transaction + // We use lastSafeResolvedTs and lastResolvedTs to ensure the events in one + // transaction are flushed in a single batch. + // lastSafeResolvedTs records the max resolved ts of a closed transaction. + // Closed transaction means all events of this transaction have been received. + lastSafeResolvedTs := checkpointTs - 1 + // lastResolvedTs records the max resolved ts we have seen from redo logs. 
+ lastResolvedTs := checkpointTs + cachedRows := make([]*model.RowChangedEvent, 0, emitBatch) + tableResolvedTsMap := make(map[model.TableID]model.Ts) + for { + redoLogs, err := ra.rd.ReadNextLog(ctx, readBatch) + if err != nil { + return err + } + if len(redoLogs) == 0 { + break + } + + for _, redoLog := range redoLogs { + tableID := redoLog.Row.Table.TableID + if _, ok := tableResolvedTsMap[redoLog.Row.Table.TableID]; !ok { + tableResolvedTsMap[tableID] = lastSafeResolvedTs + } + if len(cachedRows) >= emitBatch { + err := s.EmitRowChangedEvents(ctx, cachedRows...) + if err != nil { + return err + } + cachedRows = make([]*model.RowChangedEvent, 0, emitBatch) + } + cachedRows = append(cachedRows, redo.LogToRow(redoLog)) + + if redoLog.Row.CommitTs > tableResolvedTsMap[tableID] { + tableResolvedTsMap[tableID], lastResolvedTs = lastResolvedTs, redoLog.Row.CommitTs + } + } + + for tableID, tableLastResolvedTs := range tableResolvedTsMap { + _, err = s.FlushRowChangedEvents(ctx, tableID, tableLastResolvedTs) + if err != nil { + return err + } + } + } + err = s.EmitRowChangedEvents(ctx, cachedRows...) + if err != nil { + return err + } + + for tableID := range tableResolvedTsMap { + _, err = s.FlushRowChangedEvents(ctx, tableID, resolvedTs) + if err != nil { + return err + } + err = s.Barrier(ctx, tableID) + if err != nil { + return err + } + } + return errApplyFinished +} + +var createRedoReader = createRedoReaderImpl + +func createRedoReaderImpl(ctx context.Context, cfg *RedoApplierConfig) (reader.RedoLogReader, error) { + storageType, readerCfg, err := cfg.toLogReaderConfig() + if err != nil { + return nil, err + } + return redo.NewRedoReader(ctx, storageType, readerCfg) +} + +// ReadMeta creates a new redo applier and read meta from reader +func (ra *RedoApplier) ReadMeta(ctx context.Context) (checkpointTs uint64, resolvedTs uint64, err error) { + rd, err := createRedoReader(ctx, ra.cfg) + if err != nil { + return 0, 0, err + } + return rd.ReadMeta(ctx) +} + +// Apply applies redo log to given target +func (ra *RedoApplier) Apply(ctx context.Context) error { + rd, err := createRedoReader(ctx, ra.cfg) + if err != nil { + return err + } + ra.rd = rd + ra.errCh = make(chan error, 1024) + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return ra.consumeLogs(ctx) + }) + wg.Go(func() error { + return ra.catchError(ctx) + }) + + err = wg.Wait() + if errors.Cause(err) != errApplyFinished { + return err + } + return nil +} diff --git a/pkg/util/ctx.go b/pkg/util/ctx.go index 492b00f8b4e..4f68af2cf33 100644 --- a/pkg/util/ctx.go +++ b/pkg/util/ctx.go @@ -31,6 +31,7 @@ const ( ctxKeyIsOwner = ctxKey("isOwner") ctxKeyTimezone = ctxKey("timezone") ctxKeyKVStorage = ctxKey("kvStorage") + ctxKeyRole = ctxKey("role") ) // CaptureAddrFromCtx returns a capture ID stored in the specified context. @@ -121,6 +122,21 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID) } +// RoleFromCtx returns a role stored in the specified context. +// It returns RoleUnknown if there's no valid role found +func RoleFromCtx(ctx context.Context) Role { + role, ok := ctx.Value(ctxKeyRole).(Role) + if !ok { + return RoleUnknown + } + return role +} + +// PutRoleInCtx return a new child context with the specified role stored. 
+func PutRoleInCtx(ctx context.Context, role Role) context.Context { + return context.WithValue(ctx, ctxKeyRole, role) +} + // ZapFieldCapture returns a zap field containing capture address // TODO: log redact for capture address func ZapFieldCapture(ctx context.Context) zap.Field { diff --git a/pkg/util/identity.go b/pkg/util/identity.go new file mode 100644 index 00000000000..5b0abe38a80 --- /dev/null +++ b/pkg/util/identity.go @@ -0,0 +1,47 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +// Role is the operator role, mainly used for logging at the moment. +type Role int + +const ( + RoleOwner Role = iota + RoleProcessor + RoleClient + RoleRedoLogApplier + RoleKafkaConsumer + RoleTester + RoleUnknown +) + +func (r Role) String() string { + switch r { + case RoleOwner: + return "owner" + case RoleProcessor: + return "processor" + case RoleClient: + return "cdc-client" + case RoleKafkaConsumer: + return "kafka-consumer" + case RoleRedoLogApplier: + return "redo-applier" + case RoleTester: + return "tester" + case RoleUnknown: + return "unknown" + } + return "unknown" +}
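For reference, a minimal sketch of how the Role type from pkg/util/identity.go and the context helpers from pkg/util/ctx.go combine for log enrichment, mirroring the call sites added in this patch (processor, kafka consumer, redo applier, sink.Validate). The concrete values below are examples, not part of this diff.

func exampleRoleLogging() {
	ctx := context.Background()
	ctx = util.PutCaptureAddrInCtx(ctx, "127.0.0.1:8300")   // example capture address
	ctx = util.PutChangefeedIDInCtx(ctx, "changefeed-test") // example changefeed ID
	ctx = util.PutRoleInCtx(ctx, util.RoleProcessor)

	// Downstream components (e.g. the kafka producer) read the values back for logging:
	log.Info("kafka producer closing...",
		zap.String("changefeed", util.ChangefeedIDFromCtx(ctx)),
		zap.Any("role", util.RoleFromCtx(ctx))) // RoleUnknown if the role was never set
}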