Skip to content

Commit

Permalink
This is an automated cherry-pick of #9742
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
hicqu authored and ti-chi-bot committed Sep 14, 2023
1 parent b9dcc64 commit f68f079
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 6 deletions.
21 changes: 21 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -985,6 +991,7 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
<<<<<<< HEAD
if version > 0 && time.Since(advanced) > stuckCheck &&
oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
Expand All @@ -996,6 +1003,20 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
zap.Uint64("factoryVersion", version))
tableSink.updateTableSinkAdvanced()
m.putSinkFactoryError(errors.New("table sink stuck"), version)
=======

if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742))
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -108,7 +108,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
span: span,
tableSinkCreater: tableSinkCreater,
Expand Down
43 changes: 41 additions & 2 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,29 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
// CategoryTxn is for Txn sink.
CategoryTxn Category = 1
// CategoryMQ is for MQ sink.
CategoryMQ = 2
// CategoryCloudStorage is for CloudStorage sink.
CategoryCloudStorage = 3
// CategoryBlackhole is for Blackhole sink.
CategoryBlackhole = 4
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
// to eventsink.EventSink[eventsink.TableEvent].
// So we have to use this factory to create and store the sink.
type SinkFactory struct {
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -67,6 +82,7 @@ func New(
return nil, err
}
s.txnSink = txnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
if cfg.Sink.EnableKafkaSinkV2 {
Expand All @@ -78,15 +94,30 @@ func New(
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.NewDMLSink()
s.rowSink = bs
<<<<<<< HEAD

Check failure on line 108 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected <<, expected case or default or }
=======
s.category = CategoryBlackhole
case sink.PulsarScheme:

Check failure on line 111 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected case, expected :
mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
if err != nil {
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742))

Check failure on line 120 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected >>, expected case or default or }

Check failure on line 120 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'
default:
return nil,

Check failure on line 122 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected return, expected expression
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)

Check failure on line 123 in cdc/sink/dmlsink/factory/factory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: unexpected newline, expected := or = or comma
Expand Down Expand Up @@ -143,3 +174,11 @@ func (s *SinkFactory) Close() {
s.txnSink.Close()
}
}

// Category returns category of s.
func (s *SinkFactory) Category() Category {
if s.category == 0 {
panic("should never happen")
}
return s.category
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit f68f079

Please sign in to comment.