diff --git a/CHANGELOG.md b/CHANGELOG.md index f5c42ce4..43bfdf9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # go-data-transfer changelog +# go-data-transfer 1.5.0 + +Support the data transfer being restarted. + +- github.com/filecoin-project/go-data-transfer: + - Add isRestart param to validators (#197) ([filecoin-project/go-data-transfer#197](https://github.com/filecoin-project/go-data-transfer/pull/197)) + - fix: flaky TestChannelMonitorAutoRestart (#198) ([filecoin-project/go-data-transfer#198](https://github.com/filecoin-project/go-data-transfer/pull/198)) + - Channel monitor watches for errors instead of measuring data rate (#190) ([filecoin-project/go-data-transfer#190](https://github.com/filecoin-project/go-data-transfer/pull/190)) + - fix: prevent concurrent restarts for same channel (#195) ([filecoin-project/go-data-transfer#195](https://github.com/filecoin-project/go-data-transfer/pull/195)) + - fix: channel state machine event handling (#194) ([filecoin-project/go-data-transfer#194](https://github.com/filecoin-project/go-data-transfer/pull/194)) + - Dont double count data sent (#185) ([filecoin-project/go-data-transfer#185](https://github.com/filecoin-project/go-data-transfer/pull/185)) +- github.com/ipfs/go-graphsync (v0.6.0 -> v0.6.1): + - feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164)) + +Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| dirkmc | 8 | +1235/-868 | 37 | +| Dirk McCormick | 1 | +11/-0 | 1 | + # go-data-transfer 1.4.3 - github.com/filecoin-project/go-data-transfer: diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 5c53624b..e5138e1f 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -105,7 +105,12 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) 
*monitore } m.lk.Lock() - defer m.lk.Unlock() + log.Debugf("acquired lock to create channel monitor for channelID=%s", chid) + defer func() { + log.Debugf("will release channel monitor lock for channelID=%s", chid) + m.lk.Unlock() + log.Debugf("released channel monitor lock for channelID=%s", chid) + }() // Check if there is already a monitor for this channel if _, ok := m.channels[chid]; ok { @@ -118,8 +123,10 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore return nil } + log.Debugf("will create channel monitor for channelID=%s", chid) mpc := newMonitoredChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) m.channels[chid] = mpc + log.Debugf("created channel monitor for channelID=%s", chid) return mpc } @@ -229,6 +236,8 @@ func (mc *monitoredChannel) start() { // Watch to make sure the responder accepts the channel in time cancelAcceptTimer := mc.watchForResponderAccept() + log.Debugf("finished creating timer for accept messages, channelID=%s", mc.chid) + // Watch for data-transfer channel events mc.unsub = mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { if channelState.ChannelID() != mc.chid { @@ -308,7 +317,7 @@ func (mc *monitoredChannel) watchForResponderComplete() { case <-timer.C: // Timer expired before we received a Complete message from the responder err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer", - mc.chid, mc.cfg.AcceptTimeout) + mc.chid, mc.cfg.CompleteTimeout) mc.closeChannelAndShutdown(err) } } @@ -438,12 +447,14 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) // Send a restart message for the channel - restartResult := mc.waitForRestartResponse() log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) if
err != nil { return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) } + log.Infof("%s: sent restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) + + restartResult := mc.waitForRestartResponse() // The restart message is fire and forget, so we need to watch for a // restart response to know that the restart message reached the peer. diff --git a/impl/events.go b/impl/events.go index abcadf3f..8a153f08 100644 --- a/impl/events.go +++ b/impl/events.go @@ -161,29 +161,40 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra } func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { + log.Infof("channel %s: received response %+v from provider", chid, response) + if response.IsCancel() { log.Infof("channel %s: received cancel response, cancelling channel", chid) return m.channels.Cancel(chid) } + if response.IsVoucherResult() { + log.Infof("channel %s: received response %+v from provider is a voucher result", chid, response) if !response.EmptyVoucherResult() { + log.Debugf("channel %s: processing non-empty voucher result", chid) vresult, err := m.decodeVoucherResult(response) if err != nil { + log.Errorf("channel %s: failed to decode voucher result, err=%s", chid, err) return err } + log.Infof("channel %s: received voucher response %+v", chid, vresult) err = m.channels.NewVoucherResult(chid, vresult) if err != nil { + log.Errorf("channel %s: failed NewVoucherResult, err=%s", chid, err) return err } } + if !response.Accepted() { log.Infof("channel %s: received rejected response, erroring out channel", chid) return m.channels.Error(chid, datatransfer.ErrRejected) } + if response.IsNew() { log.Infof("channel %s: received new response, accepting channel", chid) err := m.channels.Accept(chid) if err != nil { + log.Errorf("channel %s: failed to accept new response, err=%s", chid, err) return err } } @@ -196,16 +207,21 @@ func (m *manager)
OnResponseReceived(chid datatransfer.ChannelID, response datat } } } + if response.IsComplete() && response.Accepted() { if !response.IsPaused() { log.Infof("channel %s: received complete response, completing channel", chid) return m.channels.ResponderCompletes(chid) } + + log.Infof("channel %s: received complete response but responder is paused", chid) + err := m.channels.ResponderBeginsFinalization(chid) if err != nil { return nil } } + if response.IsPaused() { return m.pauseOther(chid) } @@ -432,6 +448,10 @@ func (m *manager) validateVoucher( } result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor) + if isPull { + log.Infof("ValidatePull, result=%s, err=%s", result, err) + } + return vouch, result, err } diff --git a/impl/impl.go b/impl/impl.go index b0210f04..17ee89f6 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -204,20 +204,18 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo transportConfigurer(chid, voucher, m.transport) } m.dataTransferNetwork.Protect(requestTo, chid.String()) - monitoredChan := m.channelMonitor.AddPushChannel(chid) + if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) _ = m.channels.Error(chid, err) - - // If push channel monitoring is enabled, shutdown the monitor as it - // wasn't possible to start the data transfer - if monitoredChan != nil { - monitoredChan.Shutdown() - } - return chid, err } + log.Debugf("sent push request message, channelID=%s", chid) + + m.channelMonitor.AddPushChannel(chid) + log.Infof("started new channel monitor for push request, channelID=%s", chid) + return chid, nil } @@ -242,19 +240,18 @@ func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, vo transportConfigurer(chid, voucher, m.transport) } m.dataTransferNetwork.Protect(requestTo, chid.String()) - monitoredChan := m.channelMonitor.AddPullChannel(chid) + if err := m.transport.OpenChannel(ctx, requestTo, 
chid, cidlink.Link{Cid: baseCid}, selector, nil, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) _ = m.channels.Error(chid, err) - // If pull channel monitoring is enabled, shutdown the monitor as it - // wasn't possible to start the data transfer - if monitoredChan != nil { - monitoredChan.Shutdown() - } - return chid, err } + + log.Debugf("sent pull channel request channelID=%s", chid) + m.channelMonitor.AddPullChannel(chid) + log.Infof("started new channel monitor for pull request: channelID=%s", chid) + return chid, nil } diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index c9b13b2b..74d4e822 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -154,11 +154,13 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage( ctx context.Context, p peer.ID, outgoing datatransfer.Message) error { + log.Debugf("opening stream to peer %s to send message %+v", p, outgoing) s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) if err != nil { return err } + log.Debugf("finished opening stream to peer %s to send message %+v", p, outgoing) outgoing, err = outgoing.MessageForProtocol(s.Protocol()) if err != nil { diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index a5bbb6ab..64a902a6 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -592,7 +592,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook // when a DT request comes in on graphsync, it's a pull chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID} request := msg.(datatransfer.Request) + log.Debugf("will validate received gs request, chid=%s, request=%+v", chid, request) responseMessage, err = t.events.OnRequestReceived(chid, request) + log.Debugf("will send response message %+v for request gs chid=%s, error/pause/resume value=%s", responseMessage, chid, err) } else { // when a DT response comes in on graphsync, it's a push chid = 
datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p} @@ -604,15 +606,18 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions) if extensionErr != nil { hookActions.TerminateWithError(err) + log.Errorf("terminated client gs request chid=%s with extension err=%s", chid, err) return } for _, extension := range extensions { + log.Debugf("queued up extension %+v for response, gs chid=%s", extension, chid) hookActions.SendExtensionData(extension) } } if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) + log.Errorf("terminated client gs request chid=%s with err=%s", chid, err) return } @@ -632,6 +637,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook hasXferStarted, isRestart := t.channelXferStarted[chid] if isRestart && !hasXferStarted && !paused { paused = true + log.Debugf("pausing responder for request gs chid=%s, even though validator sent no-op as it's a restart req", chid) hookActions.PauseResponse() } t.channelXferStarted[chid] = !paused @@ -820,7 +826,11 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio } dtResponse := msg.(datatransfer.Response) - return nil, t.events.OnResponseReceived(chid, dtResponse) + err = t.events.OnResponseReceived(chid, dtResponse) + if err != nil { + log.Errorf("error received from OnResponseReceived is %s", err) + } + return nil, err } func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.RequestData) {