Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feat/debug accept message error #210

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# go-data-transfer changelog

# go-data-transfer 1.5.0

Support the data transfer being restarted.

- github.com/filecoin-project/go-data-transfer:
- Add isRestart param to validators (#197) ([filecoin-project/go-data-transfer#197](https://github.com/filecoin-project/go-data-transfer/pull/197))
- fix: flaky TestChannelMonitorAutoRestart (#198) ([filecoin-project/go-data-transfer#198](https://github.com/filecoin-project/go-data-transfer/pull/198))
- Channel monitor watches for errors instead of measuring data rate (#190) ([filecoin-project/go-data-transfer#190](https://github.com/filecoin-project/go-data-transfer/pull/190))
- fix: prevent concurrent restarts for same channel (#195) ([filecoin-project/go-data-transfer#195](https://github.com/filecoin-project/go-data-transfer/pull/195))
- fix: channel state machine event handling (#194) ([filecoin-project/go-data-transfer#194](https://github.com/filecoin-project/go-data-transfer/pull/194))
- Dont double count data sent (#185) ([filecoin-project/go-data-transfer#185](https://github.com/filecoin-project/go-data-transfer/pull/185))
- github.com/ipfs/go-graphsync (v0.6.0 -> v0.6.1):
- feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164))

Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| dirkmc | 8 | +1235/-868 | 37 |
| Dirk McCormick | 1 | +11/-0 | 1 |

# go-data-transfer 1.4.3

- github.com/filecoin-project/go-data-transfer:
Expand Down
2 changes: 1 addition & 1 deletion channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
case <-timer.C:
// Timer expired before we received a Complete message from the responder
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
mc.chid, mc.cfg.AcceptTimeout)
mc.chid, mc.cfg.CompleteTimeout)
mc.closeChannelAndShutdown(err)
}
}
Expand Down
20 changes: 20 additions & 0 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,29 +161,40 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
}

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
log.Infof("channel %s: received response %+v from provider", chid, response)

if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
return m.channels.Cancel(chid)
}

if response.IsVoucherResult() {
log.Infof("channel %s: received response %+v from provider is a voucher result", chid, response)
if !response.EmptyVoucherResult() {
log.Debug("processing non-empty voucher result")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the chid as part of this logline, so that we can correlate to which channel this refers?

vresult, err := m.decodeVoucherResult(response)
if err != nil {
log.Errorf("channel %s:, failed to decode voucher result, err=%s", chid, err)
return err
}
log.Infof("channel %s: received voucher response %+v", chid, vresult)
err = m.channels.NewVoucherResult(chid, vresult)
if err != nil {
log.Errorf("channel %s: failed NewVoucherResult, err=%s ", chid, err)
return err
}
}

if !response.Accepted() {
log.Infof("channel %s: received rejected response, erroring out channel", chid)
return m.channels.Error(chid, datatransfer.ErrRejected)
}

if response.IsNew() {
log.Infof("channel %s: received new response, accepting channel", chid)
err := m.channels.Accept(chid)
if err != nil {
log.Errorf("channel %s: failed to accept new response, err=%s", chid, err)
return err
}
}
Expand All @@ -196,16 +207,21 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
}
}

if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
log.Infof("channel %s: received complete response, completing channel", chid)
return m.channels.ResponderCompletes(chid)
}

log.Infof("channel %s: received complete response but responder is paused", chid)

err := m.channels.ResponderBeginsFinalization(chid)
if err != nil {
return nil
}
}

if response.IsPaused() {
return m.pauseOther(chid)
}
Expand Down Expand Up @@ -432,6 +448,10 @@ func (m *manager) validateVoucher(
}

result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor)
if isPull {
log.Infof("\n ValidatePull, result=%s, err=%s", result, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Not sure why we have \n inside a log.Infof

}

return vouch, result, err
}

Expand Down
12 changes: 11 additions & 1 deletion transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
// when a DT request comes in on graphsync, it's a pull
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID}
request := msg.(datatransfer.Request)
log.Debugf("will validate recieved gs request, chid=%s, request=%+v", chid, request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Debugf("will validate recieved gs request, chid=%s, request=%+v", chid, request)
log.Debugf("will validate received gs request, chid=%s, request=%+v", chid, request)

responseMessage, err = t.events.OnRequestReceived(chid, request)
log.Debugf("will send response message %+v for request gs chid=%s, error/pause/resume value=%s", responseMessage, chid, err)
} else {
// when a DT response comes in on graphsync, it's a push
chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p}
Expand All @@ -604,15 +606,18 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions)
if extensionErr != nil {
hookActions.TerminateWithError(err)
log.Errorf("terminated client gs request chid=%s with extension err=%s", chid, err)
return
}
for _, extension := range extensions {
log.Debugf("queued up extension %+v for response, gs chid=%s", extension, chid)
hookActions.SendExtensionData(extension)
}
}

if err != nil && err != datatransfer.ErrPause {
hookActions.TerminateWithError(err)
log.Errorf("terminated client gs request chid=%s with err=%s", chid, err)
return
}

Expand All @@ -632,6 +637,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook
hasXferStarted, isRestart := t.channelXferStarted[chid]
if isRestart && !hasXferStarted && !paused {
paused = true
log.Debugf("pausing responder for request gs chid=%s, even though validator sent no-op as it's a restart req", chid)
hookActions.PauseResponse()
}
t.channelXferStarted[chid] = !paused
Expand Down Expand Up @@ -820,7 +826,11 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio
}

dtResponse := msg.(datatransfer.Response)
return nil, t.events.OnResponseReceived(chid, dtResponse)
err = t.events.OnResponseReceived(chid, dtResponse)
if err != nil {
log.Errorf("\n error receieved from OnResponseReceived is %s", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here - why \n at the start?

}
return nil, err
}

func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.RequestData) {
Expand Down