Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better retry config #124

Merged
merged 3 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@ var log = logging.Logger("data_transfer_network")

var sendMessageTimeout = time.Minute * 10

// The max number of attempts to open a stream
const defaultMaxStreamOpenAttempts = 5

// The min backoff time between retries
const defaultMinAttemptDuration = 1 * time.Second

// The max backoff time between retries
const defaultMaxAttemptDuration = 5 * time.Minute

// The multiplier in the backoff time for each retry
const defaultBackoffFactor = 5

var defaultDataTransferProtocols = []protocol.ID{datatransfer.ProtocolDataTransfer1_1, datatransfer.ProtocolDataTransfer1_0}

// Option is an option for configuring the libp2p storage market network
Expand All @@ -41,11 +49,12 @@ func DataTransferProtocols(protocols []protocol.ID) Option {
}

// RetryParameters changes the default parameters around connection reopening
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64) Option {
func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) Option {
return func(impl *libp2pDataTransferNetwork) {
impl.maxStreamOpenAttempts = attempts
impl.minAttemptDuration = minDuration
impl.maxAttemptDuration = maxDuration
impl.backoffFactor = backoffFactor
}
}

Expand All @@ -57,6 +66,7 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork {
maxStreamOpenAttempts: defaultMaxStreamOpenAttempts,
minAttemptDuration: defaultMinAttemptDuration,
maxAttemptDuration: defaultMaxAttemptDuration,
backoffFactor: defaultBackoffFactor,
dtProtocols: defaultDataTransferProtocols,
}

Expand All @@ -78,13 +88,14 @@ type libp2pDataTransferNetwork struct {
minAttemptDuration time.Duration
maxAttemptDuration time.Duration
dtProtocols []protocol.ID
backoffFactor float64
}

func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.ID, protocols ...protocol.ID) (network.Stream, error) {
b := &backoff.Backoff{
Min: impl.minAttemptDuration,
Max: impl.maxAttemptDuration,
Factor: impl.maxStreamOpenAttempts,
Factor: impl.backoffFactor,
Jitter: true,
}

Expand All @@ -95,12 +106,16 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
return s, err
}

nAttempts := b.Attempt()
if nAttempts == impl.maxStreamOpenAttempts {
return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", impl.maxStreamOpenAttempts, err)
// b.Attempt() starts from zero
nAttempts := b.Attempt() + 1
if nAttempts >= impl.maxStreamOpenAttempts {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really related to this PR, but I am not sure why these counters are float64s (both maxStreamOpenAttempts and b.Attempt()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I was wondering the same thing - basically it's because of the library we're using for the restart measurement

return nil, xerrors.Errorf("exhausted %g attempts but failed to open stream to %s, err: %w", impl.maxStreamOpenAttempts, id, err)
}

d := b.Duration()
log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %w",
id, nAttempts, impl.maxStreamOpenAttempts, d, err)

select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
124 changes: 124 additions & 0 deletions network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package network_test

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
libp2pnet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/message"
Expand Down Expand Up @@ -174,3 +179,122 @@ func TestMessageSendAndReceive(t *testing.T) {
})

}

// Wrap a host so that we can mock out errors when calling NewStream
type wrappedHost struct {
host.Host
errs chan error
}

func (w wrappedHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (libp2pnet.Stream, error) {
var err error
select {
case err = <-w.errs:
default:
}
if err != nil {
return nil, err
}

return w.Host.NewStream(ctx, p, pids...)
}

// TestSendMessageRetry verifies that the if the number of retry attempts
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
// is greater than the number of errors, SendMessage will succeed.
func TestSendMessageRetry(t *testing.T) {
tcases := []struct {
attempts int
errors int
expSuccess bool
}{{
attempts: 1,
errors: 0,
expSuccess: true,
}, {
attempts: 1,
errors: 1,
expSuccess: false,
}, {
attempts: 2,
errors: 1,
expSuccess: true,
}, {
attempts: 2,
errors: 2,
expSuccess: false,
}}
for _, tcase := range tcases {
name := fmt.Sprintf("%d attempts, %d errors", tcase.attempts, tcase.errors)
t.Run(name, func(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
mn := mocknet.New(ctx)

host1, err := mn.GenPeer()
require.NoError(t, err)

// Create a wrapped host that will return tcase.errors errors from
// NewStream
mockHost1 := &wrappedHost{
Host: host1,
errs: make(chan error, tcase.errors),
}
for i := 0; i < tcase.errors; i++ {
mockHost1.errs <- xerrors.Errorf("network err")
}
host1 = mockHost1

host2, err := mn.GenPeer()
require.NoError(t, err)
err = mn.LinkAll()
require.NoError(t, err)

retry := network.RetryParameters(
time.Millisecond,
time.Millisecond,
float64(tcase.attempts),
1)
dtnet1 := network.NewFromLibp2pHost(host1, retry)
dtnet2 := network.NewFromLibp2pHost(host2)
r := &receiver{
messageReceived: make(chan struct{}),
connectedPeers: make(chan peer.ID, 2),
}
dtnet1.SetDelegate(r)
dtnet2.SetDelegate(r)

err = dtnet1.ConnectTo(ctx, host2.ID())
require.NoError(t, err)

baseCid := testutil.GenerateCids(1)[0]
selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
isPull := false
id := datatransfer.TransferID(rand.Int31())
voucher := testutil.NewFakeDTType()
request, err := message.NewRequest(id, false, isPull, voucher.Type(), voucher, baseCid, selector)
require.NoError(t, err)

err = dtnet1.SendMessage(ctx, host2.ID(), request)
if !tcase.expSuccess {
require.Error(t, err)
return
}

require.NoError(t, err)

select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r.messageReceived:
}

sender := r.lastSender
require.Equal(t, sender, host1.ID())

receivedRequest := r.lastRequest
require.NotNil(t, receivedRequest)
})
}
}
4 changes: 2 additions & 2 deletions testutil/gstestdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [
gsData.GsNet1 = gsnet.NewFromLibp2pHost(gsData.Host1)
gsData.GsNet2 = gsnet.NewFromLibp2pHost(gsData.Host2)

opts1 := []network.Option{network.RetryParameters(0, 0, 0)}
opts2 := []network.Option{network.RetryParameters(0, 0, 0)}
opts1 := []network.Option{network.RetryParameters(0, 0, 0, 0)}
opts2 := []network.Option{network.RetryParameters(0, 0, 0, 0)}

if len(host1Protocols) != 0 {
opts1 = append(opts1, network.DataTransferProtocols(host1Protocols))
Expand Down