Skip to content

Commit

Permalink
app/retry: improve logging (#1303)
Browse files Browse the repository at this point in the history
Improve logging of retry component:
 - Use topic of component being retried
 - Do not log on shutdown

category: refactor
ticket: #959
  • Loading branch information
corverroos authored Oct 17, 2022
1 parent 6534681 commit 428fa8a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
22 changes: 14 additions & 8 deletions app/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"net"
"path"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -104,25 +105,28 @@ type Retryer[T any] struct {
// It is intended to be used asynchronously:
//
// go retryer.DoAsync(ctx, duty, "topic", "foo", fn)
func (r *Retryer[T]) DoAsync(parent context.Context, t T, name string, fn func(context.Context) error) {
func (r *Retryer[T]) DoAsync(parent context.Context, t T, topic, name string, fn func(context.Context) error) {
if r.isShutdown() {
return
}

r.asyncStarted(name)
defer r.asyncEnded(name)
label := path.Join(topic, name)

r.asyncStarted(label)
defer r.asyncEnded(label)

backoffFunc := r.backoffProvider()

// Switch to the async context since parent context may be closed soon.
ctx := log.CopyFields(r.asyncCtx, parent) // Copy log fields to new context
ctx = trace.ContextWithSpan(ctx, trace.SpanFromContext(parent)) // Copy tracing span to new context
ctx = log.WithTopic(ctx, "retry")
ctx = log.WithTopic(ctx, topic)
ctx, cancel := r.ctxTimeoutFunc(ctx, t)
defer cancel()

ctx, span := tracer.Start(ctx, "app/retry.DoAsync")
defer span.End()
span.SetAttributes(attribute.String("topic", topic))
span.SetAttributes(attribute.String("name", name))

for i := 0; ; i++ {
Expand All @@ -140,12 +144,12 @@ func (r *Retryer[T]) DoAsync(parent context.Context, t T, name string, fn func(c
// Note that the local context is not checked, since we care about downstream timeouts.

if !isCtxErr && !isNetErr && !isTempErr {
log.Error(ctx, "Permanent failure calling "+name, err)
log.Error(ctx, "Permanent failure calling "+label, err)
return
}

if ctx.Err() == nil {
log.Warn(ctx, "Temporary failure (will retry) calling "+name, err)
log.Warn(ctx, "Temporary failure (will retry) calling "+label, err)
span.AddEvent("retry.backoff.start")
select {
case <-backoffFunc():
Expand All @@ -156,9 +160,11 @@ func (r *Retryer[T]) DoAsync(parent context.Context, t T, name string, fn func(c
span.AddEvent("retry.backoff.done")
}

if ctx.Err() != nil {
if r.asyncCtx.Err() != nil {
return // Shutdown, return without logging
} else if ctx.Err() != nil {
// No need to log this at error level since tracker will analyse and report on failed duties.
log.Debug(ctx, "Retry timeout calling "+name+", duty expired")
log.Debug(ctx, "Retry timeout calling "+label+", duty expired")
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions app/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestRetryer(t *testing.T) {
require.NoError(t, err)

var attempt int
retryer.DoAsync(ctx, core.NewAttesterDuty(999), "test", func(ctx context.Context) error {
retryer.DoAsync(ctx, core.NewAttesterDuty(999), "test", "test", func(ctx context.Context) error {
defer func() { attempt++ }()
return test.Func(ctx, attempt)
})
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestShutdown(t *testing.T) {

// Start 3 long-running functions
for i := 0; i < 3; i++ {
go retryer.DoAsync(ctx, core.NewProposerDuty(999999), "test", func(ctx context.Context) error {
go retryer.DoAsync(ctx, core.NewProposerDuty(999999), "test", "test", func(ctx context.Context) error {
waiting <- struct{}{}
<-stop
<-ctx.Done()
Expand Down
1 change: 1 addition & 0 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/obolnetwork/charon/core"
)

//go:generate go test .
func TestDeadliner(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
8 changes: 4 additions & 4 deletions core/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@ func WithAsyncRetry(retryer *retry.Retryer[Duty]) WireOption {
return func(w *wireFuncs) {
clone := *w
w.FetcherFetch = func(ctx context.Context, duty Duty, set DutyDefinitionSet) error {
go retryer.DoAsync(ctx, duty, "fetcher fetch", func(ctx context.Context) error {
go retryer.DoAsync(ctx, duty, "fetcher", "fetch", func(ctx context.Context) error {
return clone.FetcherFetch(ctx, duty, set)
})

return nil
}
w.ConsensusPropose = func(ctx context.Context, duty Duty, set UnsignedDataSet) error {
go retryer.DoAsync(ctx, duty, "consensus propose", func(ctx context.Context) error {
go retryer.DoAsync(ctx, duty, "consensus", "propose", func(ctx context.Context) error {
return clone.ConsensusPropose(ctx, duty, set)
})

return nil
}
w.ParSigExBroadcast = func(ctx context.Context, duty Duty, set ParSignedDataSet) error {
go retryer.DoAsync(ctx, duty, "parsigex broadcast", func(ctx context.Context) error {
go retryer.DoAsync(ctx, duty, "parsigex", "broadcast", func(ctx context.Context) error {
return clone.ParSigExBroadcast(ctx, duty, set)
})

return nil
}
w.BroadcasterBroadcast = func(ctx context.Context, duty Duty, key PubKey, data SignedData) error {
go retryer.DoAsync(ctx, duty, "bcast broadcast", func(ctx context.Context) error {
go retryer.DoAsync(ctx, duty, "bcast", "broadcast", func(ctx context.Context) error {
return clone.BroadcasterBroadcast(ctx, duty, key, data)
})

Expand Down

0 comments on commit 428fa8a

Please sign in to comment.