diff --git a/requestmanager/client.go b/requestmanager/client.go index c18d7bfe..c58cad78 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -9,7 +9,6 @@ import ( "github.com/hannahhoward/go-pubsub" blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipld/go-ipld-prime" @@ -46,23 +45,23 @@ const ( ) type inProgressRequestStatus struct { - ctx context.Context - span trace.Span - startTime time.Time - cancelFn func() - p peer.ID - terminalError error - pauseMessages chan struct{} - state graphsync.RequestState - lastResponse atomic.Value - onTerminated []chan<- error - request gsmsg.GraphSyncRequest - doNotSendCids *cid.Set - nodeStyleChooser traversal.LinkTargetNodePrototypeChooser - inProgressChan chan graphsync.ResponseProgress - inProgressErr chan error - traverser ipldutil.Traverser - traverserCancel context.CancelFunc + ctx context.Context + span trace.Span + startTime time.Time + cancelFn func() + p peer.ID + terminalError error + pauseMessages chan struct{} + state graphsync.RequestState + lastResponse atomic.Value + onTerminated []chan<- error + request gsmsg.GraphSyncRequest + doNotSendFirstBlocks int64 + nodeStyleChooser traversal.LinkTargetNodePrototypeChooser + inProgressChan chan graphsync.ResponseProgress + inProgressErr chan error + traverser ipldutil.Traverser + traverserCancel context.CancelFunc } // PeerHandler is an interface that can send requests to peers diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 54790345..92516c1e 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -5,11 +5,9 @@ import ( "context" "sync/atomic" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" "github.com/libp2p/go-libp2p-core/peer" "go.opentelemetry.io/otel" @@ -17,7 +15,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/requestmanager/hooks" @@ -102,17 +100,17 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. // RequestTask are parameters for a single request execution type RequestTask struct { - Ctx context.Context - Span trace.Span - Request gsmsg.GraphSyncRequest - LastResponse *atomic.Value - DoNotSendCids *cid.Set - PauseMessages <-chan struct{} - Traverser ipldutil.Traverser - P peer.ID - InProgressErr chan error - Empty bool - InitialRequest bool + Ctx context.Context + Span trace.Span + Request gsmsg.GraphSyncRequest + LastResponse *atomic.Value + DoNotSendFirstBlocks int64 + PauseMessages <-chan struct{} + Traverser ipldutil.Traverser + P peer.ID + InProgressErr chan error + Empty bool + InitialRequest bool } func (e *Executor) traverse(rt RequestTask) error { @@ -177,7 +175,6 @@ func (e *Executor) processBlockHooks(p peer.ID, response graphsync.ResponseData, } func (e *Executor) onNewBlock(rt RequestTask, block graphsync.BlockData) error { - rt.DoNotSendCids.Add(block.Link().(cidlink.Link).Cid) response := rt.LastResponse.Load().(gsmsg.GraphSyncResponse) return e.processBlockHooks(rt.P, response, block) } @@ -218,12 +215,16 @@ func (e *Executor) processResult(rt RequestTask, link ipld.Link, result types.As func (e *Executor) startRemoteRequest(rt RequestTask) error { request := rt.Request - if rt.DoNotSendCids.Len() > 0 { - cidsData, err := cidset.EncodeCidSet(rt.DoNotSendCids) + doNotSendFirstBlocks := rt.DoNotSendFirstBlocks + if doNotSendFirstBlocks < int64(rt.Traverser.NBlocksTraversed()) { + doNotSendFirstBlocks = int64(rt.Traverser.NBlocksTraversed()) + } + if doNotSendFirstBlocks > 0 { + doNotSendFirstBlocksData, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(doNotSendFirstBlocks) if err != nil { return err } - request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionDoNotSendCIDs, Data: cidsData}}) + request = rt.Request.ReplaceExtensions([]graphsync.ExtensionData{{Name: graphsync.ExtensionsDoNotSendFirstBlocks, Data: doNotSendFirstBlocksData}}) } log.Debugw("starting remote request", "id", rt.Request.ID(), "peer", rt.P.String(), "root_cid", rt.Request.Root().String()) e.manager.SendRequest(rt.P, request) diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 2119d421..1b181ad1 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -17,7 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/requestmanager/executor" @@ -102,9 +101,9 @@ func TestRequestExecutionBlockChain(t *testing.T) { require.EqualError(t, ree.terminalError, hooks.ErrPaused{}.Error()) }, }, - "preexisting do not send cids": { + "preexisting do not send firstBlocks": { configureRequestExecution: func(p peer.ID, requestID graphsync.RequestID, tbc *testutil.TestBlockChain, ree *requestExecutionEnv) { - ree.doNotSendCids.Add(tbc.GenisisLink.(cidlink.Link).Cid) + ree.doNotSendFirstBlocks = 1 }, verifyResults: func(t *testing.T, tbc *testutil.TestBlockChain, ree *requestExecutionEnv, responses []graphsync.ResponseProgress, receivedErrors []error) { tbc.VerifyWholeChainSync(responses) @@ -112,11 +111,11 @@ func TestRequestExecutionBlockChain(t *testing.T) { require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID()) require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root()) require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector()) - doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs) + doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) require.True(t, has) - cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt) + doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) require.NoError(t, err) - require.Equal(t, 1, cidSet.Len()) + require.Equal(t, int64(1), doNotSendFirstBlocks) require.Len(t, ree.blookHooksCalled, 10) require.NoError(t, ree.terminalError) }, @@ -145,11 +144,11 @@ func TestRequestExecutionBlockChain(t *testing.T) { require.Equal(t, ree.request.ID(), ree.requestsSent[0].request.ID()) require.Equal(t, ree.request.Root(), ree.requestsSent[0].request.Root()) require.Equal(t, ree.request.Selector(), ree.requestsSent[0].request.Selector()) - doNotSendCidsExt, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionDoNotSendCIDs) + doNotSendFirstBlocksData, has := ree.requestsSent[0].request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) require.True(t, has) - cidSet, err := cidset.DecodeCidSet(doNotSendCidsExt) + doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) require.NoError(t, err) - require.Equal(t, 6, cidSet.Len()) + require.Equal(t, int64(6), doNotSendFirstBlocks) require.Len(t, ree.blookHooksCalled, 10) require.NoError(t, ree.terminalError) }, @@ -202,16 +201,16 @@ func TestRequestExecutionBlockChain(t *testing.T) { defer requestCancel() var responsesReceived []graphsync.ResponseProgress ree := &requestExecutionEnv{ - ctx: requestCtx, - p: p, - pauseMessages: make(chan struct{}, 1), - blockHookResults: make(map[blockHookKey]hooks.UpdateResult), - doNotSendCids: cid.NewSet(), - request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())), - fal: fal, - tbc: tbc, - initialRequest: true, - inProgressErr: make(chan error, 1), + ctx: requestCtx, + p: p, + pauseMessages: make(chan struct{}, 1), + blockHookResults: make(map[blockHookKey]hooks.UpdateResult), + doNotSendFirstBlocks: 0, + request: gsmsg.NewRequest(requestID, tbc.TipLink.(cidlink.Link).Cid, tbc.Selector(), graphsync.Priority(rand.Int31())), + fal: fal, + tbc: tbc, + initialRequest: true, + inProgressErr: make(chan error, 1), traverser: ipldutil.TraversalBuilder{ Root: tbc.TipLink, Selector: tbc.Selector(), @@ -276,7 +275,7 @@ type requestExecutionEnv struct { request gsmsg.GraphSyncRequest p peer.ID blockHookResults map[blockHookKey]hooks.UpdateResult - doNotSendCids *cid.Set + doNotSendFirstBlocks int64 pauseMessages chan struct{} externalPause pauseKey loadLocallyUntil int @@ -304,16 +303,16 @@ func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requ lastResponse.Store(gsmsg.NewResponse(ree.request.ID(), graphsync.RequestAcknowledged)) requestExecution := executor.RequestTask{ - Ctx: ree.ctx, - Request: ree.request, - LastResponse: &lastResponse, - DoNotSendCids: ree.doNotSendCids, - PauseMessages: ree.pauseMessages, - Traverser: ree.traverser, - P: ree.p, - InProgressErr: ree.inProgressErr, - Empty: false, - InitialRequest: ree.initialRequest, + Ctx: ree.ctx, + Request: ree.request, + LastResponse: &lastResponse, + DoNotSendFirstBlocks: ree.doNotSendFirstBlocks, + PauseMessages: ree.pauseMessages, + Traverser: ree.traverser, + P: ree.p, + InProgressErr: ree.inProgressErr, + Empty: false, + InitialRequest: ree.initialRequest, } go func() { select { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index ee5dcbf2..e107ae38 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -15,8 +15,8 @@ import ( "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" @@ -877,10 +877,10 @@ func TestPauseResume(t *testing.T) { // verify the correct new request with Do-no-send-cids & other extensions resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] - doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs) - doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData) + doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) + doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) require.NoError(t, err) - require.Equal(t, pauseAt, doNotSendCids.Len()) + require.Equal(t, pauseAt, int(doNotSendFirstBlocks)) require.True(t, has) ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1) require.True(t, has) @@ -957,10 +957,10 @@ func TestPauseResumeExternal(t *testing.T) { // verify the correct new request with Do-no-send-cids & other extensions resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] - doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs) - doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData) + doNotSendFirstBlocksData, has := resumedRequest.gsr.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) + doNotSendFirstBlocks, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) require.NoError(t, err) - require.Equal(t, pauseAt, doNotSendCids.Len()) + require.Equal(t, pauseAt, int(doNotSendFirstBlocks)) require.True(t, has) ext1Data, has := resumedRequest.gsr.Extension(td.extensionName1) require.True(t, has) diff --git a/requestmanager/server.go b/requestmanager/server.go index f8cc47db..63de7a91 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -8,7 +8,6 @@ import ( "time" blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipfs/go-peertaskqueue/peertracker" "github.com/ipld/go-ipld-prime" @@ -22,8 +21,8 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/peerstate" @@ -73,10 +72,10 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld rp, err := rm.singleErrorResponse(err) return request, rp, err } - doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) - var doNotSendCids *cid.Set + doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) + var doNotSendFirstBlocks int64 if has { - doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData) + doNotSendFirstBlocks, err = donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -84,23 +83,21 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld rp, err := rm.singleErrorResponse(err) return request, rp, err } - } else { - doNotSendCids = cid.NewSet() } ctx, cancel := context.WithCancel(ctx) requestStatus := &inProgressRequestStatus{ - ctx: ctx, - span: parentSpan, - startTime: time.Now(), - cancelFn: cancel, - p: p, - pauseMessages: make(chan struct{}, 1), - doNotSendCids: doNotSendCids, - request: request, - state: graphsync.Queued, - nodeStyleChooser: hooksResult.CustomChooser, - inProgressChan: make(chan graphsync.ResponseProgress), - inProgressErr: make(chan error), + ctx: ctx, + span: parentSpan, + startTime: time.Now(), + cancelFn: cancel, + p: p, + pauseMessages: make(chan struct{}, 1), + doNotSendFirstBlocks: doNotSendFirstBlocks, + request: request, + state: graphsync.Queued, + nodeStyleChooser: hooksResult.CustomChooser, + inProgressChan: make(chan graphsync.ResponseProgress), + inProgressErr: make(chan error), } requestStatus.lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) rm.inProgressRequestStatuses[request.ID()] = requestStatus @@ -157,17 +154,17 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re ipr.state = graphsync.Running return executor.RequestTask{ - Ctx: ipr.ctx, - Span: ipr.span, - Request: ipr.request, - LastResponse: &ipr.lastResponse, - DoNotSendCids: ipr.doNotSendCids, - PauseMessages: ipr.pauseMessages, - Traverser: ipr.traverser, - P: ipr.p, - InProgressErr: ipr.inProgressErr, - InitialRequest: initialRequest, - Empty: false, + Ctx: ipr.ctx, + Span: ipr.span, + Request: ipr.request, + LastResponse: &ipr.lastResponse, + DoNotSendFirstBlocks: ipr.doNotSendFirstBlocks, + PauseMessages: ipr.pauseMessages, + Traverser: ipr.traverser, + P: ipr.p, + InProgressErr: ipr.inProgressErr, + InitialRequest: initialRequest, + Empty: false, } } @@ -259,6 +256,7 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr rm.terminateRequest(requestID, ipr) } else { ipr.cancelFn() + rm.asyncLoader.CompleteResponsesFor(requestID) } }