Skip to content

Commit

Permalink
feat: extend RetryDelayFn to take the command to be retried
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Oct 13, 2024
1 parent 113c567 commit deaea1a
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 80 deletions.
20 changes: 10 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ retry:
resp = c.conn.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
ctx, attempts, cmd, resp.Error(),
)
if shouldRetry {
attempts++
Expand Down Expand Up @@ -87,10 +87,10 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [
retry:
resps = c.conn.DoMulti(ctx, multi...).s
if c.retry && allReadOnly(multi) {
for _, resp := range resps {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
ctx, attempts, multi[i], resp.Error(),
)
if shouldRetry {
attempts++
Expand All @@ -115,10 +115,10 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
retry:
resps = c.conn.DoMultiCache(ctx, multi...).s
if c.retry {
for _, resp := range resps {
for i, resp := range resps {
if c.isRetryable(resp.NonRedisError(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
ctx, attempts, Completed(multi[i].Cmd), resp.Error(),
)
if shouldRetry {
attempts++
Expand All @@ -140,7 +140,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
retry:
resp = c.conn.DoCache(ctx, cmd, ttl)
if c.retry && c.isRetryable(resp.NonRedisError(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
if shouldRetry {
attempts++
goto retry
Expand All @@ -158,7 +158,7 @@ retry:
err = c.conn.Receive(ctx, subscribe, fn)
if c.retry {
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
if shouldRetry {
attempts++
goto retry
Expand Down Expand Up @@ -217,7 +217,7 @@ retry:
resp = c.wire.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
ctx, attempts, cmd, resp.Error(),
)
if shouldRetry {
attempts++
Expand Down Expand Up @@ -247,7 +247,7 @@ retry:
for i, cmd := range multi {
if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp[i].Error(),
ctx, attempts, multi[i], resp[i].Error(),
)
if shouldRetry {
attempts++
Expand All @@ -271,7 +271,7 @@ retry:
if c.retry {
if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, err,
ctx, attempts, subscribe, err,
)
if shouldRetry {
attempts++
Expand Down
46 changes: 23 additions & 23 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,21 +698,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -768,17 +768,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
RetryDelayFn: func(attempts int, err error) time.Duration {
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
return -1
},
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
Expand All @@ -790,7 +790,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -846,21 +846,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -908,17 +908,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
RetryDelayFn: func(attempts int, err error) time.Duration {
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
return -1
},
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
Expand All @@ -930,7 +930,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -975,21 +975,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
c, m := setup()
if cli, ok := c.(*sentinelClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*clusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := c.(*singleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -1052,14 +1052,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -1137,14 +1137,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down Expand Up @@ -1216,14 +1216,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
if ret := c.Dedicated(func(cc DedicatedClient) error {
if cli, ok := cc.(*dedicatedClusterClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
}
if cli, ok := cc.(*dedicatedSingleClient); ok {
cli.retryHandler = &mockRetryHandler{
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
return false
},
}
Expand Down
20 changes: 10 additions & 10 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ process:
goto process
case RedirectRetry:
if c.retry && cmd.IsReadOnly() {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
if shouldRetry {
attempts++
goto retry
Expand Down Expand Up @@ -614,7 +614,7 @@ func (c *clusterClient) doresultfn(
continue
}

retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error())
retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
} else {
nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
}
Expand Down Expand Up @@ -753,7 +753,7 @@ process:
}
case RedirectRetry:
if c.retry && allReadOnly(multi) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, multi[i], resp.Error())
if shouldRetry {
resultsp.Put(resps)
attempts++
Expand Down Expand Up @@ -786,7 +786,7 @@ process:
goto process
case RedirectRetry:
if c.retry {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
if shouldRetry {
attempts++
goto retry
Expand Down Expand Up @@ -930,7 +930,7 @@ func (c *clusterClient) resultcachefn(
continue
}

retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error())
retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
} else {
nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
}
Expand Down Expand Up @@ -1037,7 +1037,7 @@ retry:
}
err = cc.Receive(ctx, subscribe, fn)
if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
if shouldRetry {
attempts++
goto retry
Expand Down Expand Up @@ -1222,7 +1222,7 @@ retry:
case RedirectRetry:
if c.retry && cmd.IsReadOnly() && w.Error() == nil {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, resp.Error(),
ctx, attempts, cmd, resp.Error(),
)
if shouldRetry {
attempts++
Expand Down Expand Up @@ -1253,11 +1253,11 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed
retry:
if w, err := c.acquire(ctx, slot); err == nil {
resp = w.DoMulti(ctx, multi...).s
for _, r := range resp {
for i, r := range resp {
_, mode := c.client.shouldRefreshRetry(r.Error(), ctx)
if mode == RedirectRetry && retryable && w.Error() == nil {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, r.Error(),
ctx, attempts, multi[i], r.Error(),
)
if shouldRetry {
attempts++
Expand Down Expand Up @@ -1291,7 +1291,7 @@ retry:
if w, err = c.acquire(ctx, subscribe.Slot()); err == nil {
err = w.Receive(ctx, subscribe, fn)
if _, mode := c.client.shouldRefreshRetry(err, ctx); c.retry && mode == RedirectRetry && w.Error() == nil {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
if shouldRetry {
attempts++
goto retry
Expand Down
Loading

0 comments on commit deaea1a

Please sign in to comment.