From f68f079fc2e5c74d53558e3f646d28c7c2aa4474 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 14 Sep 2023 13:30:09 +0800 Subject: [PATCH] This is an automated cherry-pick of #9742 Signed-off-by: ti-chi-bot --- cdc/processor/sinkmanager/manager.go | 21 +++++++++ cdc/processor/sinkmanager/manager_test.go | 15 +++++++ .../sinkmanager/table_sink_wrapper.go | 4 +- cdc/sink/dmlsink/factory/factory.go | 43 ++++++++++++++++++- .../hang_sink_suicide/run.sh | 5 ++- 5 files changed, 82 insertions(+), 6 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index a42d4293cd2..9500ce20e49 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -304,6 +304,12 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er } } +func (m *SinkManager) needsStuckCheck() bool { + m.sinkFactory.Lock() + defer m.sinkFactory.Unlock() + return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ +} + func (m *SinkManager) initSinkFactory() (chan error, uint64) { m.sinkFactory.Lock() defer m.sinkFactory.Unlock() @@ -985,6 +991,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec } stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second +<<<<<<< HEAD if version > 0 && time.Since(advanced) > stuckCheck && oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck { log.Warn("Table checkpoint is stuck too long, will restart the sink backend", @@ -996,6 +1003,20 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { zap.Uint64("factoryVersion", version)) tableSink.updateTableSinkAdvanced() m.putSinkFactoryError(errors.New("table sink stuck"), version) +======= + + if m.needsStuckCheck() { + isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) + if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { + log.Warn("Table checkpoint is stuck too long, will restart the sink backend", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Stringer("span", &span), + zap.Any("checkpointTs", checkpointTs), + zap.Float64("stuckCheck", stuckCheck.Seconds()), + zap.Uint64("factoryVersion", sinkVersion)) + } +>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)) } var resolvedTs model.Ts diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index d79657e749f..f141e6d471d 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -357,3 +357,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) { log.Panic("must get an error instead of a timeout") } } + +func TestSinkManagerNeedsStuckCheck(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 16) + changefeedInfo := getChangefeedInfo() + manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh) + defer func() { + cancel() + manager.Close() + }() + + require.False(t, manager.needsStuckCheck()) +} diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 1f6f7dca3c7..e75cdfa5f54 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -33,7 +33,7 @@ import ( "go.uber.org/zap" ) -var 
version uint64 = 0 +var tableSinkWrapperVersion uint64 = 0 // tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink. // Because in the SinkManager, we write data to TableSink and RedoManager concurrently, @@ -108,7 +108,7 @@ func newTableSinkWrapper( genReplicateTs func(ctx context.Context) (model.Ts, error), ) *tableSinkWrapper { res := &tableSinkWrapper{ - version: atomic.AddUint64(&version, 1), + version: atomic.AddUint64(&tableSinkWrapperVersion, 1), changefeed: changefeed, span: span, tableSinkCreater: tableSinkCreater, diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index ad66527ca6c..25e738cd997 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -36,14 +36,29 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Category is for different DML sink categories. +type Category = int + +const ( + // CategoryTxn is for Txn sink. + CategoryTxn Category = 1 + // CategoryMQ is for MQ sink. + CategoryMQ = 2 + // CategoryCloudStorage is for CloudStorage sink. + CategoryCloudStorage = 3 + // CategoryBlackhole is for Blackhole sink. + CategoryBlackhole = 4 +) + // SinkFactory is the factory of sink. // It is responsible for creating sink and closing it. // Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent] // to eventsink.EventSink[eventsink.TableEvent]. // So we have to use this factory to create and store the sink. type SinkFactory struct { - rowSink dmlsink.EventSink[*model.RowChangedEvent] - txnSink dmlsink.EventSink[*model.SingleTableTxn] + rowSink dmlsink.EventSink[*model.RowChangedEvent] + txnSink dmlsink.EventSink[*model.SingleTableTxn] + category Category } // New creates a new SinkFactory by schema. @@ -67,6 +82,7 @@ func New( return nil, err } s.txnSink = txnSink + s.category = CategoryTxn case sink.KafkaScheme, sink.KafkaSSLScheme: factoryCreator := kafka.NewSaramaFactory if cfg.Sink.EnableKafkaSinkV2 { @@ -78,15 +94,30 @@ func New( return nil, err } s.txnSink = mqs + s.category = CategoryMQ case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme: storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh) if err != nil { return nil, err } s.txnSink = storageSink + s.category = CategoryCloudStorage case sink.BlackHoleScheme: bs := blackhole.NewDMLSink() s.rowSink = bs +<<<<<<< HEAD +======= + s.category = CategoryBlackhole + case sink.PulsarScheme: + mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh, + manager.NewPulsarTopicManager, + pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer) + if err != nil { + return nil, err + } + s.txnSink = mqs + s.category = CategoryMQ +>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)) default: return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema) @@ -143,3 +174,11 @@ func (s *SinkFactory) Close() { s.txnSink.Close() } } + +// Category returns category of s. 
+func (s *SinkFactory) Category() Category { + if s.category == 0 { + panic("should never happen") + } + return s.category +} diff --git a/tests/integration_tests/hang_sink_suicide/run.sh b/tests/integration_tests/hang_sink_suicide/run.sh index 3489df74e05..e4e663cb975 100644 --- a/tests/integration_tests/hang_sink_suicide/run.sh +++ b/tests/integration_tests/hang_sink_suicide/run.sh @@ -41,6 +41,7 @@ function run() { } trap stop_tidb_cluster EXIT -run $* -check_logs $WORK_DIR +# TODO: update the case to use kafka sink instead of mysql sink. +# run $* +# check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
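
For context, the gist of the cherry-picked change is that the checkpoint-stuck detection in GetTableStats is now gated on the sink category, so only MQ sinks can trigger a "table sink stuck" restart of the sink backend. Below is a minimal, self-contained Go sketch of that gate. It is illustrative only: dmlSinkFactory and sinkManager are simplified stand-ins, not the real TiCDC structs, while the names Category, CategoryMQ, and needsStuckCheck follow the patch.

package main

import (
	"fmt"
	"sync"
)

// Category mirrors the factory.Category type added by the patch.
type Category = int

const (
	CategoryTxn          Category = 1
	CategoryMQ           Category = 2
	CategoryCloudStorage Category = 3
	CategoryBlackhole    Category = 4
)

// dmlSinkFactory is an illustrative stand-in for factory.SinkFactory;
// only the new category field and its Category() accessor are kept.
type dmlSinkFactory struct {
	category Category
}

// Category returns the category the factory was built with.
func (f *dmlSinkFactory) Category() Category {
	return f.category
}

// sinkManager is an illustrative stand-in for sinkmanager.SinkManager,
// keeping just the mutex-guarded sinkFactory field that needsStuckCheck reads.
type sinkManager struct {
	sinkFactory struct {
		sync.Mutex
		f *dmlSinkFactory
	}
}

// needsStuckCheck follows the semantics of the new method in manager.go:
// the checkpoint-stuck detection only applies while an MQ sink is active.
func (m *sinkManager) needsStuckCheck() bool {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()
	return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == CategoryMQ
}

func main() {
	m := &sinkManager{}

	// No sink factory yet: no stuck check (what TestSinkManagerNeedsStuckCheck asserts).
	fmt.Println("no factory:", m.needsStuckCheck()) // false

	// Txn (MySQL-like) sink: stuck check stays disabled.
	m.sinkFactory.f = &dmlSinkFactory{category: CategoryTxn}
	fmt.Println("txn sink:", m.needsStuckCheck()) // false

	// MQ (Kafka-like) sink: stuck check is enabled.
	m.sinkFactory.f = &dmlSinkFactory{category: CategoryMQ}
	fmt.Println("mq sink:", m.needsStuckCheck()) // true
}

In the actual patch, needsStuckCheck wraps the existing sinkMaybeStuck/putSinkFactoryError flow in GetTableStats, so txn sinks such as MySQL no longer restart the sink backend when a checkpoint advances slowly; that is also why the hang_sink_suicide integration test, which uses a MySQL sink, is commented out with a TODO to switch it to a Kafka sink.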