Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle network errors/stalls #101

Merged
merged 2 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ var log = logging.Logger("data-transfer")
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
fsm.Event(datatransfer.Restart).FromAny().To(datatransfer.Ongoing),
fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
return nil
}),

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

Expand Down Expand Up @@ -46,7 +49,10 @@ var ChannelEvents = fsm.Events{
return nil
}),

fsm.Event(datatransfer.Disconnected).FromAny().To(datatransfer.PeerDisconnected),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()
return nil
}),

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
Expand Down
2 changes: 1 addition & 1 deletion channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestChannels(t *testing.T) {
err = channelList.Disconnected(chid)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
require.Equal(t, datatransfer.PeerDisconnected, state.Status())
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
})

t.Run("test self peer and other peer", func(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ const ErrRejected = errorType("response rejected")

// ErrUnsupported indicates an operation is not supported by the transport protocol
const ErrUnsupported = errorType("unsupported")

// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hannahhoward For my own curiosity:

What's the advantage of defining:

type errorType string

func (e errorType) Error() string {
	return string(e)
}

rather than using errors.New()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it allows you to use const instead of var -- there is a danger of var being changed -- technically anyone using the module can do so.

const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")

// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state
const ErrRemoved = errorType("channel removed due to inactivity")
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.2.1
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.2.1 h1:MdehhqBSuTI2LARfKLkpYnt0mUrqHs/mtuDnESXHBfU=
github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c h1:De/AZGvRa3WMyw5zdMMhcvRcho46BVo+C0NRud+T4io=
github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
38 changes: 33 additions & 5 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/filecoin-project/go-data-transfer/registry"
)

var ChannelRemoveTimeout = 1 * time.Hour

func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
has, err := m.channels.HasChannel(chid)
if err != nil {
Expand Down Expand Up @@ -170,12 +168,42 @@ func (m *manager) OnRequestTimedOut(ctx context.Context, chid datatransfer.Chann
go func() {
select {
case <-ctx.Done():
case <-time.After(ChannelRemoveTimeout):
case <-time.After(m.channelRemoveTimeout):
channel, err := m.channels.GetByID(ctx, chid)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if a channel times/out disconnects and then the FSM/data-trasfer crashes ? I think we should have a handler for this state that starts this wait even when the FSM restarts. Does that make sense ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea we can address this when we have a better way to actually track disconencts other than the Message -- see comment above about challenges of essentially pushing state

if err == nil {
if !(channels.IsChannelTerminated(channel.Status()) ||
channels.IsChannelCleaningUp(channel.Status())) {
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
log.Errorf("failed to cancel timed-out channel: %v", err)
return
}
log.Warnf("channel %+v has ben cancelled because of timeout", chid)
}
}
}
}()

return nil
}

func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.ChannelID) error {
log.Warnf("channel %+v has stalled or disconnected", chid)

// mark peer disconnected for informational purposes
err := m.channels.Disconnected(chid)
if err != nil {
return err
}

go func() {
select {
case <-ctx.Done():
case <-time.After(m.channelRemoveTimeout):
channel, err := m.channels.GetByID(ctx, chid)
if err == nil {
if !(channels.IsChannelTerminated(channel.Status()) ||
channels.IsChannelCleaningUp(channel.Status())) {
if err := m.channels.Cancel(chid); err != nil {
if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil {
log.Errorf("failed to cancel timed-out channel: %v", err)
return
}
Expand All @@ -198,7 +226,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
if msg != nil {
if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil {
log.Warnf("failed to send completion message, err : %v", err)
return m.channels.Disconnected(chid)
return m.OnRequestDisconnected(context.TODO(), chid)
}
}
if msg.Accepted() {
Expand Down
26 changes: 22 additions & 4 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -39,6 +40,7 @@ type manager struct {
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
channelRemoveTimeout time.Duration
}

type internalEvent struct {
Expand Down Expand Up @@ -72,8 +74,20 @@ func readyDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
return nil
}

// DataTransferOption configures the data transfer manager
type DataTransferOption func(*manager)

// ChannelRemoveTimeout sets the timeout after which channels are removed from the manager
func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
return func(m *manager) {
m.channelRemoveTimeout = timeout
}
}

const defaultChannelRemoveTimeout = 1 * time.Hour

// NewDataTransfer initializes a new instance of a data transfer manager
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) {
m := &manager{
dataTransferNetwork: dataTransferNetwork,
validatedTypes: registry.NewRegistry(),
Expand All @@ -85,12 +99,16 @@ func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTran
peerID: dataTransferNetwork.ID(),
transport: transport,
storedCounter: storedCounter,
channelRemoveTimeout: defaultChannelRemoveTimeout,
}
channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}, dataTransferNetwork.ID())
if err != nil {
return nil, err
}
m.channels = channels
for _, option := range options {
option(m)
}
return m, nil
}

Expand Down Expand Up @@ -230,7 +248,7 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe
}
if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), updateRequest); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(channelID, err)
_ = m.OnRequestDisconnected(ctx, channelID)
return err
}
return m.channels.NewVoucher(channelID, voucher)
Expand All @@ -249,7 +267,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe

