From 1c39fe6201ab3ff8d85ef2a2f80d291644ac69d4 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 21 Apr 2020 16:41:58 -0700 Subject: [PATCH] feat(responsemanager): add listener for completed responses (#64) --- graphsync.go | 6 ++ impl/graphsync.go | 10 ++- impl/graphsync_test.go | 12 ++++ responsemanager/hooks/completedlisteners.go | 53 +++++++++++++++ .../peerresponsemanager/peerresponsesender.go | 5 +- responsemanager/responsemanager.go | 37 +++++++---- responsemanager/responsemanager_test.go | 64 +++++++++++++------ 7 files changed, 151 insertions(+), 36 deletions(-) create mode 100644 responsemanager/hooks/completedlisteners.go diff --git a/graphsync.go b/graphsync.go index 12ca81e3..3c7c0b46 100644 --- a/graphsync.go +++ b/graphsync.go @@ -217,6 +217,9 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h // It receives an interface to taking further action on the response type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions) +// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response +type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode) + // UnregisterHookFunc is a function call to unregister a hook that was previously registered type UnregisterHookFunc func() @@ -243,6 +246,9 @@ type GraphExchange interface { // RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc + // RegisterCompletedResponseListener adds a listener on the responder for completed responses + RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc + // UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID UnpauseResponse(peer.ID, RequestID) error } diff --git a/impl/graphsync.go b/impl/graphsync.go index 742057da..08949930 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -41,6 +41,7 @@ type GraphSync struct { incomingRequestHooks *responderhooks.IncomingRequestHooks outgoingBlockHooks *responderhooks.OutgoingBlockHooks requestUpdatedHooks *responderhooks.RequestUpdatedHooks + completedResponseListeners *responderhooks.CompletedResponseListeners incomingResponseHooks *requestorhooks.IncomingResponseHooks outgoingRequestHooks *requestorhooks.OutgoingRequestHooks persistenceOptions *persistenceoptions.PersistenceOptions @@ -84,7 +85,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() requestUpdatedHooks := responderhooks.NewUpdateHooks() - responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks) + completedResponseListeners := responderhooks.NewCompletedResponseListeners() + responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners) unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, @@ -97,6 +99,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingRequestHooks: incomingRequestHooks, outgoingBlockHooks: outgoingBlockHooks, requestUpdatedHooks: requestUpdatedHooks, + completedResponseListeners: completedResponseListeners, incomingResponseHooks: incomingResponseHooks, outgoingRequestHooks: outgoingRequestHooks, peerTaskQueue: peerTaskQueue, @@ -161,6 +164,11 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH return gs.requestUpdatedHooks.Register(hook) } +// RegisterCompletedResponseListener adds a listener on the responder for completed responses +func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc { + return gs.completedResponseListeners.Register(listener) +} + // UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error { return gs.responseManager.UnpauseResponse(p, requestID) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 0e1b8543..b57ab9e2 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -208,6 +208,13 @@ func TestGraphsyncRoundTrip(t *testing.T) { } }) + finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) + responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + select { + case finalResponseStatusChan <- status: + default: + } + }) progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension) blockChain.VerifyWholeChain(ctx, progressChan) @@ -217,6 +224,11 @@ func TestGraphsyncRoundTrip(t *testing.T) { // verify extension roundtrip require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data") require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data") + + // verify listener + var finalResponseStatus graphsync.ResponseStatusCode + testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") + require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) } func TestPauseResume(t *testing.T) { diff --git a/responsemanager/hooks/completedlisteners.go b/responsemanager/hooks/completedlisteners.go new file mode 100644 index 00000000..754e6b5a --- /dev/null +++ b/responsemanager/hooks/completedlisteners.go @@ -0,0 +1,53 @@ +package hooks + +import ( + "sync" + + "github.com/ipfs/go-graphsync" + peer "github.com/libp2p/go-libp2p-core/peer" +) + +type completedListener struct { + key uint64 + listener graphsync.OnResponseCompletedListener +} + +// CompletedResponseListeners is a set of listeners for completed responses +type CompletedResponseListeners struct { + listenersLk sync.RWMutex + nextKey uint64 + listeners []completedListener +} + +// NewCompletedResponseListeners returns a new list of completed response listeners +func NewCompletedResponseListeners() *CompletedResponseListeners { + return &CompletedResponseListeners{} +} + +// Register registers an listener for completed responses +func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc { + crl.listenersLk.Lock() + cl := completedListener{crl.nextKey, listener} + crl.nextKey++ + crl.listeners = append(crl.listeners, cl) + crl.listenersLk.Unlock() + return func() { + crl.listenersLk.Lock() + defer crl.listenersLk.Unlock() + for i, matchListener := range crl.listeners { + if cl.key == matchListener.key { + crl.listeners = append(crl.listeners[:i], crl.listeners[i+1:]...) + return + } + } + } +} + +// NotifyCompletedListeners runs notifies all completed listeners that a response has completed +func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + crl.listenersLk.RLock() + defer crl.listenersLk.RUnlock() + for _, listener := range crl.listeners { + listener.listener(p, request, status) + } +} diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index f4fb0a68..89b37dd6 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -55,7 +55,7 @@ type PeerResponseSender interface { data []byte, ) graphsync.BlockData SendExtensionData(graphsync.RequestID, graphsync.ExtensionData) - FinishRequest(requestID graphsync.RequestID) + FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) PauseRequest(requestID graphsync.RequestID) } @@ -147,7 +147,7 @@ func (prm *peerResponseSender) SendResponse( } // FinishRequest marks the given requestID as having sent all responses -func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) { +func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode { prm.linkTrackerLk.Lock() isComplete := prm.linkTracker.FinishRequest(requestID) prm.linkTrackerLk.Unlock() @@ -158,6 +158,7 @@ func (prm *peerResponseSender) FinishRequest(requestID graphsync.RequestID) { status = graphsync.RequestCompletedPartial } prm.finish(requestID, status) + return status } // FinishWithError marks the given requestID as having terminated with an error diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index bddd4110..f4187ec2 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -76,6 +76,11 @@ type UpdateHooks interface { ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult } +// CompletedListeners is an interface for notifying listeners that responses are complete +type CompletedListeners interface { + NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) +} + // PeerManager is an interface that returns sender interfaces for peer responses. type PeerManager interface { SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender @@ -96,6 +101,7 @@ type ResponseManager struct { requestHooks RequestHooks blockHooks BlockHooks updateHooks UpdateHooks + completedListeners CompletedListeners messages chan responseManagerMessage workSignal chan struct{} ticker *time.Ticker @@ -110,7 +116,8 @@ func New(ctx context.Context, queryQueue QueryQueue, requestHooks RequestHooks, blockHooks BlockHooks, - updateHooks UpdateHooks) *ResponseManager { + updateHooks UpdateHooks, + completedListeners CompletedListeners) *ResponseManager { ctx, cancelFn := context.WithCancel(ctx) return &ResponseManager{ ctx: ctx, @@ -121,6 +128,7 @@ func New(ctx context.Context, requestHooks: requestHooks, blockHooks: blockHooks, updateHooks: updateHooks, + completedListeners: completedListeners, messages: make(chan responseManagerMessage, 16), workSignal: make(chan struct{}, 1), ticker: time.NewTicker(thawSpeed), @@ -187,8 +195,9 @@ type responseDataRequest struct { } type finishTaskRequest struct { - key responseKey - err error + key responseKey + status graphsync.ResponseStatusCode + err error } type setResponseDataRequest struct { @@ -231,9 +240,9 @@ func (rm *ResponseManager) processQueriesWorker() { case <-rm.ctx.Done(): return } - err := rm.executeTask(key, taskData) + status, err := rm.executeTask(key, taskData) select { - case rm.messages <- &finishTaskRequest{key, err}: + case rm.messages <- &finishTaskRequest{key, status, err}: case <-rm.ctx.Done(): } } @@ -243,18 +252,18 @@ func (rm *ResponseManager) processQueriesWorker() { } -func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) error { +func (rm *ResponseManager) executeTask(key responseKey, taskData *responseTaskData) (graphsync.ResponseStatusCode, error) { var err error loader := taskData.loader traverser := taskData.traverser if loader == nil || traverser == nil { loader, traverser, err = rm.prepareQuery(taskData.ctx, key.p, taskData.request) if err != nil { - return err + return graphsync.RequestFailedUnknown, err } select { case <-rm.ctx.Done(): - return nil + return graphsync.RequestFailedUnknown, errors.New("context cancelled") case rm.messages <- &setResponseDataRequest{key, loader, traverser}: } } @@ -291,7 +300,7 @@ func (rm *ResponseManager) executeQuery( request gsmsg.GraphSyncRequest, loader ipld.Loader, traverser ipldutil.Traverser, - updateSignal chan struct{}) error { + updateSignal chan struct{}) (graphsync.ResponseStatusCode, error) { updateChan := make(chan []gsmsg.GraphSyncRequest) peerResponseSender := rm.peerManager.SenderForPeer(p) err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error { @@ -314,13 +323,12 @@ func (rm *ResponseManager) executeQuery( if err != nil { if err != hooks.ErrPaused { peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) - } else { - peerResponseSender.PauseRequest(request.ID()) + return graphsync.RequestFailedUnknown, err } - return err + peerResponseSender.PauseRequest(request.ID()) + return graphsync.RequestPaused, err } - peerResponseSender.FinishRequest(request.ID()) - return nil + return peerResponseSender.FinishRequest(request.ID()), nil } func (rm *ResponseManager) checkForUpdates( @@ -492,6 +500,7 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) { response.isPaused = true return } + rm.completedListeners.NotifyCompletedListeners(ftr.key.p, response.request, ftr.status) if ftr.err != nil { log.Infof("response failed: %w", ftr.err) } diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index a88b60cd..01482a8a 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -143,8 +143,9 @@ func (fprs *fakePeerResponseSender) SendExtensionData( fprs.sentExtensions <- sentExtension{requestID, extension} } -func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) { +func (fprs *fakePeerResponseSender) FinishRequest(requestID graphsync.RequestID) graphsync.ResponseStatusCode { fprs.lastCompletedRequest <- completedRequest{requestID, graphsync.RequestCompletedFull} + return graphsync.RequestCompletedFull } func (fprs *fakePeerResponseSender) FinishWithError(requestID graphsync.RequestID, status graphsync.ResponseStatusCode) { @@ -160,7 +161,7 @@ func TestIncomingQuery(t *testing.T) { defer td.cancel() blks := td.blockChain.AllBlocks() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() @@ -181,7 +182,7 @@ func TestCancellationQueryInProgress(t *testing.T) { td := newTestData(t) defer td.cancel() blks := td.blockChain.AllBlocks() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) @@ -228,7 +229,7 @@ func TestEarlyCancellation(t *testing.T) { td := newTestData(t) defer td.cancel() td.queryQueue.popWait.Add(1) - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) @@ -252,7 +253,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("on its own, should fail validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) var lastRequest completedRequest @@ -263,7 +264,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.SendExtensionData(td.extensionResponse) @@ -280,7 +281,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -298,7 +299,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if any hook fails, should fail", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -319,7 +320,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("hooks can be unregistered", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -349,7 +350,7 @@ func TestValidationAndExtensions(t *testing.T) { defer td.cancel() obs := make(map[ipld.Link][]byte) oloader, _ := testutil.NewTestStore(obs) - responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() // add validating hook -- so the request SHOULD succeed td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { @@ -383,7 +384,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("hooks can alter the node builder chooser", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() customChooserCallCount := 0 @@ -426,7 +427,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can send extension data", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -448,7 +449,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can send errors", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -465,7 +466,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can pause/unpause", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -501,7 +502,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can pause/unpause", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -540,7 +541,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when unpaused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -577,7 +578,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when paused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -622,7 +623,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when unpaused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -656,7 +657,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when paused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -699,6 +700,29 @@ func TestValidationAndExtensions(t *testing.T) { }) }) + t.Run("final response status listeners", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners) + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + statusChan := make(chan graphsync.ResponseStatusCode, 1) + td.completedListeners.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode) { + select { + case statusChan <- status: + default: + } + }) + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + var lastRequest completedRequest + testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request") + require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed") + var status graphsync.ResponseStatusCode + testutil.AssertReceive(td.ctx, t, statusChan, &status, "should receive status") + require.True(t, gsmsg.IsTerminalSuccessCode(status), "request should succeed") + }) } type testData struct { @@ -730,6 +754,7 @@ type testData struct { requestHooks *hooks.IncomingRequestHooks blockHooks *hooks.OutgoingBlockHooks updateHooks *hooks.RequestUpdatedHooks + completedListeners *hooks.CompletedResponseListeners } func newTestData(t *testing.T) testData { @@ -778,5 +803,6 @@ func newTestData(t *testing.T) testData { td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions) td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() + td.completedListeners = hooks.NewCompletedResponseListeners() return td }