diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go
index 98d5a8a3..526d489a 100644
--- a/network/libp2p_impl.go
+++ b/network/libp2p_impl.go
@@ -23,10 +23,18 @@ var log = logging.Logger("data_transfer_network")
 
 var sendMessageTimeout = time.Minute * 10
 
+// The max number of attempts to open a stream
 const defaultMaxStreamOpenAttempts = 5
+
+// The min backoff time between retries
 const defaultMinAttemptDuration = 1 * time.Second
+
+// The max backoff time between retries
 const defaultMaxAttemptDuration = 5 * time.Minute
 
+// The multiplier in the backoff time for each retry
+const defaultBackoffFactor = 5
+
 var defaultDataTransferProtocols = []protocol.ID{datatransfer.ProtocolDataTransfer1_1, datatransfer.ProtocolDataTransfer1_0}
 
 // Option is an option for configuring the libp2p storage market network
@@ -41,11 +49,12 @@ func DataTransferProtocols(protocols []protocol.ID) Option {
 }
 
 // RetryParameters changes the default parameters around connection reopening
-func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64) Option {
+func RetryParameters(minDuration time.Duration, maxDuration time.Duration, attempts float64, backoffFactor float64) Option {
 	return func(impl *libp2pDataTransferNetwork) {
 		impl.maxStreamOpenAttempts = attempts
 		impl.minAttemptDuration = minDuration
 		impl.maxAttemptDuration = maxDuration
+		impl.backoffFactor = backoffFactor
 	}
 }
 
@@ -57,6 +66,7 @@ func NewFromLibp2pHost(host host.Host, options ...Option) DataTransferNetwork {
 		maxStreamOpenAttempts: defaultMaxStreamOpenAttempts,
 		minAttemptDuration:    defaultMinAttemptDuration,
 		maxAttemptDuration:    defaultMaxAttemptDuration,
+		backoffFactor:         defaultBackoffFactor,
 		dtProtocols:           defaultDataTransferProtocols,
 	}
 
@@ -78,13 +88,14 @@ type libp2pDataTransferNetwork struct {
 	minAttemptDuration    time.Duration
 	maxAttemptDuration    time.Duration
 	dtProtocols           []protocol.ID
+	backoffFactor         float64
 }
 
 func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.ID, protocols ...protocol.ID) (network.Stream, error) {
 	b := &backoff.Backoff{
 		Min:    impl.minAttemptDuration,
 		Max:    impl.maxAttemptDuration,
-		Factor: impl.maxStreamOpenAttempts,
+		Factor: impl.backoffFactor,
 		Jitter: true,
 	}
 
@@ -95,12 +106,16 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
 			return s, err
 		}
 
-		nAttempts := b.Attempt()
-		if nAttempts == impl.maxStreamOpenAttempts {
-			return nil, xerrors.Errorf("exhausted %d attempts but failed to open stream, err: %w", impl.maxStreamOpenAttempts, err)
+		// b.Attempt() starts from zero
+		nAttempts := b.Attempt() + 1
+		if nAttempts >= impl.maxStreamOpenAttempts {
+			return nil, xerrors.Errorf("exhausted %g attempts but failed to open stream to %s, err: %w", impl.maxStreamOpenAttempts, id, err)
 		}
 
 		d := b.Duration()
+		log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %s",
+			id, nAttempts, impl.maxStreamOpenAttempts, d, err)
+
 		select {
 		case <-ctx.Done():
 			return nil, ctx.Err()
diff --git a/network/libp2p_impl_test.go b/network/libp2p_impl_test.go
index a5f8346d..934f3f6c 100644
--- a/network/libp2p_impl_test.go
+++ b/network/libp2p_impl_test.go
@@ -2,16 +2,21 @@ package network_test
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"testing"
 	"time"
 
 	basicnode "github.com/ipld/go-ipld-prime/node/basic"
 	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
+	"github.com/libp2p/go-libp2p-core/host"
+	libp2pnet "github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
+	"github.com/libp2p/go-libp2p-core/protocol"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/xerrors"
 
 	datatransfer "github.com/filecoin-project/go-data-transfer"
 	"github.com/filecoin-project/go-data-transfer/message"
@@ -174,3 +179,122 @@ func TestMessageSendAndReceive(t *testing.T) {
 	})
 
 }
