Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better reconnect behaviour #162

Merged
merged 1 commit into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

18 changes: 16 additions & 2 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,22 @@ func (c *Channels) Error(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Error, err)
}

func (c *Channels) Disconnected(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Disconnected)
// Disconnected indicates that the connection went down and it was not possible
// to restart it.
//
// It dispatches a datatransfer.Disconnected event for the channel identified
// by chid via c.send, forwarding err as the event argument, and returns
// whatever c.send returns.
//
// NOTE(review): the Disconnected handler in ChannelEvents calls err.Error() to
// populate the channel state message, so callers should pass a non-nil err.
func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Disconnected, err)
}

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request.
//
// It dispatches a datatransfer.RequestTimedOut event for the channel
// identified by chid via c.send, forwarding err as the event argument, and
// returns whatever c.send returns.
//
// NOTE(review): the RequestTimedOut handler in ChannelEvents calls err.Error()
// to populate the channel state message, so callers should pass a non-nil err.
func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.RequestTimedOut, err)
}

// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer.
//
// It dispatches a datatransfer.SendDataError event for the channel identified
// by chid via c.send, forwarding err as the event argument, and returns
// whatever c.send returns.
//
// NOTE(review): the SendDataError handler in ChannelEvents calls err.Error()
// to populate the channel state message, so callers should pass a non-nil err.
func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.SendDataError, err)
}

// HasChannel returns true if the given channel id is being tracked
Expand Down
33 changes: 31 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ var transferringStates = []fsm.StateKey{

// ChannelEvents describe the events that can occur on a data transfer channel
var ChannelEvents = fsm.Events{
// Open a channel
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),

// Remote peer has accepted the Open channel request
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),

fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
return nil
Expand All @@ -52,15 +56,27 @@ var ChannelEvents = fsm.Events{
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()

fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
Expand All @@ -72,41 +88,54 @@ var ChannelEvents = fsm.Events{
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
return nil
}),

fsm.Event(datatransfer.PauseInitiator).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.PauseResponder).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.ResumeInitiator).
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.ResumeResponder).
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
From(datatransfer.Finalizing).To(datatransfer.Completing).
FromAny().ToJustRecord(),

// The transfer has finished on the local node - all data was sent / received
fsm.Event(datatransfer.FinishTransfer).
FromAny().To(datatransfer.TransferFinished).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),

fsm.Event(datatransfer.ResponderBeginsFinalization).
FromAny().To(datatransfer.ResponderFinalizing).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),

// The remote peer sent a Complete message, meaning it has sent / received all data
fsm.Event(datatransfer.ResponderCompletes).
FromAny().To(datatransfer.ResponderCompleted).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
From(datatransfer.TransferFinished).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),

fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),

// Both the local node and the remote peer have completed the transfer
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),

fsm.Event(datatransfer.CleanupComplete).
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
Expand Down
16 changes: 9 additions & 7 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
Expand All @@ -37,7 +38,7 @@ func TestChannels(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestChannels(t *testing.T) {
})

t.Run("updating send/receive values", func(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
cidLists, err := cidlists.NewCIDLists(dir)
require.NoError(t, err)
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestChannels(t *testing.T) {
})

t.Run("test disconnected", func(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand All @@ -320,10 +321,11 @@ func TestChannels(t *testing.T) {
state := checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())

err = channelList.Disconnected(chid)
disconnectErr := xerrors.Errorf("disconnected")
err = channelList.Disconnected(chid, disconnectErr)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
require.Equal(t, disconnectErr.Error(), state.Message())
})

t.Run("test self peer and other peer", func(t *testing.T) {
Expand Down Expand Up @@ -364,7 +366,7 @@ func TestMigrationsV0(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down Expand Up @@ -484,7 +486,7 @@ func TestMigrationsV1(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down
6 changes: 0 additions & 6 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,3 @@ const ErrRejected = errorType("response rejected")

// ErrUnsupported indicates an operation is not supported by the transport protocol
const ErrUnsupported = errorType("unsupported")

// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrDisconnected and ErrRemoved are no longer used (with this PR we try to reconnect automatically)


// ErrRemoved indicates the channel was inactive long enough that it was put in a permanent error state
const ErrRemoved = errorType("channel removed due to inactivity")
8 changes: 8 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ const (
// the remote peer. It is used to measure progress of how much of the total
// data has been received.
DataReceivedProgress

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request
RequestTimedOut

// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer
SendDataError
)

// Events are human readable names for data transfer events
Expand Down
3 changes: 0 additions & 3 deletions impl/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,5 @@ func (ce *channelEnvironment) ID() peer.ID {
}

func (ce *channelEnvironment) CleanupChannel(chid datatransfer.ChannelID) {
ce.m.reconnectsLk.Lock()
delete(ce.m.reconnects, chid)
ce.m.reconnectsLk.Unlock()
ce.m.transport.CleanupChannel(chid)
}
Loading