Skip to content

Commit

Permalink
feat(responsemanager): add listener for completed responses (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward authored Apr 21, 2020
1 parent 9ada784 commit 1c39fe6
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 36 deletions.
6 changes: 6 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand All @@ -243,6 +246,9 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
}
10 changes: 9 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
persistenceOptions *persistenceoptions.PersistenceOptions
Expand Down Expand Up @@ -84,7 +85,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks)
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
Expand All @@ -97,6 +99,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
peerTaskQueue: peerTaskQueue,
Expand Down Expand Up @@ -161,6 +164,11 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH
return gs.requestUpdatedHooks.Register(hook)
}

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return gs.completedResponseListeners.Register(listener)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
Expand Down
12 changes: 12 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func TestGraphsyncRoundTrip(t *testing.T) {
}
})

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyWholeChain(ctx, progressChan)
Expand All @@ -217,6 +224,11 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
}

func TestPauseResume(t *testing.T) {
Expand Down
53 changes: 53 additions & 0 deletions responsemanager/hooks/completedlisteners.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package hooks

import (
"sync"

"github.com/ipfs/go-graphsync"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type completedListener struct {
key uint64
listener graphsync.OnResponseCompletedListener
}

// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
listenersLk sync.RWMutex
nextKey uint64
listeners []completedListener
}

// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{}
}

// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
crl.listenersLk.Lock()
cl := completedListener{crl.nextKey, listener}
crl.nextKey++
crl.listeners = append(crl.listeners, cl)
crl.listenersLk.Unlock()
return func() {
crl.listenersLk.Lock()
defer crl.listenersLk.Unlock()
for i, matchListener := range crl.listeners {
if cl.key == matchListener.key {
crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...)
return
}
}
}
}

// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
crl.listenersLk.RLock()
defer crl.listenersLk.RUnlock()
for _, listener := range crl.listeners {
listener.listener(p, request, status)
}
}
5 changes: 3 additions & 2 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type PeerResponseSender interface {
data []byte,
) graphsync.BlockData
SendExtensionData(graphsync.RequestID, graphsync.ExtensionData)
FinishRequest(requestID graphsync.RequestID)
FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode
FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode)
PauseRequest(requestID graphsync.RequestID)
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func (prm *peerResponseSender) SendResponse(
}

// FinishRequest marks the given requestID as having sent all responses
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode {
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
Expand All @@ -158,6 +158,7 @@ func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) {
status = graphsync.RequestCompletedPartial
}
prm.finish(requestID, status)
return status
}

// FinishWithError marks the given requestID as having terminated with an error
Expand Down
37 changes: 23 additions & 14 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}

// CompletedListeners is an interface for notifying listeners that responses are complete
type CompletedListeners interface {
NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}

// PeerManager is an interface that returns sender interfaces for peer responses.
type PeerManager interface {
SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender
Expand All @@ -96,6 +101,7 @@ type ResponseManager struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
completedListeners CompletedListeners
messages chan responseManagerMessage
workSignal chan struct{}
ticker *time.Ticker
Expand All @@ -110,7 +116,8 @@ func New(ctx context.Context,
queryQueue QueryQueue,
requestHooks RequestHooks,
blockHooks BlockHooks,
updateHooks UpdateHooks) *ResponseManager {
updateHooks UpdateHooks,
completedListeners CompletedListeners) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
return &ResponseManager{
ctx: ctx,
Expand All @@ -121,6 +128,7 @@ func New(ctx context.Context,
requestHooks: requestHooks,
blockHooks: blockHooks,
updateHooks: updateHooks,
completedListeners: completedListeners,
messages: make(chan responseManagerMessage, 16),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(thawSpeed),
Expand Down Expand Up @@ -187,8 +195,9 @@ type responseDataRequest struct {
}

type finishTaskRequest struct {
key responseKey
err error
key responseKey
status graphsync.ResponseStatusCode
err error
}

type setResponseDataRequest struct {
Expand Down Expand Up @@ -231,9 +240,9 @@ func (rm *ResponseManager) processQueriesWorker() {
case <-rm.ctx.Done():
return
}
err := rm.executeTask(key, taskData)
status, err := rm.executeTask(key, taskData)
select {
case rm.messages <- &finishTaskRequest{key, err}:
case rm.messages <- &finishTaskRequest{key, status, err}:
case <-rm.ctx.Done():
}
}
Expand All @@ -243,18 +252,18 @@ func (rm *ResponseManager) processQueriesWorker() {

}

func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) error {
func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
loader, traverser, err = rm.prepareQuery(taskData.ctx, key.p, taskData.request)
if err != nil {
return err
return graphsync.RequestFailedUnknown, err
}
select {
case <-rm.ctx.Done():
return nil
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case rm.messages <- &setResponseDataRequest{key, loader, traverser}:
}
}
Expand Down Expand Up @@ -291,7 +300,7 @@ func (rm *ResponseManager) executeQuery(
request gsmsg.GraphSyncRequest,
loader ipld.Loader,
traverser ipldutil.Traverser,
updateSignal chan struct{}) error {
updateSignal chan struct{}) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := rm.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
Expand All @@ -314,13 +323,12 @@ func (rm *ResponseManager) executeQuery(
if err != nil {
if err != hooks.ErrPaused {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
} else {
peerResponseSender.PauseRequest(request.ID())
return graphsync.RequestFailedUnknown, err
}
return err
peerResponseSender.PauseRequest(request.ID())
return graphsync.RequestPaused, err
}
peerResponseSender.FinishRequest(request.ID())
return nil
return peerResponseSender.FinishRequest(request.ID()), nil
}

func (rm *ResponseManager) checkForUpdates(
Expand Down Expand Up @@ -492,6 +500,7 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) {
response.isPaused = true
return
}
rm.completedListeners.NotifyCompletedListeners(ftr.key.p, response.request, ftr.status)
if ftr.err != nil {
log.Infof("response failed: %w", ftr.err)
}
Expand Down
Loading

0 comments on commit 1c39fe6

Please sign in to comment.