+
+// Wrap a host so that we can mock out errors when calling NewStream
+type wrappedHost struct {
+	host.Host
+	errs chan error
+}
+
+func (w wrappedHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (libp2pnet.Stream, error) {
+	var err error
+	select {
+	case err = <-w.errs:
+	default:
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return w.Host.NewStream(ctx, p, pids...)
+}
+
+// TestSendMessageRetry verifies that if the number of retry attempts
+// is greater than the number of errors, SendMessage will succeed.
+func TestSendMessageRetry(t *testing.T) {
+	tcases := []struct {
+		attempts   int
+		errors     int
+		expSuccess bool
+	}{{
+		attempts:   1,
+		errors:     0,
+		expSuccess: true,
+	}, {
+		attempts:   1,
+		errors:     1,
+		expSuccess: false,
+	}, {
+		attempts:   2,
+		errors:     1,
+		expSuccess: true,
+	}, {
+		attempts:   2,
+		errors:     2,
+		expSuccess: false,
+	}}
+	for _, tcase := range tcases {
+		name := fmt.Sprintf("%d attempts, %d errors", tcase.attempts, tcase.errors)
+		t.Run(name, func(t *testing.T) {
+			// create network
+			ctx := context.Background()
+			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			defer cancel()
+			mn := mocknet.New(ctx)
+
+			host1, err := mn.GenPeer()
+			require.NoError(t, err)
+
+			// Create a wrapped host that will return tcase.errors errors from
+			// NewStream
+			mockHost1 := &wrappedHost{
+				Host: host1,
+				errs: make(chan error, tcase.errors),
+			}
+			for i := 0; i < tcase.errors; i++ {
+				mockHost1.errs <- xerrors.Errorf("network err")
+			}
+			host1 = mockHost1
+
+			host2, err := mn.GenPeer()
+			require.NoError(t, err)
+			err = mn.LinkAll()
+			require.NoError(t, err)
+
+			retry := network.RetryParameters(
+				time.Millisecond,
+				time.Millisecond,
+				float64(tcase.attempts),
+				1)
+			dtnet1 := network.NewFromLibp2pHost(host1, retry)
+			dtnet2 := network.NewFromLibp2pHost(host2)
+			r := &receiver{
+				messageReceived: make(chan struct{}),
+				connectedPeers:  make(chan peer.ID, 2),
+			}
+			dtnet1.SetDelegate(r)
+			dtnet2.SetDelegate(r)
+
+			err = dtnet1.ConnectTo(ctx, host2.ID())
+			require.NoError(t, err)
+
+			baseCid := testutil.GenerateCids(1)[0]
+			selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
+			isPull := false
+			id := datatransfer.TransferID(rand.Int31())
+			voucher := testutil.NewFakeDTType()
+			request, err := message.NewRequest(id, false, isPull, voucher.Type(), voucher, baseCid, selector)
+			require.NoError(t, err)
+
+			err = dtnet1.SendMessage(ctx, host2.ID(), request)
+			if !tcase.expSuccess {
+				require.Error(t, err)
+				return
+			}
+
+			require.NoError(t, err)
+
+			select {
+			case <-ctx.Done():
+				t.Fatal("did not receive message sent")
+			case <-r.messageReceived:
+			}
+
+			sender := r.lastSender
+			require.Equal(t, sender, host1.ID())
+
+			receivedRequest := r.lastRequest
+			require.NotNil(t, receivedRequest)
+		})
+	}
+}
diff --git a/testutil/gstestdata.go b/testutil/gstestdata.go
index f1d22fe5..6f454e6b 100644
--- a/testutil/gstestdata.go
+++ b/testutil/gstestdata.go
@@ -143,8 +143,8 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [
 	gsData.GsNet1 = gsnet.NewFromLibp2pHost(gsData.Host1)
 	gsData.GsNet2 = gsnet.NewFromLibp2pHost(gsData.Host2)
 
-	opts1 := []network.Option{network.RetryParameters(0, 0, 0)}
-	opts2 := []network.Option{network.RetryParameters(0, 0, 0)}
+	opts1 := []network.Option{network.RetryParameters(0, 0, 0, 0)}
+	opts2 := []network.Option{network.RetryParameters(0, 0, 0, 0)}
 
 	if len(host1Protocols) != 0 {
 		opts1 = append(opts1, network.DataTransferProtocols(host1Protocols))