Use context.CancelFunc instead of func() (#257)

Where a context cancellation function is referenced in struct types, use the
existing `context.CancelFunc` type to represent it.

Fixes #223
masih authored and rvagg committed Oct 26, 2021
1 parent b1d455e commit 245a7bd
Showing 4 changed files with 53 additions and 8 deletions.
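
To make the commit message concrete, here is a minimal, hypothetical sketch of the pattern it describes (none of the names below are taken from the graphsync codebase): a struct that stores the function returned by `context.WithCancel` declares that field as `context.CancelFunc` instead of a bare `func()`.

```go
package main

import (
	"context"
	"fmt"
)

// manager owns a context and the function that cancels it. Declaring the
// field as context.CancelFunc (rather than func()) documents its purpose
// and matches the type returned by context.WithCancel.
type manager struct {
	ctx      context.Context
	cancelFn context.CancelFunc // previously: cancelFn func()
}

func newManager(parent context.Context) *manager {
	ctx, cancel := context.WithCancel(parent)
	return &manager{ctx: ctx, cancelFn: cancel}
}

// Shutdown cancels the manager's context, stopping any work tied to it.
func (m *manager) Shutdown() {
	m.cancelFn()
}

func main() {
	m := newManager(context.Background())
	m.Shutdown()
	fmt.Println(m.ctx.Err()) // prints "context canceled"
}
```

Since `context.CancelFunc` is defined in the standard library as `type CancelFunc func()`, the change is purely a readability improvement; call sites are unaffected.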
6 changes: 6 additions & 0 deletions responsemanager/client.go
@@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
"github.com/ipfs/go-graphsync/taskqueue"
)

// The code in this file implements the public interface of the response manager.
@@ -157,6 +158,7 @@ type ResponseManager struct {
connManager network.ConnManager
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
maxLinksPerRequest uint64
taskQueue *taskqueue.WorkerTaskQueue
}

// New creates a new response manager for responding to requests
@@ -179,6 +181,7 @@ func New(ctx context.Context,
ctx, cancelFn := context.WithCancel(ctx)
messages := make(chan responseManagerMessage, 16)
workSignal := make(chan struct{}, 1)
taskQueue := taskqueue.NewTaskQueue(ctx)
rm := &ResponseManager{
ctx: ctx,
cancelFn: cancelFn,
@@ -198,6 +201,7 @@ func New(ctx context.Context,
maxInProcessRequests: maxInProcessRequests,
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
taskQueue: taskQueue,
}
rm.qe = &queryExecutor{
blockHooks: blockHooks,
@@ -295,6 +299,8 @@ func (rm *ResponseManager) send(message responseManagerMessage, done <-chan stru

// Startup starts processing for the WantManager.
func (rm *ResponseManager) Startup() {
// maxInProgressResponses := gsConfig.maxInProgressResponses
rm.taskQueue.Startup(rm.maxInProcessRequests, rm.qe)
go rm.run()
}

27 changes: 27 additions & 0 deletions responsemanager/queryexecutor.go
@@ -43,6 +43,7 @@ type queryExecutor struct {
connManager network.ConnManager
}

/*
func (qe *queryExecutor) processQueriesWorker() {
const targetWork = 1
taskDataChan := make(chan ResponseTaskData)
@@ -83,6 +84,32 @@ func (qe *queryExecutor) processQueriesWorker() {
}
}
}
*/

func (qe *queryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
taskDataChan := make(chan ResponseTaskData)
var taskData ResponseTaskData
qe.manager.StartTask(task, taskDataChan)
select {
case taskData = <-taskDataChan:
case <-qe.ctx.Done():
return true
}
if taskData.Empty {
log.Info("Empty task on peer request stack")
return false
}
log.Debugw("beginning response execution", "id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String())
_, err := qe.executeQuery(pid, taskData.Request, taskData.Loader, taskData.Traverser, taskData.Signals, taskData.Subscriber)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.connManager.Unprotect(pid, taskData.Request.ID().Tag())
qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.Request)
}
qe.manager.FinishTask(task, err)
log.Debugw("finishing response execution", "id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String())
return false
}

func (qe *queryExecutor) executeQuery(
p peer.ID,
23 changes: 15 additions & 8 deletions responsemanager/server.go
@@ -27,9 +27,11 @@ func (rm *ResponseManager) cleanupInProcessResponses() {

func (rm *ResponseManager) run() {
defer rm.cleanupInProcessResponses()
for i := uint64(0); i < rm.maxInProcessRequests; i++ {
go rm.qe.processQueriesWorker()
}
/*
for i := uint64(0); i < rm.maxInProcessRequests; i++ {
go rm.qe.processQueriesWorker()
}
*/

for {
select {
@@ -101,7 +103,8 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
return nil
})
}
rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
rm.taskQueue.PushTask(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
// rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
select {
case rm.workSignal <- struct{}{}:
default:
@@ -111,7 +114,8 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request

func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
key := responseKey{p, requestID}
rm.queryQueue.Remove(key, key.p)
rm.taskQueue.Remove(key, key.p)
// rm.queryQueue.Remove(key, key.p)
response, ok := rm.inProgressResponses[key]
if !ok {
return errors.New("could not find request")
@@ -181,7 +185,8 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
}
// TODO: Use a better work estimation metric.

rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
rm.taskQueue.PushTask(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
// rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})

select {
case rm.workSignal <- struct{}{}:
@@ -217,14 +222,16 @@ func (rm *ResponseManager) startTask(task *peertask.Task) ResponseTaskData {
key := task.Topic.(responseKey)
taskData := rm.taskDataForKey(key)
if taskData.Empty {
rm.queryQueue.TasksDone(key.p, task)
rm.taskQueue.TaskDone(key.p, task)
// rm.queryQueue.TasksDone(key.p, task)
}
return taskData
}

func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
key := task.Topic.(responseKey)
rm.queryQueue.TasksDone(key.p, task)
rm.taskQueue.TaskDone(key.p, task)
// rm.queryQueue.TasksDone(key.p, task)
response, ok := rm.inProgressResponses[key]
if !ok {
return
5 changes: 5 additions & 0 deletions taskqueue/taskqueue.go
@@ -70,6 +70,11 @@ func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats {
}
}

// Remove removes a task from the execution queue
func (tq *WorkerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
tq.peerTaskQueue.Remove(topic, p)
}

// Startup runs the given number of task workers with the given executor
func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) {
for i := uint64(0); i < workerCount; i++ {
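
For orientation only (this is not part of the commit), the sketch below wires together the task-queue pieces the hunks above introduce: taskqueue.NewTaskQueue creates a WorkerTaskQueue, Startup launches workers that hand tasks to an Executor, and PushTask enqueues work per peer. It uses only calls visible in this diff; the import paths, the exact Executor interface, and the meaning of ExecuteTask's boolean return (the queryExecutor above returns true when its context is done) are assumptions rather than documented API.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-graphsync/taskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	"github.com/libp2p/go-libp2p-core/peer"
)

// loggingExecutor stands in for queryExecutor: it just records each task it is given.
type loggingExecutor struct{}

func (loggingExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
	fmt.Printf("executing %v for peer %s\n", task.Topic, pid)
	return false // assumed: true signals that the worker's context was cancelled
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tq := taskqueue.NewTaskQueue(ctx)
	tq.Startup(6, loggingExecutor{}) // worker count plays the role of maxInProcessRequests

	var p peer.ID // in graphsync this identifies the peer that sent the request
	tq.PushTask(p, peertask.Task{Topic: "request-key", Priority: 1, Work: 1})

	time.Sleep(100 * time.Millisecond) // give a worker time to pick the task up
}
```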
