Skip to content

Commit

Permalink
Accept/Reject requests up front (#384)
Browse files Browse the repository at this point in the history
* feat(responsemanager): move request validation to front

it makes much more sense to reject requests early than put them in the processing queue. While there
may be a minimal cost to evaluating a hook, there is no reason to wait to send a rejection

* refactor(listeners): cleanup request processing listeners

* test(responsemanager): verify response procesisng test

add test to verify response processing listerner called

* test(graphsync): add hooks test

add test for hooks process (issue caught and fixed in process) and address PR comments
  • Loading branch information
hannahhoward authored Jun 25, 2022
1 parent 35ac375 commit bc1b9b0
Show file tree
Hide file tree
Showing 13 changed files with 487 additions and 401 deletions.
28 changes: 10 additions & 18 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type BlockData interface {
// IncomingRequestHookActions are actions that a request hook can take to change
// behavior for the response
type IncomingRequestHookActions interface {
AugmentContext(func(reqCtx context.Context) context.Context)
SendExtensionData(ExtensionData)
UsePersistenceOption(name string)
UseLinkTargetNodePrototypeChooser(traversal.LinkTargetNodePrototypeChooser)
Expand Down Expand Up @@ -323,16 +324,6 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// RequestQueuedHookActions are actions that can be taken in a request queued hook to
// change execution of the response
type RequestQueuedHookActions interface {
AugmentContext(func(reqCtx context.Context) context.Context)
}

// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue.
// It receives the peer that sent the request and all data about the request.
type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData, hookActions RequestQueuedHookActions)

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -381,9 +372,9 @@ type OnReceiverNetworkErrorListener func(p peer.ID, err error)
// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

// OnOutgoingRequestProcessingListener is called when a request actually begins processing (reaches
// the top of the outgoing request queue)
type OnOutgoingRequestProcessingListener func(p peer.ID, request RequestData, inProgressRequestCount int)
// OnRequestProcessingListener is called when a request actually begins processing (reaches
// the top of the request queue)
type OnRequestProcessingListener func(p peer.ID, request RequestData, inProgressRequestCount int)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
Expand Down Expand Up @@ -480,9 +471,6 @@ type GraphExchange interface {
// UnregisterPersistenceOption unregisters an alternate loader/storer combo
UnregisterPersistenceOption(name string) error

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue.
RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

Expand All @@ -501,9 +489,13 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

// RegisterOutgoingRequestProcessingListener adds a listener that gets called when a request actually begins processing (reaches
// RegisterOutgoingRequestProcessingListener adds a listener that gets called when an outgoing request actually begins processing (reaches
// the top of the outgoing request queue)
RegisterOutgoingRequestProcessingListener(listener OnRequestProcessingListener) UnregisterHookFunc

// RegisterOutgoingRequestProcessingListener adds a listener that gets called when an incoming request actually begins processing (reaches
// the top of the outgoing request queue)
RegisterOutgoingRequestProcessingListener(listener OnOutgoingRequestProcessingListener) UnregisterHookFunc
RegisterIncomingRequestProcessingListener(listener OnRequestProcessingListener) UnregisterHookFunc

// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc
Expand Down
69 changes: 35 additions & 34 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ type GraphSync struct {
requestExecutor *executor.Executor
responseAssembler *responseassembler.ResponseAssembler
peerManager *peermanager.PeerMessageManager
incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
outgoingRequestProcessingListeners *listeners.OutgoingRequestProcessingListeners
incomingRequestProcessingListeners *listeners.RequestProcessingListeners
outgoingRequestProcessingListeners *listeners.RequestProcessingListeners
completedResponseListeners *listeners.CompletedResponseListeners
requestorCancelledListeners *listeners.RequestorCancelledListeners
blockSentListeners *listeners.BlockSentListeners
Expand Down Expand Up @@ -224,9 +224,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingBlockHooks := requestorhooks.NewBlockHooks()
networkErrorListeners := listeners.NewNetworkErrorListeners()
receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
outgoingRequestProcessingListeners := listeners.NewOutgoingRequestProcessingListeners()
outgoingRequestProcessingListeners := listeners.NewRequestProcessingListeners()
incomingRequestProcessingListeners := listeners.NewRequestProcessingListeners()
persistenceOptions := persistenceoptions.New()
requestQueuedHooks := responderhooks.NewRequestQueuedHooks()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
Expand Down Expand Up @@ -255,7 +255,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx,
linkSystem,
responseAssembler,
requestQueuedHooks,
incomingRequestProcessingListeners,
incomingRequestHooks,
requestUpdatedHooks,
completedResponseListeners,
Expand All @@ -273,32 +273,33 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestUpdatedHooks,
)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
requestManager: requestManager,
responseManager: responseManager,
queryExecutor: queryExecutor,
responseQueue: responseQueue,
requestQueue: requestQueue,
requestExecutor: requestExecutor,
responseAssembler: responseAssembler,
peerManager: peerManager,
incomingRequestQueuedHooks: requestQueuedHooks,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
receiverErrorListeners: receiverErrorListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
responseAllocator: responseAllocator,
network: network,
linkSystem: linkSystem,
requestManager: requestManager,
responseManager: responseManager,
queryExecutor: queryExecutor,
responseQueue: responseQueue,
requestQueue: requestQueue,
requestExecutor: requestExecutor,
responseAssembler: responseAssembler,
peerManager: peerManager,
incomingRequestProcessingListeners: incomingRequestProcessingListeners,
outgoingRequestProcessingListeners: outgoingRequestProcessingListeners,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
receiverErrorListeners: receiverErrorListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
responseAllocator: responseAllocator,
}

requestManager.SetDelegate(peerManager)
Expand Down Expand Up @@ -335,8 +336,8 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added
// to the responder's task queue.
func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
return gs.incomingRequestQueuedHooks.Register(hook)
func (gs *GraphSync) RegisterIncomingRequestProcessingListener(listener graphsync.OnRequestProcessingListener) graphsync.UnregisterHookFunc {
return gs.incomingRequestProcessingListeners.Register(listener)
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
Expand Down Expand Up @@ -371,7 +372,7 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH

// RegisterOutgoingRequestProcessingListener adds a listener that gets called when a request actually begins processing (reaches
// the top of the outgoing request queue)
func (gs *GraphSync) RegisterOutgoingRequestProcessingListener(listener graphsync.OnOutgoingRequestProcessingListener) graphsync.UnregisterHookFunc {
func (gs *GraphSync) RegisterOutgoingRequestProcessingListener(listener graphsync.OnRequestProcessingListener) graphsync.UnregisterHookFunc {
return gs.outgoingRequestProcessingListeners.Register(listener)
}

Expand Down
Loading

0 comments on commit bc1b9b0

Please sign in to comment.