Skip to content

Commit

Permalink
feat: better reconnect behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 17, 2021
1 parent 42e0a5b commit 7e770d6
Show file tree
Hide file tree
Showing 15 changed files with 867 additions and 577 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

18 changes: 16 additions & 2 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,22 @@ func (c *Channels) Error(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Error, err)
}

func (c *Channels) Disconnected(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Disconnected)
// Disconnected indicates that the connection went down and it was not possible
// to restart it
func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Disconnected, err)
}

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request
func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.RequestTimedOut, err)
}

// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer
func (c *Channels) SendDataError(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.SendDataError, err)
}

// HasChannel returns true if the given channel id is being tracked
Expand Down
33 changes: 31 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ var transferringStates = []fsm.StateKey{

// ChannelEvents describe the events taht can
var ChannelEvents = fsm.Events{
// Open a channel
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),

// Remote peer has accepted the Open channel request
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),

fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = ""
return nil
Expand All @@ -52,15 +56,27 @@ var ChannelEvents = fsm.Events{
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()

fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.SendDataError).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
return nil
}),

fsm.Event(datatransfer.NewVoucher).FromAny().ToNoChange().
Action(func(chst *internal.ChannelState, vtype datatransfer.TypeIdentifier, voucherBytes []byte) error {
chst.Vouchers = append(chst.Vouchers, internal.EncodedVoucher{Type: vtype, Voucher: &cbg.Deferred{Raw: voucherBytes}})
Expand All @@ -72,41 +88,54 @@ var ChannelEvents = fsm.Events{
internal.EncodedVoucherResult{Type: vtype, VoucherResult: &cbg.Deferred{Raw: voucherResultBytes}})
return nil
}),

fsm.Event(datatransfer.PauseInitiator).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.InitiatorPaused).
From(datatransfer.ResponderPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.PauseResponder).
FromMany(datatransfer.Requested, datatransfer.Ongoing).To(datatransfer.ResponderPaused).
From(datatransfer.InitiatorPaused).To(datatransfer.BothPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.ResumeInitiator).
From(datatransfer.InitiatorPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.ResponderPaused).
FromAny().ToJustRecord(),

fsm.Event(datatransfer.ResumeResponder).
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
From(datatransfer.Finalizing).To(datatransfer.Completing).
FromAny().ToJustRecord(),

// The transfer has finished on the local node - all data was sent / received
fsm.Event(datatransfer.FinishTransfer).
FromAny().To(datatransfer.TransferFinished).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderCompleted).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderFinalizingTransferFinished),

fsm.Event(datatransfer.ResponderBeginsFinalization).
FromAny().To(datatransfer.ResponderFinalizing).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.TransferFinished).To(datatransfer.ResponderFinalizingTransferFinished),

// The remote peer sent a Complete message, meaning it has sent / received all data
fsm.Event(datatransfer.ResponderCompletes).
FromAny().To(datatransfer.ResponderCompleted).
FromMany(datatransfer.Failing, datatransfer.Cancelling).ToJustRecord().
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
From(datatransfer.TransferFinished).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),

fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),

// Both the local node and the remote peer have completed the transfer
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),

fsm.Event(datatransfer.CleanupComplete).
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
Expand Down
16 changes: 9 additions & 7 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
Expand All @@ -37,7 +38,7 @@ func TestChannels(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestChannels(t *testing.T) {
})

t.Run("updating send/receive values", func(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
cidLists, err := cidlists.NewCIDLists(dir)
require.NoError(t, err)
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestChannels(t *testing.T) {
})

t.Run("test disconnected", func(t *testing.T) {
ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand All @@ -320,10 +321,11 @@ func TestChannels(t *testing.T) {
state := checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())

err = channelList.Disconnected(chid)
disconnectErr := xerrors.Errorf("disconnected")
err = channelList.Disconnected(chid, disconnectErr)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message())
require.Equal(t, disconnectErr.Error(), state.Message())
})

t.Run("test self peer and other peer", func(t *testing.T) {
Expand Down Expand Up @@ -364,7 +366,7 @@ func TestMigrationsV0(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down Expand Up @@ -484,7 +486,7 @@ func TestMigrationsV1(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

ds := datastore.NewMapDatastore()
ds := dss.MutexWrap(datastore.NewMapDatastore())
received := make(chan event)
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
Expand Down
6 changes: 0 additions & 6 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,3 @@ const ErrRejected = errorType("response rejected")

// ErrUnsupported indicates an operation is not supported by the transport protocol
const ErrUnsupported = errorType("unsupported")

// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel.
const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel")

// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state
const ErrRemoved = errorType("channel removed due to inactivity")
8 changes: 8 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ const (
// the remote peer. It is used to measure progress of how much of the total
// data has been received.
DataReceivedProgress

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request
RequestTimedOut

// SendDataError indicates that the transport layer had an error trying
// to send data to the remote peer
SendDataError
)

// Events are human readable names for data transfer events
Expand Down
3 changes: 0 additions & 3 deletions impl/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,5 @@ func (ce *channelEnvironment) ID() peer.ID {
}

func (ce *channelEnvironment) CleanupChannel(chid datatransfer.ChannelID) {
ce.m.reconnectsLk.Lock()
delete(ce.m.reconnects, chid)
ce.m.reconnectsLk.Unlock()
ce.m.transport.CleanupChannel(chid)
}
Loading

0 comments on commit 7e770d6

Please sign in to comment.