From 245a7bdcab9243c0ef442827edc09e892134d6bb Mon Sep 17 00:00:00 2001
From: "Masih H. Derkani"
Date: Tue, 26 Oct 2021 15:17:54 +0100
Subject: [PATCH] Use `context.CancelFunc` instead of `func()` (#257)

Where a context cancellation function is referenced in struct types, use
the existing `context.CancelFunc` type to represent it.

Fixes #223
---
 responsemanager/client.go        |  6 ++++++
 responsemanager/queryexecutor.go | 27 +++++++++++++++++++++++++++
 responsemanager/server.go        | 23 +++++++++++++++--------
 taskqueue/taskqueue.go           |  5 +++++
 4 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/responsemanager/client.go b/responsemanager/client.go
index 5212e2ad..02f06024 100644
--- a/responsemanager/client.go
+++ b/responsemanager/client.go
@@ -17,6 +17,7 @@ import (
 	"github.com/ipfs/go-graphsync/notifications"
 	"github.com/ipfs/go-graphsync/responsemanager/hooks"
 	"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
+	"github.com/ipfs/go-graphsync/taskqueue"
 )
 
 // The code in this file implements the public interface of the response manager.
@@ -157,6 +158,7 @@ type ResponseManager struct {
 	connManager network.ConnManager
 	// maximum number of links to traverse per request. A value of zero = infinity, or no limit
 	maxLinksPerRequest uint64
+	taskQueue          *taskqueue.WorkerTaskQueue
 }
 
 // New creates a new response manager for responding to requests
@@ -179,6 +181,7 @@ func New(ctx context.Context,
 	ctx, cancelFn := context.WithCancel(ctx)
 	messages := make(chan responseManagerMessage, 16)
 	workSignal := make(chan struct{}, 1)
+	taskQueue := taskqueue.NewTaskQueue(ctx)
 	rm := &ResponseManager{
 		ctx:                  ctx,
 		cancelFn:             cancelFn,
@@ -198,6 +201,7 @@ func New(ctx context.Context,
 		maxInProcessRequests: maxInProcessRequests,
 		connManager:          connManager,
 		maxLinksPerRequest:   maxLinksPerRequest,
+		taskQueue:            taskQueue,
 	}
 	rm.qe = &queryExecutor{
 		blockHooks: blockHooks,
@@ -295,6 +299,8 @@ func (rm *ResponseManager) send(message responseManagerMessage, done <-chan stru
 
 // Startup starts processing for the WantManager.
 func (rm *ResponseManager) Startup() {
+	// maxInProgressResponses := gsConfig.maxInProgressResponses
+	rm.taskQueue.Startup(rm.maxInProcessRequests, rm.qe)
 	go rm.run()
 }
 
diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go
index cd82ceab..3adcef52 100644
--- a/responsemanager/queryexecutor.go
+++ b/responsemanager/queryexecutor.go
@@ -43,6 +43,7 @@ type queryExecutor struct {
 	connManager network.ConnManager
 }
 
+/*
 func (qe *queryExecutor) processQueriesWorker() {
 	const targetWork = 1
 	taskDataChan := make(chan ResponseTaskData)
@@ -83,6 +84,32 @@ func (qe *queryExecutor) processQueriesWorker() {
 		}
 	}
 }
+*/
+
+func (qe *queryExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
+	taskDataChan := make(chan ResponseTaskData)
+	var taskData ResponseTaskData
+	qe.manager.StartTask(task, taskDataChan)
+	select {
+	case taskData = <-taskDataChan:
+	case <-qe.ctx.Done():
+		return true
+	}
+	if taskData.Empty {
+		log.Info("Empty task on peer request stack")
+		return false
+	}
+	log.Debugw("beginning response execution", "id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String())
+	_, err := qe.executeQuery(pid, taskData.Request, taskData.Loader, taskData.Traverser, taskData.Signals, taskData.Subscriber)
+	isCancelled := err != nil && isContextErr(err)
+	if isCancelled {
+		qe.connManager.Unprotect(pid, taskData.Request.ID().Tag())
+		qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.Request)
+	}
+	qe.manager.FinishTask(task, err)
+	log.Debugw("finishing response execution", "id", taskData.Request.ID(), "peer", pid.String(), "root_cid", taskData.Request.Root().String())
+	return false
+}
 
 func (qe *queryExecutor) executeQuery(
 	p peer.ID,
diff --git a/responsemanager/server.go b/responsemanager/server.go
index a2a675b7..0e229af8 100644
--- a/responsemanager/server.go
+++ b/responsemanager/server.go
@@ -27,9 +27,11 @@ func (rm *ResponseManager) cleanupInProcessResponses() {
 
 func (rm *ResponseManager) run() {
 	defer rm.cleanupInProcessResponses()
-	for i := uint64(0); i < rm.maxInProcessRequests; i++ {
-		go rm.qe.processQueriesWorker()
-	}
+	/*
+		for i := uint64(0); i < rm.maxInProcessRequests; i++ {
+			go rm.qe.processQueriesWorker()
+		}
+	*/
 
 	for {
 		select {
@@ -101,7 +103,8 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
 			return nil
 		})
 	}
-	rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
+	rm.taskQueue.PushTask(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
+	// rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: math.MaxInt32, Work: 1})
 	select {
 	case rm.workSignal <- struct{}{}:
 	default:
@@ -111,7 +114,8 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
 
 func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
 	key := responseKey{p, requestID}
-	rm.queryQueue.Remove(key, key.p)
+	rm.taskQueue.Remove(key, key.p)
+	// rm.queryQueue.Remove(key, key.p)
 	response, ok := rm.inProgressResponses[key]
 	if !ok {
 		return errors.New("could not find request")
@@ -181,7 +185,8 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync
 		}
 		// TODO: Use a better work estimation metric.
-		rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
+		rm.taskQueue.PushTask(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
+		// rm.queryQueue.PushTasks(p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1})
 
 		select {
 		case rm.workSignal <- struct{}{}:
@@ -217,14 +222,16 @@ func (rm *ResponseManager) startTask(task *peertask.Task) ResponseTaskData {
 	key := task.Topic.(responseKey)
 	taskData := rm.taskDataForKey(key)
 	if taskData.Empty {
-		rm.queryQueue.TasksDone(key.p, task)
+		rm.taskQueue.TaskDone(key.p, task)
+		// rm.queryQueue.TasksDone(key.p, task)
 	}
 	return taskData
 }
 
 func (rm *ResponseManager) finishTask(task *peertask.Task, err error) {
 	key := task.Topic.(responseKey)
-	rm.queryQueue.TasksDone(key.p, task)
+	rm.taskQueue.TaskDone(key.p, task)
+	// rm.queryQueue.TasksDone(key.p, task)
 	response, ok := rm.inProgressResponses[key]
 	if !ok {
 		return
diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go
index c818c9f8..4acf2abc 100644
--- a/taskqueue/taskqueue.go
+++ b/taskqueue/taskqueue.go
@@ -70,6 +70,11 @@ func (tq *WorkerTaskQueue) Stats() graphsync.RequestStats {
 	}
 }
 
+// Remove removes a task from the execution queue
+func (tq *WorkerTaskQueue) Remove(topic peertask.Topic, p peer.ID) {
+	tq.peerTaskQueue.Remove(topic, p)
+}
+
 // Startup runs the given number of task workers with the given executor
 func (tq *WorkerTaskQueue) Startup(workerCount uint64, executor Executor) {
 	for i := uint64(0); i < workerCount; i++ {
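The subject line and commit message describe declaring context-cancellation fields with the standard `context.CancelFunc` type instead of a bare `func()`. The following is a minimal, self-contained sketch of that pattern; the `manager` type and `newManager` constructor are hypothetical illustrations, not code from this patch (the patch itself touches `ResponseManager` and related types).

package main

import "context"

// manager pairs a context with its cancellation function. context.CancelFunc
// is defined in the standard library as `type CancelFunc func()`, so the two
// declarations are interchangeable; naming the type documents the intent.
type manager struct {
	ctx      context.Context
	cancelFn context.CancelFunc // previously: cancelFn func()
}

// newManager derives a cancellable context, mirroring the
// `ctx, cancelFn := context.WithCancel(ctx)` call in responsemanager/client.go.
func newManager(parent context.Context) *manager {
	ctx, cancel := context.WithCancel(parent)
	return &manager{ctx: ctx, cancelFn: cancel}
}

func main() {
	m := newManager(context.Background())
	defer m.cancelFn() // cancel the derived context on shutdown
}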
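The hunks above also route response execution through `taskqueue.WorkerTaskQueue`: `Startup` spins up workers that call back into an `Executor`, `PushTask` enqueues work per peer, and `TaskDone`/`Remove` retire it. The sketch below shows how an executor might plug into that queue, going only by the signatures visible in this diff; `logExecutor` is a hypothetical stand-in for the real `queryExecutor`, the exact shape of the `Executor` interface is inferred from `ExecuteTask` above, and the import paths are assumptions based on the dependencies go-graphsync used at the time.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-graphsync/taskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	"github.com/libp2p/go-libp2p-core/peer"
)

// logExecutor is a hypothetical Executor: ExecuteTask mirrors the signature of
// queryExecutor.ExecuteTask in this patch and returns true only when the
// worker should shut down.
type logExecutor struct{}

func (logExecutor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.Task) bool {
	fmt.Println("executing task", task.Topic, "for peer", pid)
	return false
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tq := taskqueue.NewTaskQueue(ctx)
	tq.Startup(6, logExecutor{}) // mirrors rm.taskQueue.Startup(rm.maxInProcessRequests, rm.qe)

	var p peer.ID // a real caller would use the requesting peer's ID
	tq.PushTask(p, peertask.Task{Topic: "example-topic", Priority: 1, Work: 1})

	time.Sleep(100 * time.Millisecond) // crude wait so a worker can pick up the task before exit
}

ResponseManager follows the same flow in the hunks above: tasks are pushed in processRequests and unpauseRequest and retired via TaskDone in startTask and finishTask, so the worker count passed to Startup bounds how many responses execute concurrently.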