Skip to content

Commit

Permalink
ticdc(redo, sink): return correct error in redo writer & fix default …
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 18, 2024
1 parent 045d670 commit a5a6bb5
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cdc/redo/writer/memory/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (e *encodingWorkerGroup) Run(ctx context.Context) (err error) {
zap.String("namespace", e.changefeed.Namespace),
zap.String("changefeed", e.changefeed.ID),
zap.Error(err))
if err != nil && errors.Cause(err) != context.Canceled {
if err != nil {
e.closed <- err
}
close(e.closed)
Expand Down
26 changes: 22 additions & 4 deletions cdc/redo/writer/memory/mem_log_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -87,10 +88,27 @@ func testWriteEvents(t *testing.T, events []writer.RedoEvent) {
})
require.NoError(t, err)

require.ErrorIs(t, lw.Close(), context.Canceled)
// duplicate close should return the same error
require.ErrorIs(t, lw.Close(), context.Canceled)

err = lw.WriteEvents(ctx, events...)
require.NoError(t, err)
err = lw.FlushLog(ctx)
require.NoError(t, err)
functions := map[string]func(error){
"WriteEvents": func(expected error) {
err := lw.WriteEvents(ctx, events...)
require.ErrorIs(t, errors.Cause(err), expected)
},
"FlushLog": func(expected error) {
err := lw.FlushLog(ctx)
require.ErrorIs(t, errors.Cause(err), expected)
},
}
firstCall := true
for _, f := range functions {
if firstCall {
firstCall = false
f(context.Canceled)
} else {
f(nil)
}
}
}
2 changes: 1 addition & 1 deletion pkg/redo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func IsBlackholeStorage(scheme string) bool {

// InitExternalStorage init an external storage.
var InitExternalStorage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
s, err := util.GetExternalStorageWithTimeout(ctx, uri.String(), DefaultTimeout)
s, err := util.GetExternalStorageWithDefaultTimeout(ctx, uri.String())
if err != nil {
return nil, errors.WrapError(errors.ErrStorageInitialize, err,
fmt.Sprintf("can't init external storage for %s", uri.String()))
Expand Down
6 changes: 1 addition & 5 deletions pkg/sink/kafka/claimcheck/claim_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ import (
"go.uber.org/zap"
)

const (
defaultTimeout = 5 * time.Minute
)

// ClaimCheck manage send message to the claim-check external storage.
type ClaimCheck struct {
storage storage.ExternalStorage
Expand All @@ -54,7 +50,7 @@ func New(ctx context.Context, storageURI string, changefeedID model.ChangeFeedID
zap.String("storageURI", util.MaskSensitiveDataInURI(storageURI)))

start := time.Now()
externalStorage, err := util.GetExternalStorageWithTimeout(ctx, storageURI, defaultTimeout)
externalStorage, err := util.GetExternalStorageWithDefaultTimeout(ctx, storageURI)
if err != nil {
log.Error("create external storage failed",
zap.String("namespace", changefeedID.Namespace),
Expand Down
27 changes: 20 additions & 7 deletions pkg/util/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,27 @@ import (
"golang.org/x/sync/errgroup"
)

const defaultTimeout = 5 * time.Minute

// GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
func GetExternalStorageFromURI(
ctx context.Context, uri string,
) (storage.ExternalStorage, error) {
return GetExternalStorage(ctx, uri, nil, DefaultS3Retryer())
}

// GetExternalStorageWithTimeout creates a new storage.ExternalStorage from a uri
// GetExternalStorageWithDefaultTimeout creates a new storage.ExternalStorage from a uri
// without retry. It is the caller's responsibility to set timeout to the context.
func GetExternalStorageWithTimeout(
ctx context.Context, uri string, timeout time.Duration,
) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
s, err := GetExternalStorage(ctx, uri, nil, nil)
// total retry time is [1<<7, 1<<8] = [128, 256] + 30*6 = [308, 436] seconds
r := NewS3Retryer(7, 1*time.Second, 2*time.Second)
s, err := GetExternalStorage(ctx, uri, nil, r)

return &extStorageWithTimeout{
ExternalStorage: s,
timeout: timeout,
timeout: defaultTimeout,
}, err
}

Expand Down Expand Up @@ -140,6 +142,17 @@ func DefaultS3Retryer() request.Retryer {
}
}

// NewS3Retryer creates a new s3 retryer.
func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer {
return retryerWithLog{
DefaultRetryer: client.DefaultRetryer{
NumMaxRetries: maxRetries,
MinRetryDelay: minRetryDelay,
MinThrottleDelay: minThrottleDelay,
},
}
}

type extStorageWithTimeout struct {
storage.ExternalStorage
timeout time.Duration
Expand Down

0 comments on commit a5a6bb5

Please sign in to comment.