Skip to content

Commit

Permalink
feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID
Browse files Browse the repository at this point in the history
Closes: #349
  • Loading branch information
rvagg committed Feb 10, 2022
1 parent 586931b commit fde604d
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 156 deletions.
22 changes: 6 additions & 16 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,25 +486,15 @@ type GraphExchange interface {
// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
RegisterReceiverNetworkErrorListener(listener OnReceiverNetworkErrorListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
PauseRequest(RequestID) error
// Pause pauses an in progress request or response (may take 1 or more blocks to process)
Pause(context.Context, RequestID) error

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
// Unpause unpauses a request or response that was paused
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
PauseResponse(peer.ID, RequestID) error

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error
Unpause(context.Context, RequestID, ...ExtensionData) error

// CancelRequest cancels an in progress request
CancelRequest(context.Context, RequestID) error
// Cancel cancels an in progress request or response
Cancel(context.Context, RequestID) error

// Stats produces insight on the current state of a graphsync exchange
Stats() Stats
Expand Down
47 changes: 23 additions & 24 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphsync

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -296,6 +297,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager.Startup()
responseQueue.Startup(gsConfig.maxInProgressIncomingRequests, queryExecutor)
network.SetDelegate((*graphSyncReceiver)(graphSync))

return graphSync
}

Expand Down Expand Up @@ -402,34 +404,31 @@ func (gs *GraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnR
return gs.receiverErrorListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.requestManager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
return gs.requestManager.PauseRequest(requestID)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.PauseResponse(p, requestID)
// Pause pauses an in progress request or response
func (gs *GraphSync) Pause(ctx context.Context, requestID graphsync.RequestID) error {
var reqNotFound *graphsync.RequestNotFoundErr
if err := gs.requestManager.PauseRequest(ctx, requestID); !errors.As(err, &reqNotFound) {
return err
}
return gs.responseManager.PauseResponse(ctx, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.CancelResponse(p, requestID)
// Unpause unpauses a request or response that was paused
// Can also send extensions with unpause
func (gs *GraphSync) Unpause(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
var reqNotFound *graphsync.RequestNotFoundErr
if err := gs.requestManager.UnpauseRequest(ctx, requestID, extensions...); !errors.As(err, &reqNotFound) {
return err
}
return gs.responseManager.UnpauseResponse(ctx, requestID, extensions...)
}

// CancelRequest cancels an in progress request
func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
// Cancel cancels an in progress request or response
func (gs *GraphSync) Cancel(ctx context.Context, requestID graphsync.RequestID) error {
var reqNotFound *graphsync.RequestNotFoundErr
if err := gs.responseManager.CancelResponse(ctx, requestID); !errors.As(err, &reqNotFound) {
return err
}
return gs.requestManager.CancelRequest(ctx, requestID)
}

Expand Down
6 changes: 3 additions & 3 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func TestPauseResume(t *testing.T) {
require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
err := responder.Unpause(ctx, requestID)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
Expand Down Expand Up @@ -793,7 +793,7 @@ func TestPauseResumeRequest(t *testing.T) {
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
err := requestor.Unpause(ctx, requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func TestNetworkDisconnect(t *testing.T) {
require.NoError(t, td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()))
require.NoError(t, td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()))
requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
err := responder.Unpause(ctx, requestID)
require.NoError(t, err)

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{requestID, extensions, response}, nil)
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -304,9 +304,9 @@ func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensio
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{requestID, response}, nil)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func TestPauseResume(t *testing.T) {

// attempt to unpause while request is not paused (note: hook on second block will keep it from
// reaching pause point)
err := td.requestManager.UnpauseRequest(rr.gsr.ID())
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID())
require.EqualError(t, err, "request is not paused")
close(holdForResumeAttempt)
// verify responses sent read ONLY for blocks BEFORE the pause
Expand All @@ -834,7 +834,7 @@ func TestPauseResume(t *testing.T) {
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
err = td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestPauseResumeExternal(t *testing.T) {
hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
blocksReceived++
if blocksReceived == pauseAt {
err := td.requestManager.PauseRequest(responseData.RequestID())
err := td.requestManager.PauseRequest(ctx, responseData.RequestID())
require.NoError(t, err)
close(holdForPause)
}
Expand Down Expand Up @@ -909,7 +909,7 @@ func TestPauseResumeExternal(t *testing.T) {
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err := td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
Expand Down
6 changes: 3 additions & 3 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina
if !ok {
if onTerminated != nil {
select {
case onTerminated <- graphsync.RequestNotFoundErr{}:
case onTerminated <- &graphsync.RequestNotFoundErr{}:
case <-rm.ctx.Done():
}
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.
func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync.ExtensionData) error {
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id]
if !ok {
return graphsync.RequestNotFoundErr{}
return &graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.state != graphsync.Paused {
return errors.New("request is not paused")
Expand All @@ -394,7 +394,7 @@ func (rm *RequestManager) unpause(id graphsync.RequestID, extensions []graphsync
func (rm *RequestManager) pause(id graphsync.RequestID) error {
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[id]
if !ok {
return graphsync.RequestNotFoundErr{}
return &graphsync.RequestNotFoundErr{}
}
if inProgressRequestStatus.state == graphsync.Paused {
return errors.New("request is already paused")
Expand Down
34 changes: 15 additions & 19 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type inProgressResponseStatus struct {
ctx context.Context
span trace.Span
cancelFn func()
peer peer.ID
request gsmsg.GraphSyncRequest
loader ipld.BlockReadOpener
traverser ipldutil.Traverser
Expand All @@ -43,11 +44,6 @@ type inProgressResponseStatus struct {
responseStream responseassembler.ResponseStream
}

type responseKey struct {
p peer.ID
requestID graphsync.RequestID
}

// RequestHooks is an interface for processing request hooks
type RequestHooks interface {
ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
Expand Down Expand Up @@ -107,7 +103,7 @@ type ResponseManager struct {
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
messages chan responseManagerMessage
inProgressResponses map[responseKey]*inProgressResponseStatus
inProgressResponses map[graphsync.RequestID]*inProgressResponseStatus
connManager network.ConnManager
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
maxLinksPerRequest uint64
Expand Down Expand Up @@ -144,7 +140,7 @@ func New(ctx context.Context,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
messages: messages,
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
inProgressResponses: make(map[graphsync.RequestID]*inProgressResponseStatus),
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
responseQueue: responseQueue,
Expand All @@ -158,9 +154,9 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque
}

// UnpauseResponse unpauses a response that was previously paused
func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{p, requestID, response, extensions}, nil)
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -170,9 +166,9 @@ func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.Reques
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{p, requestID, response}, nil)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -182,9 +178,9 @@ func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestI
}

// CancelResponse cancels an in progress response
func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrCancelledByCommand, response}, nil)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -209,8 +205,8 @@ func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskChan chan<
}

// GetUpdates is called to read pending updates for a task and clear them
func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
rm.send(&responseUpdateRequest{responseKey{p, requestID}, updatesChan}, nil)
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
}

// FinishTask marks a task from the task queue as done
Expand All @@ -224,19 +220,19 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
}

// CloseWithNetworkError closes a request due to a network error
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
done := make(chan error, 1)
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, done}, nil)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
rm.send(&terminateRequestMessage{requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand Down
Loading

0 comments on commit fde604d

Please sign in to comment.