Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): only check sink stuck for MQ sinks (#9742) #9748

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -980,15 +986,17 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -111,7 +111,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
span: span,
tableSinkCreater: tableSinkCreater,
Expand Down
31 changes: 29 additions & 2 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,29 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
// CategoryTxn is for Txn sink.
CategoryTxn Category = 1
// CategoryMQ is for MQ sink.
CategoryMQ = 2
// CategoryCloudStorage is for CloudStorage sink.
CategoryCloudStorage = 3
// CategoryBlackhole is for Blackhole sink.
CategoryBlackhole = 4
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
// to eventsink.EventSink[eventsink.TableEvent].
// So we have to use this factory to create and store the sink.
type SinkFactory struct {
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -67,6 +82,7 @@ func New(
return nil, err
}
s.txnSink = txnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
if cfg.Sink.EnableKafkaSinkV2 {
Expand All @@ -78,15 +94,18 @@ func New(
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
Expand Down Expand Up @@ -143,3 +162,11 @@ func (s *SinkFactory) Close() {
s.txnSink.Close()
}
}

// Category returns category of s.
func (s *SinkFactory) Category() Category {
if s.category == 0 {
panic("should never happen")
}
return s.category
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Loading