From a5a6bb58ba9909926743254f5a1a3ecdcbf8c890 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 18 Nov 2024 11:35:08 +0800 Subject: [PATCH] ticdc(redo, sink): return correct error in redo writer & fix default retryer (#11747) (#11763) close pingcap/tiflow#11744 --- cdc/redo/writer/memory/encoding_worker.go | 2 +- cdc/redo/writer/memory/mem_log_writer_test.go | 26 +++++++++++++++--- pkg/redo/config.go | 2 +- pkg/sink/kafka/claimcheck/claim_check.go | 6 +---- pkg/util/external_storage.go | 27 ++++++++++++++----- 5 files changed, 45 insertions(+), 18 deletions(-) diff --git a/cdc/redo/writer/memory/encoding_worker.go b/cdc/redo/writer/memory/encoding_worker.go index 9b0cc6a8652..f62f236f888 100644 --- a/cdc/redo/writer/memory/encoding_worker.go +++ b/cdc/redo/writer/memory/encoding_worker.go @@ -131,7 +131,7 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) { zap.String("namespace", e.changefeed.Namespace), zap.String("changefeed", e.changefeed.ID), zap.Error(err)) - if err != nil && errors.Cause(err) != context.Canceled { + if err != nil { e.closed <- err } close(e.closed) diff --git a/cdc/redo/writer/memory/mem_log_writer_test.go b/cdc/redo/writer/memory/mem_log_writer_test.go index 64fbe9b2d61..238830af1e6 100644 --- a/cdc/redo/writer/memory/mem_log_writer_test.go +++ b/cdc/redo/writer/memory/mem_log_writer_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/redo/writer" + "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" @@ -87,10 +88,27 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) { }) require.NoError(t, err) + require.ErrorIs(t, lw.Close(), context.Canceled) + // duplicate close should return the same error require.ErrorIs(t, lw.Close(), context.Canceled) - err = lw.WriteEvents(ctx, events...) - require.NoError(t, err) - err = lw.FlushLog(ctx) - require.NoError(t, err) + functions := map[string]func(error){ + "WriteEvents": func(expected error) { + err := lw.WriteEvents(ctx, events...) + require.ErrorIs(t, errors.Cause(err), expected) + }, + "FlushLog": func(expected error) { + err := lw.FlushLog(ctx) + require.ErrorIs(t, errors.Cause(err), expected) + }, + } + firstCall := true + for _, f := range functions { + if firstCall { + firstCall = false + f(context.Canceled) + } else { + f(nil) + } + } } diff --git a/pkg/redo/config.go b/pkg/redo/config.go index 8865e9f4f61..4b7821b1048 100644 --- a/pkg/redo/config.go +++ b/pkg/redo/config.go @@ -190,7 +190,7 @@ func IsBlackholeStorage(scheme string) bool { // InitExternalStorage init an external storage. var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) { - s, err := util.GetExternalStorageWithTimeout(ctx, uri.String(), DefaultTimeout) + s, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri.String()) if err != nil { return nil, errors.WrapError(errors.ErrStorageInitialize, err, fmt.Sprintf("can't init external storage for %s", uri.String())) diff --git a/pkg/sink/kafka/claimcheck/claim_check.go b/pkg/sink/kafka/claimcheck/claim_check.go index f88a6b10fd0..4f32b69f611 100644 --- a/pkg/sink/kafka/claimcheck/claim_check.go +++ b/pkg/sink/kafka/claimcheck/claim_check.go @@ -30,10 +30,6 @@ import ( "go.uber.org/zap" ) -const ( - defaultTimeout = 5 * time.Minute -) - // ClaimCheck manage send message to the claim-check external storage. type ClaimCheck struct { storage storage.ExternalStorage @@ -54,7 +50,7 @@ func New(ctx context.Context, storageURI string, changefeedID model.ChangeFeedID zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI))) start := time.Now() - externalStorage, err := util.GetExternalStorageWithTimeout(ctx, storageURI, defaultTimeout) + externalStorage, err := util.GetExternalStorageWithDefaultTimeout(ctx, storageURI) if err != nil { log.Error("create external storage failed", zap.String("namespace", changefeedID.Namespace), diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 0d96c47045f..c557cde3172 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -35,6 +35,8 @@ import ( "golang.org/x/sync/errgroup" ) +const defaultTimeout = 5 * time.Minute + // GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri. func GetExternalStorageFromURI( ctx context.Context, uri string, @@ -42,18 +44,18 @@ func GetExternalStorageFromURI( return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer()) } -// GetExternalStorageWithTimeout creates a new storage.ExternalStorage from a uri +// GetExternalStorageWithDefaultTimeout creates a new storage.ExternalStorage from a uri // without retry. It is the caller's responsibility to set timeout to the context. -func GetExternalStorageWithTimeout( - ctx context.Context, uri string, timeout time.Duration, -) (storage.ExternalStorage, error) { - ctx, cancel := context.WithTimeout(ctx, timeout) +func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error) { + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() - s, err := GetExternalStorage(ctx, uri, nil, nil) + // total retry time is [1<<7, 1<<8] = [128, 256] + 30*6 = [308, 436] seconds + r := NewS3Retryer(7, 1*time.Second, 2*time.Second) + s, err := GetExternalStorage(ctx, uri, nil, r) return &extStorageWithTimeout{ ExternalStorage: s, - timeout: timeout, + timeout: defaultTimeout, }, err } @@ -140,6 +142,17 @@ func DefaultS3Retryer() request.Retryer { } } +// NewS3Retryer creates a new s3 retryer. +func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer { + return retryerWithLog{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: maxRetries, + MinRetryDelay: minRetryDelay, + MinThrottleDelay: minThrottleDelay, + }, + } +} + type extStorageWithTimeout struct { storage.ExternalStorage timeout time.Duration