From 80a2fd907c9c78fe2d3b4dc77ebf6332a5e6d1df Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Tue, 9 Apr 2024 16:19:51 +0800 Subject: [PATCH] retry, client: support to log after execing some times (#7895) ref tikv/pd#7894 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/retry/backoff.go | 45 +++++++++++++++-- client/retry/backoff_test.go | 95 ++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 3 deletions(-) diff --git a/client/retry/backoff.go b/client/retry/backoff.go index 6c293098971..580e466badb 100644 --- a/client/retry/backoff.go +++ b/client/retry/backoff.go @@ -16,15 +16,31 @@ package retry import ( "context" + "reflect" + "runtime" + "strings" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "go.uber.org/multierr" + "go.uber.org/zap" ) const maxRecordErrorCount = 20 +// Option is used to customize the backoffer. +type Option func(*Backoffer) + +// withMinLogInterval sets the minimum log interval for retrying. +// Because the retry interval may be not the factor of log interval, so this is the minimum interval. +func withMinLogInterval(interval time.Duration) Option { + return func(bo *Backoffer) { + bo.logInterval = interval + } +} + // Backoffer is a backoff policy for retrying operations. type Backoffer struct { // base defines the initial time interval to wait before each retry. @@ -36,6 +52,10 @@ type Backoffer struct { // retryableChecker is used to check if the error is retryable. // By default, all errors are retryable. retryableChecker func(err error) bool + // logInterval defines the log interval for retrying. + logInterval time.Duration + // nextLogTime is used to record the next log time. + nextLogTime time.Duration attempt int next time.Duration @@ -50,10 +70,12 @@ func (bo *Backoffer) Exec( defer bo.resetBackoff() var ( allErrors error + err error after *time.Timer ) + fnName := getFunctionName(fn) for { - err := fn() + err = fn() bo.attempt++ if bo.attempt < maxRecordErrorCount { // multierr.Append will ignore nil error. @@ -63,6 +85,13 @@ func (bo *Backoffer) Exec( break } currentInterval := bo.nextInterval() + bo.nextLogTime += currentInterval + if err != nil { + if bo.logInterval > 0 && bo.nextLogTime >= bo.logInterval { + bo.nextLogTime %= bo.logInterval + log.Warn("call PD API failed and retrying", zap.String("api", fnName), zap.Int("retry-time", bo.attempt), zap.Error(err)) + } + } if after == nil { after = time.NewTimer(currentInterval) } else { @@ -93,7 +122,7 @@ func (bo *Backoffer) Exec( // - `base` defines the initial time interval to wait before each retry. // - `max` defines the max time interval to wait before each retry. // - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success. -func InitialBackoffer(base, max, total time.Duration) *Backoffer { +func InitialBackoffer(base, max, total time.Duration, opts ...Option) *Backoffer { // Make sure the base is less than or equal to the max. if base > max { base = max @@ -102,7 +131,7 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { if total > 0 && total < base { total = base } - return &Backoffer{ + bo := &Backoffer{ base: base, max: max, total: total, @@ -113,6 +142,10 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer { currentTotal: 0, attempt: 0, } + for _, opt := range opts { + opt(bo) + } + return bo } // SetRetryableChecker sets the retryable checker. @@ -152,6 +185,7 @@ func (bo *Backoffer) resetBackoff() { bo.next = bo.base bo.currentTotal = 0 bo.attempt = 0 + bo.nextLogTime = 0 } // Only used for test. @@ -161,3 +195,8 @@ var testBackOffExecuteFlag = false func TestBackOffExecute() bool { return testBackOffExecuteFlag } + +func getFunctionName(f any) string { + strs := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".") + return strings.Split(strs[len(strs)-1], "-")[0] +} diff --git a/client/retry/backoff_test.go b/client/retry/backoff_test.go index 32a42d083bd..c877860b5ae 100644 --- a/client/retry/backoff_test.go +++ b/client/retry/backoff_test.go @@ -15,12 +15,15 @@ package retry import ( + "bytes" "context" "errors" "testing" "time" + "github.com/pingcap/log" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestBackoffer(t *testing.T) { @@ -107,3 +110,95 @@ func TestBackoffer(t *testing.T) { func isBackofferReset(bo *Backoffer) bool { return bo.next == bo.base && bo.currentTotal == 0 } + +func TestBackofferWithLog(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conf := &log.Config{Level: "debug", File: log.FileLogConfig{}, DisableTimestamp: true} + lg := newZapTestLogger(conf) + log.ReplaceGlobals(lg.Logger, nil) + + bo := InitialBackoffer(time.Millisecond*10, time.Millisecond*100, time.Millisecond*1000, withMinLogInterval(time.Millisecond*100)) + err := bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms := lg.Messages() + len1 := len(ms) + // 10 + 20 + 40 + 80(log) + 100(log) * 9 >= 1000, so log ten times. + re.Len(ms, 10) + // 10 + 20 + 40 + 80 + 100 * 9, 13 times retry. + rfc := `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + // 10 + 20 + 40 + 80(log), 4 times retry. + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + re.Contains(ms[0], rfc) + + bo.resetBackoff() + err = bo.Exec(ctx, testFn) + re.ErrorIs(err, errTest) + + ms = lg.Messages() + re.Len(ms, 20) + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=13] [error=test]` + re.Contains(ms[len(ms)-1], rfc) + rfc = `["call PD API failed and retrying"] [api=testFn] [retry-time=4] [error=test]` + re.Contains(ms[len1], rfc) +} + +var errTest = errors.New("test") + +func testFn() error { + return errTest +} + +// testingWriter is a WriteSyncer that writes the the messages. +type testingWriter struct { + messages []string +} + +func newTestingWriter() *testingWriter { + return &testingWriter{} +} + +func (w *testingWriter) Write(p []byte) (n int, err error) { + n = len(p) + p = bytes.TrimRight(p, "\n") + m := string(p) + w.messages = append(w.messages, m) + return n, nil +} +func (w *testingWriter) Sync() error { + return nil +} + +type verifyLogger struct { + *zap.Logger + w *testingWriter +} + +func (logger *verifyLogger) Message() string { + if logger.w.messages == nil { + return "" + } + return logger.w.messages[len(logger.w.messages)-1] +} + +func (logger *verifyLogger) Messages() []string { + if logger.w.messages == nil { + return nil + } + return logger.w.messages +} + +func newZapTestLogger(cfg *log.Config, opts ...zap.Option) verifyLogger { + // TestingWriter is used to write to memory. + // Used in the verify logger. + writer := newTestingWriter() + lg, _, _ := log.InitLoggerWithWriteSyncer(cfg, writer, writer, opts...) + return verifyLogger{ + Logger: lg, + w: writer, + } +}