if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid)); err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
_ = m.channels.Error(chid, err)
_ = m.OnRequestDisconnected(ctx, chid)
return err
}

Expand All @@ -271,7 +289,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe

if err := m.dataTransferNetwork.SendMessage(ctx, chid.OtherParty(m.peerID), m.pauseMessage(chid)); err != nil {
err = fmt.Errorf("Unable to send pause message: %w", err)
_ = m.channels.Error(chid, err)
_ = m.OnRequestDisconnected(ctx, chid)
return err
}

Expand Down
23 changes: 15 additions & 8 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestDataTransferInitiating(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
expectedEvents []datatransfer.EventCode
options []DataTransferOption
verify func(t *testing.T, h *harness)
}{
"OpenPushDataTransfer": {
Expand Down Expand Up @@ -83,21 +84,27 @@ func TestDataTransferInitiating(t *testing.T) {
},
},
"Remove Timed-out request": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Error, datatransfer.CleanupComplete},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
orig := ChannelRemoveTimeout
ChannelRemoveTimeout = 10 * time.Millisecond
defer func() {
ChannelRemoveTimeout = orig
}()

channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
require.NoError(t, err)
require.NoError(t, h.transport.EventHandler.OnRequestTimedOut(ctx, channelID))
// need time for the events to take place
time.Sleep(1 * time.Second)
},
},
"Remove disconnected request": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.Error, datatransfer.CleanupComplete},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
require.NoError(t, err)
require.NoError(t, h.transport.EventHandler.OnRequestDisconnected(ctx, channelID))
// need time for the events to take place
time.Sleep(1 * time.Second)
},
},
"SendVoucher with no channel open": {
verify: func(t *testing.T, h *harness) {
err := h.dt.SendVoucher(h.ctx, datatransfer.ChannelID{Initiator: h.peers[1], Responder: h.peers[0], ID: 999999}, h.voucher)
Expand Down Expand Up @@ -344,7 +351,7 @@ func TestDataTransferInitiating(t *testing.T) {
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter, verify.options...)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down
5 changes: 3 additions & 2 deletions impl/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/go-data-transfer/message"
)

// ChannelDataTransferType identifies the type of a data transfer channel for the purposes of a restart
type ChannelDataTransferType int

const (
Expand All @@ -23,10 +24,10 @@ const (
// ManagerPeerCreatePush is the type of a channel wherein the manager peer created a Push Data Transfer
ManagerPeerCreatePush

// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Pull Data Transfer Request
// ManagerPeerReceivePull is the type of a channel wherein the manager peer received a Pull Data Transfer Request
ManagerPeerReceivePull

// ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
// ManagerPeerReceivePush is the type of a channel wherein the manager peer received a Push Data Transfer Request
ManagerPeerReceivePush
)

Expand Down
3 changes: 0 additions & 3 deletions statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ const (

// ChannelNotFoundError means the searched for data transfer does not exist
ChannelNotFoundError

// PeerDisconnected means that we do NOT have a connection to the other peer
PeerDisconnected
)

// Statuses are human readable names for data transfer states
Expand Down
14 changes: 14 additions & 0 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type FakeGraphSync struct {
RequestUpdatedHook graphsync.OnRequestUpdatedHook
IncomingResponseHook graphsync.OnIncomingResponseHook
RequestorCancelledListener graphsync.OnRequestorCancelledListener
BlockSentListener graphsync.OnBlockSentListener
NetworkErrorListener graphsync.OnNetworkErrorListener
}

// NewFakeGraphSync returns a new fake graphsync implementation
Expand Down Expand Up @@ -352,6 +354,18 @@ func (fgs *FakeGraphSync) RegisterRequestorCancelledListener(listener graphsync.
return nil
}

// RegisterBlockSentListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
fgs.BlockSentListener = listener
return nil
}

// RegisterNetworkErrorListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
fgs.NetworkErrorListener = listener
return nil
}

var _ graphsync.GraphExchange = &FakeGraphSync{}

type fakeBlkData struct {
Expand Down
4 changes: 4 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type EventsHandler interface {
// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
// Error returns are logged but otherwise have no effect
OnRequestTimedOut(ctx context.Context, chid ChannelID) error

// OnRequestDisconnected is called when a network error occurs in a graphsync request
// or we appear to stall while receiving data
OnRequestDisconnected(ctx context.Context, chid ChannelID) error
}

/*
Expand Down
14 changes: 14 additions & 0 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error {
t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook)
t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook)
t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener)
t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener)
return nil
}

Expand Down Expand Up @@ -597,3 +598,16 @@ func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.Re
t.requestorCancelledMap[chid] = struct{}{}
}
}

func (t *Transport) gsNetworkErrorListener(p peer.ID, request graphsync.RequestData, err error) {
t.dataLock.Lock()
defer t.dataLock.Unlock()

chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}]
if ok {
err := t.events.OnRequestDisconnected(context.TODO(), chid)
if err != nil {
log.Error(err)
}
}
}
Loading