diff --git a/graphsync.go b/graphsync.go index 8c75e778..4bc2abaf 100644 --- a/graphsync.go +++ b/graphsync.go @@ -190,6 +190,25 @@ type ResponseData interface { // Extension returns the content for an extension on a response, or errors // if extension is not present Extension(name ExtensionName) (datamodel.Node, bool) + + // Metadata returns a copy of the link metadata contained in this response + Metadata() LinkMetadata +} + +// TODO: docs for these new bits + +type LinkAction string + +const ( + LinkActionPresent = LinkAction("present") + + LinkActionMissing = LinkAction("missing") +) + +type LinkMetadataIterator func(cid.Cid, LinkAction) + +type LinkMetadata interface { + Iterate(LinkMetadataIterator) } // BlockData gives information about a block included in a graphsync response diff --git a/message/builder.go b/message/builder.go index 5ea86f92..7e182db4 100644 --- a/message/builder.go +++ b/message/builder.go @@ -111,13 +111,8 @@ func (b *Builder) ScrubResponses(requestIDs []graphsync.RequestID) uint64 { func (b *Builder) Build() (GraphSyncMessage, error) { responses := make(map[graphsync.RequestID]GraphSyncResponse, len(b.outgoingResponses)) for requestID, linkMap := range b.outgoingResponses { - mdRaw := metadata.EncodeMetadata(linkMap) - b.extensions[requestID] = append(b.extensions[requestID], graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: mdRaw, - }) status, isComplete := b.completedResponses[requestID] - responses[requestID] = NewResponse(requestID, responseCode(status, isComplete), b.extensions[requestID]...) + responses[requestID] = NewResponse(requestID, responseCode(status, isComplete), linkMap, b.extensions[requestID]...) } return GraphSyncMessage{ b.requests, responses, b.outgoingBlocks, diff --git a/message/message.go b/message/message.go index f88d335b..ed805b3b 100644 --- a/message/message.go +++ b/message/message.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-msgio" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/metadata" ) type MessageHandler interface { @@ -64,12 +65,18 @@ func (gsr GraphSyncRequest) String() string { type GraphSyncResponse struct { requestID graphsync.RequestID status graphsync.ResponseStatusCode + metadata metadata.Metadata extensions map[string]datamodel.Node } +type GraphSyncLinkMetadata struct { + linkMetadata metadata.Metadata +} + // String returns a human-readable form of a GraphSyncResponse func (gsr GraphSyncResponse) String() string { extStr := strings.Builder{} + // TODO: metadata for _, name := range gsr.ExtensionNames() { extStr.WriteString(string(name)) extStr.WriteString("|") @@ -101,8 +108,7 @@ func NewMessage( // its contents func (gsm GraphSyncMessage) String() string { cts := make([]string, 0) - for i, req := range gsm.requests { - fmt.Printf("req.String(%v)\n", i) + for _, req := range gsm.requests { cts = append(cts, req.String()) } for _, resp := range gsm.responses { @@ -134,6 +140,10 @@ func NewUpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionD return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions)) } +func NewGraphSyncLinkMetadata(md metadata.Metadata) GraphSyncLinkMetadata { + return GraphSyncLinkMetadata{md} +} + func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string]datamodel.Node) { if len(extensions) > 0 { extensionsMap = make(map[string]datamodel.Node, len(extensions)) @@ -165,15 +175,21 @@ func newRequest(id graphsync.RequestID, // NewResponse builds a new Graphsync response func NewResponse(requestID graphsync.RequestID, status graphsync.ResponseStatusCode, + md metadata.Metadata, extensions ...graphsync.ExtensionData) GraphSyncResponse { - return newResponse(requestID, status, toExtensionsMap(extensions)) + + return newResponse(requestID, status, md, toExtensionsMap(extensions)) } func newResponse(requestID graphsync.RequestID, - status graphsync.ResponseStatusCode, extensions map[string]datamodel.Node) GraphSyncResponse { + status graphsync.ResponseStatusCode, + responseMetadata metadata.Metadata, + extensions map[string]datamodel.Node) GraphSyncResponse { + return GraphSyncResponse{ requestID: requestID, status: status, + metadata: responseMetadata, extensions: extensions, } } @@ -282,14 +298,18 @@ func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr. // Extension returns the content for an extension on a response, or errors // if extension is not present func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) (datamodel.Node, bool) { - if gsr.extensions == nil { - return nil, false - } - val, ok := gsr.extensions[string(name)] - if !ok { - return nil, false + if name == graphsync.ExtensionMetadata { + return metadata.EncodeMetadata(gsr.metadata), true + } else { + if gsr.extensions == nil { + return nil, false + } + val, ok := gsr.extensions[string(name)] + if !ok { + return nil, false + } + return val, true } - return val, true } // ExtensionNames returns the names of the extensions included in this request @@ -298,9 +318,26 @@ func (gsr GraphSyncResponse) ExtensionNames() []graphsync.ExtensionName { for ext := range gsr.extensions { extNames = append(extNames, graphsync.ExtensionName(ext)) } + if len(gsr.metadata) > 0 { + extNames = append(extNames, graphsync.ExtensionMetadata) + } return extNames } +func (gsr GraphSyncResponse) Metadata() graphsync.LinkMetadata { + return GraphSyncLinkMetadata{gsr.metadata} +} + +func (gslm GraphSyncLinkMetadata) Iterate(iter graphsync.LinkMetadataIterator) { + for _, md := range gslm.linkMetadata { + action := graphsync.LinkActionPresent + if !md.BlockPresent { + action = graphsync.LinkActionMissing + } + iter(md.Link, action) + } +} + // ReplaceExtensions merges the extensions given extensions into the request to create a new request, // but always uses new data func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest { diff --git a/message/v1/message.go b/message/v1/message.go index d6cb4515..e5f7fc7d 100644 --- a/message/v1/message.go +++ b/message/v1/message.go @@ -8,6 +8,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime/datamodel" pool "github.com/libp2p/go-buffer-pool" "github.com/libp2p/go-libp2p-core/network" @@ -19,8 +20,11 @@ import ( "github.com/ipfs/go-graphsync/ipldutil" "github.com/ipfs/go-graphsync/message" pb "github.com/ipfs/go-graphsync/message/pb" + "github.com/ipfs/go-graphsync/metadata" ) +var log = logging.Logger("graphsync") + type MessagePartWithExtensions interface { ExtensionNames() []graphsync.ExtensionName Extension(name graphsync.ExtensionName) (datamodel.Node, bool) @@ -215,7 +219,18 @@ func (mh *MessageHandler) newMessageFromProto(p peer.ID, pbm *pb.Message) (messa if err != nil { return message.GraphSyncMessage{}, err } - responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), exts...) + var md metadata.Metadata + for _, ext := range exts { + if ext.Name == graphsync.ExtensionMetadata { + var err error + md, err = metadata.DecodeMetadata(ext.Data) + if err != nil { + log.Warnf("Unable to decode metadata in response for request id %d: %w", id, err) + } + } + break + } + responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), md, exts...) } blks := make(map[cid.Cid]blocks.Block, len(pbm.GetData())) diff --git a/message/v2/message.go b/message/v2/message.go index 7c63a3a7..ed793a44 100644 --- a/message/v2/message.go +++ b/message/v2/message.go @@ -7,6 +7,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime/codec/dagcbor" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/node/bindnode" @@ -17,8 +18,11 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/message/ipldbind" + "github.com/ipfs/go-graphsync/metadata" ) +var log = logging.Logger("graphsync") + type MessageHandler struct{} func NewMessageHandler() *MessageHandler { @@ -159,12 +163,20 @@ func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessage) (message.Grap responses := make(map[graphsync.RequestID]message.GraphSyncResponse, len(ibm.Responses)) for _, res := range ibm.Responses { - // exts := res.Extensions id, err := graphsync.ParseRequestID(res.Id) if err != nil { return message.GraphSyncMessage{}, err } - responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), res.Extensions.ToExtensionsList()...) + mdRaw := res.Extensions.Values[string(graphsync.ExtensionMetadata)] + var md metadata.Metadata + if mdRaw != nil { + md, err = metadata.DecodeMetadata(mdRaw) + if err != nil { + log.Warnf("Unable to decode metadata in response for request id %d: %w", id, err) + } + } + + responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), md, res.Extensions.ToExtensionsList()...) } blks := make(map[cid.Cid]blocks.Block, len(ibm.Blocks)) diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index ea478099..b97c1576 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue" "github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache" "github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore" @@ -108,7 +107,7 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp // neccesary func (al *AsyncLoader) ProcessResponse( ctx context.Context, - responses map[graphsync.RequestID]metadata.Metadata, + responses map[graphsync.RequestID]graphsync.LinkMetadata, blks []blocks.Block) { requestIds := make([]string, 0, len(responses)) @@ -130,7 +129,7 @@ func (al *AsyncLoader) ProcessResponse( for queue, requestIDs := range byQueue { loadAttemptQueue := al.getLoadAttemptQueue(queue) responseCache := al.getResponseCache(queue) - queueResponses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs)) + queueResponses := make(map[graphsync.RequestID]graphsync.LinkMetadata, len(requestIDs)) for _, requestID := range requestIDs { queueResponses[requestID] = responses[requestID] } diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index 8b6717fa..2d1e5034 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/requestmanager/types" "github.com/ipfs/go-graphsync/testutil" @@ -38,13 +39,12 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.NewRequestID() - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, + }}), } p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(context.Background(), responses, blocks) @@ -62,13 +62,12 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { link := testutil.NewTestLink() requestID := graphsync.NewRequestID() - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.(cidlink.Link).Cid, BlockPresent: false, - }, - }, + }}), } p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(context.Background(), responses, nil) @@ -107,13 +106,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, + }}), } asyncLoader.ProcessResponse(context.Background(), responses, blocks) assertSuccessResponse(ctx, t, resultChan) @@ -135,13 +133,12 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { st.AssertAttemptLoadWithoutResult(ctx, t, resultChan) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.(cidlink.Link).Cid, BlockPresent: false, - }, - }, + }}), } asyncLoader.ProcessResponse(context.Background(), responses, nil) assertFailResponse(ctx, t, resultChan) @@ -172,13 +169,12 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { st := newStore() withLoader(st, func(ctx context.Context, asyncLoader *AsyncLoader) { requestID := graphsync.NewRequestID() - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, + }}), } p := testutil.GeneratePeers(1)[0] asyncLoader.ProcessResponse(context.Background(), responses, blocks) @@ -267,19 +263,17 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { p := testutil.GeneratePeers(1)[0] resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID1: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID1: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, - requestID2: { - metadata.Item{ + }}), + requestID2: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, + }}), } asyncLoader.ProcessResponse(context.Background(), responses, blocks) @@ -308,13 +302,12 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { p := testutil.GeneratePeers(1)[0] resultChan1 := asyncLoader.AsyncLoad(p, requestID1, link, ipld.LinkContext{}) resultChan2 := asyncLoader.AsyncLoad(p, requestID2, link, ipld.LinkContext{}) - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID2: { - metadata.Item{ + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID2: message.NewGraphSyncLinkMetadata( + []metadata.Item{{ Link: link.Cid, BlockPresent: true, - }, - }, + }}), } asyncLoader.ProcessResponse(context.Background(), responses, blocks) asyncLoader.CompleteResponsesFor(requestID1) diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index a490178b..ffaab02e 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -5,6 +5,7 @@ import ( "sync" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -14,7 +15,6 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/linktracker" - "github.com/ipfs/go-graphsync/metadata" ) var log = logging.Logger("graphsync") @@ -73,7 +73,7 @@ func (rc *ResponseCache) AttemptLoad(requestID graphsync.RequestID, link ipld.Li // and tracking link metadata from a remote peer func (rc *ResponseCache) ProcessResponse( ctx context.Context, - responses map[graphsync.RequestID]metadata.Metadata, + responses map[graphsync.RequestID]graphsync.LinkMetadata, blks []blocks.Block) { ctx, span := otel.Tracer("graphsync").Start(ctx, "cacheProcess", trace.WithAttributes( @@ -90,10 +90,10 @@ func (rc *ResponseCache) ProcessResponse( } for requestID, md := range responses { - for _, item := range md { - log.Debugf("Traverse link %s on request ID %s", item.Link.String(), requestID.String()) - rc.linkTracker.RecordLinkTraversal(requestID, cidlink.Link{Cid: item.Link}, item.BlockPresent) - } + md.Iterate(func(c cid.Cid, la graphsync.LinkAction) { + log.Debugf("Traverse link %s on request ID %s", c.String(), requestID.String()) + rc.linkTracker.RecordLinkTraversal(requestID, cidlink.Link{Cid: c}, la == graphsync.LinkActionPresent) + }) } // prune unused blocks right away diff --git a/requestmanager/asyncloader/responsecache/responsecache_test.go b/requestmanager/asyncloader/responsecache/responsecache_test.go index 7034c13a..a7d85531 100644 --- a/requestmanager/asyncloader/responsecache/responsecache_test.go +++ b/requestmanager/asyncloader/responsecache/responsecache_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/testutil" ) @@ -91,9 +92,9 @@ func TestResponseCacheManagingLinks(t *testing.T) { }, } - responses := map[graphsync.RequestID]metadata.Metadata{ - requestID1: request1Metadata, - requestID2: request2Metadata, + responses := map[graphsync.RequestID]graphsync.LinkMetadata{ + requestID1: message.NewGraphSyncLinkMetadata(request1Metadata), + requestID2: message.NewGraphSyncLinkMetadata(request2Metadata), } fubs := &fakeUnverifiedBlockStore{ diff --git a/requestmanager/client.go b/requestmanager/client.go index f579842f..5002f516 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -23,7 +23,6 @@ import ( "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" - "github.com/ipfs/go-graphsync/metadata" "github.com/ipfs/go-graphsync/network" "github.com/ipfs/go-graphsync/notifications" "github.com/ipfs/go-graphsync/peerstate" @@ -73,8 +72,7 @@ type PeerHandler interface { // results as new responses are processed type AsyncLoader interface { StartRequest(graphsync.RequestID, string) error - ProcessResponse(ctx context.Context, responses map[graphsync.RequestID]metadata.Metadata, - blks []blocks.Block) + ProcessResponse(ctx context.Context, responses map[graphsync.RequestID]graphsync.LinkMetadata, blks []blocks.Block) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) CleanupRequest(p peer.ID, requestID graphsync.RequestID) @@ -286,7 +284,8 @@ func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync // ProcessResponses ingests the given responses from the network and // and updates the in progress requests based on those responses. -func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, +func (rm *RequestManager) ProcessResponses(p peer.ID, + responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { rm.send(&processResponsesMessage{p, responses, blks}, nil) } diff --git a/requestmanager/executor/executor_test.go b/requestmanager/executor/executor_test.go index 501cbedb..fc0c57f3 100644 --- a/requestmanager/executor/executor_test.go +++ b/requestmanager/executor/executor_test.go @@ -302,7 +302,7 @@ func (ree *requestExecutionEnv) ReleaseRequestTask(_ peer.ID, _ *peertask.Task, func (ree *requestExecutionEnv) GetRequestTask(_ peer.ID, _ *peertask.Task, requestExecutionChan chan executor.RequestTask) { var lastResponse atomic.Value - lastResponse.Store(gsmsg.NewResponse(ree.request.ID(), graphsync.RequestAcknowledged)) + lastResponse.Store(gsmsg.NewResponse(ree.request.ID(), graphsync.RequestAcknowledged, nil)) requestExecution := executor.RequestTask{ Ctx: ree.ctx, diff --git a/requestmanager/hooks/hooks_test.go b/requestmanager/hooks/hooks_test.go index d55c612b..0ee19aef 100644 --- a/requestmanager/hooks/hooks_test.go +++ b/requestmanager/hooks/hooks_test.go @@ -111,7 +111,7 @@ func TestBlockHookProcessing(t *testing.T) { Data: extensionUpdateData, } requestID := graphsync.NewRequestID() - response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, extensionResponse) + response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, nil, extensionResponse) p := testutil.GeneratePeers(1)[0] blockData := testutil.NewFakeBlockData() @@ -208,7 +208,7 @@ func TestResponseHookProcessing(t *testing.T) { Data: extensionUpdateData, } requestID := graphsync.NewRequestID() - response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, extensionResponse) + response := gsmsg.NewResponse(requestID, graphsync.PartialResponse, nil, extensionResponse) p := testutil.GeneratePeers(1)[0] testCases := map[string]struct { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index d8431b3d..39cd6729 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -8,6 +8,7 @@ import ( "time" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -61,18 +62,10 @@ func TestNormalSimultaneousFetch(t *testing.T) { firstBlocks := append(td.blockChain.AllBlocks(), blockChain2.Blocks(0, 3)...) firstMetadata1 := metadataForBlocks(td.blockChain.AllBlocks(), true) - firstMetadataEncoded1 := metadata.EncodeMetadata(firstMetadata1) firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true) - firstMetadataEncoded2 := metadata.EncodeMetadata(firstMetadata2) firstResponses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: firstMetadataEncoded1, - }), - gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: firstMetadataEncoded2, - }), + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, firstMetadata1), + gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata2), } td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) @@ -93,12 +86,8 @@ func TestNormalSimultaneousFetch(t *testing.T) { moreBlocks := blockChain2.RemainderBlocks(3) moreMetadata := metadataForBlocks(moreBlocks, true) - moreMetadataEncoded := metadata.EncodeMetadata(moreMetadata) moreResponses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: moreMetadataEncoded, - }), + gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), } td.requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks) @@ -135,7 +124,7 @@ func TestCancelRequestInProgress(t *testing.T) { td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag()) firstBlocks := td.blockChain.Blocks(0, 3) - firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) + firstMetadata := metadataForBlocks(firstBlocks, true) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata), gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata), @@ -153,7 +142,7 @@ func TestCancelRequestInProgress(t *testing.T) { require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID()) moreBlocks := td.blockChain.RemainderBlocks(3) - moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true) + moreMetadata := metadataForBlocks(moreBlocks, true) moreResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), @@ -200,7 +189,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { go func() { firstBlocks := td.blockChain.Blocks(0, 3) - firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) + firstMetadata := metadataForBlocks(firstBlocks, true) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata), } @@ -245,7 +234,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { rr := readNNetworkRequests(requestCtx, t, td, 1)[0] firstBlocks := td.blockChain.Blocks(0, 3) - firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) + firstMetadata := metadataForBlocks(firstBlocks, true) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata), } @@ -255,7 +244,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) { managerCancel() moreBlocks := td.blockChain.RemainderBlocks(3) - moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true) + moreMetadata := metadataForBlocks(moreBlocks, true) moreResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata), } @@ -279,7 +268,7 @@ func TestFailedRequest(t *testing.T) { td.tcm.AssertProtectedWithTags(t, peers[0], rr.gsr.ID().Tag()) failedResponses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound), + gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound, nil), } td.requestManager.ProcessResponses(peers[0], failedResponses, nil) @@ -307,7 +296,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) { // failure comes in later over network failedResponses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound), + gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestFailedContentNotFound, nil), } td.requestManager.ProcessResponses(peers[0], failedResponses, nil) @@ -336,7 +325,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) { td.blockChain.VerifyWholeChain(requestCtx, returnedResponseChan) - md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), true) + md := metadataForBlocks(td.blockChain.AllBlocks(), true) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md), } @@ -359,7 +348,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) { rr := readNNetworkRequests(requestCtx, t, td, 1)[0] - md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false) + md := metadataForBlocks(td.blockChain.AllBlocks(), false) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md), } @@ -451,10 +440,8 @@ func TestEncodingExtensions(t *testing.T) { expectedUpdate := basicnode.NewBytes(testutil.RandomBytes(100)) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(gsr.ID(), - graphsync.PartialResponse, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: nil, - }, + graphsync.PartialResponse, + nil, graphsync.ExtensionData{ Name: td.extensionName1, Data: expectedData, @@ -484,10 +471,8 @@ func TestEncodingExtensions(t *testing.T) { secondResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(gsr.ID(), - graphsync.PartialResponse, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: nil, - }, + graphsync.PartialResponse, + nil, graphsync.ExtensionData{ Name: td.extensionName1, Data: nextExpectedData, @@ -566,13 +551,10 @@ func TestBlockHooks(t *testing.T) { firstBlocks := td.blockChain.Blocks(0, 3) firstMetadata := metadataForBlocks(firstBlocks, true) - firstMetadataEncoded := metadata.EncodeMetadata(firstMetadata) firstResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(gsr.ID(), - graphsync.PartialResponse, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: firstMetadataEncoded, - }, + graphsync.PartialResponse, + firstMetadata, graphsync.ExtensionData{ Name: td.extensionName1, Data: expectedData, @@ -610,9 +592,12 @@ func TestBlockHooks(t *testing.T) { testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data") require.Equal(t, firstResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID") require.Equal(t, firstResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status") - metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata) - require.True(t, has) - require.Equal(t, firstMetadataEncoded, metadata, "should receive correct metadata") + md := make(metadata.Metadata, 0) + receivedResponse.Metadata().Iterate(func(c cid.Cid, la graphsync.LinkAction) { + md = append(md, metadata.Item{Link: c, BlockPresent: true}) + }) + require.Greater(t, len(md), 0) + require.Equal(t, firstMetadata, md, "should receive correct metadata") receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1) require.Equal(t, expectedData, receivedExtensionData, "should receive correct response extension data") var receivedBlock graphsync.BlockData @@ -626,13 +611,10 @@ func TestBlockHooks(t *testing.T) { nextExpectedUpdate2 := basicnode.NewBytes(testutil.RandomBytes(100)) nextBlocks := td.blockChain.RemainderBlocks(3) nextMetadata := metadataForBlocks(nextBlocks, true) - nextMetadataEncoded := metadata.EncodeMetadata(nextMetadata) secondResponses := []gsmsg.GraphSyncResponse{ gsmsg.NewResponse(gsr.ID(), - graphsync.RequestCompletedFull, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: nextMetadataEncoded, - }, + graphsync.RequestCompletedFull, + nextMetadata, graphsync.ExtensionData{ Name: td.extensionName1, Data: nextExpectedData, @@ -676,9 +658,12 @@ func TestBlockHooks(t *testing.T) { testutil.AssertReceive(ctx, t, receivedResponses, &receivedResponse, "did not receive response data") require.Equal(t, secondResponses[0].RequestID(), receivedResponse.RequestID(), "did not receive correct response ID") require.Equal(t, secondResponses[0].Status(), receivedResponse.Status(), "did not receive correct response status") - metadata, has := receivedResponse.Extension(graphsync.ExtensionMetadata) - require.True(t, has) - require.Equal(t, nextMetadataEncoded, metadata, "should receive correct metadata") + md := make(metadata.Metadata, 0) + receivedResponse.Metadata().Iterate(func(c cid.Cid, la graphsync.LinkAction) { + md = append(md, metadata.Item{Link: c, BlockPresent: true}) + }) + require.Greater(t, len(md), 0) + require.Equal(t, nextMetadata, md, "should receive correct metadata") receivedExtensionData, _ := receivedResponse.Extension(td.extensionName1) require.Equal(t, nextExpectedData, receivedExtensionData, "should receive correct response extension data") var receivedBlock graphsync.BlockData @@ -721,14 +706,9 @@ func TestOutgoingRequestHooks(t *testing.T) { require.Equal(t, "chainstore", key) md := metadataForBlocks(td.blockChain.AllBlocks(), true) - mdEncoded := metadata.EncodeMetadata(md) - mdExt := graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: mdEncoded, - } responses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt), - gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, mdExt), + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, md), + gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, md), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks()) @@ -783,13 +763,8 @@ func TestOutgoingRequestListeners(t *testing.T) { } md := metadataForBlocks(td.blockChain.AllBlocks(), true) - mdEncoded := metadata.EncodeMetadata(md) - mdExt := graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: mdEncoded, - } responses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, mdExt), + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, md), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) td.fal.VerifyLastProcessedBlocks(ctx, t, td.blockChain.AllBlocks()) @@ -835,12 +810,8 @@ func TestPauseResume(t *testing.T) { // Start processing responses md := metadataForBlocks(td.blockChain.AllBlocks(), true) - mdEncoded := metadata.EncodeMetadata(md) responses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: mdEncoded, - }), + gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) @@ -920,12 +891,8 @@ func TestPauseResumeExternal(t *testing.T) { // Start processing responses md := metadataForBlocks(td.blockChain.AllBlocks(), true) - mdEncoded := metadata.EncodeMetadata(md) responses := []gsmsg.GraphSyncResponse{ - gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: mdEncoded, - }), + gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md), } td.requestManager.ProcessResponses(peers[0], responses, td.blockChain.AllBlocks()) td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks()) @@ -1048,16 +1015,6 @@ func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata { return md } -func encodedMetadataForBlocks(t *testing.T, blks []blocks.Block, present bool) graphsync.ExtensionData { - t.Helper() - md := metadataForBlocks(blks, present) - metadataEncoded := metadata.EncodeMetadata(md) - return graphsync.ExtensionData{ - Name: graphsync.ExtensionMetadata, - Data: metadataEncoded, - } -} - type testData struct { requestRecordChan chan requestRecord fph *fakePeerHandler diff --git a/requestmanager/server.go b/requestmanager/server.go index bd94154b..796a3fbd 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -98,7 +98,7 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld inProgressChan: make(chan graphsync.ResponseProgress), inProgressErr: make(chan error), } - requestStatus.lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) + requestStatus.lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged, nil)) rm.inProgressRequestStatuses[request.ID()] = requestStatus rm.connManager.Protect(p, requestID.Tag()) @@ -259,7 +259,10 @@ func (rm *RequestManager) cancelOnError(requestID graphsync.RequestID, ipr *inPr } } -func (rm *RequestManager) processResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { +func (rm *RequestManager) processResponses(p peer.ID, + responses []gsmsg.GraphSyncResponse, + blks []blocks.Block) { + log.Debugf("beginning processing responses for peer %s", p) requestIds := make([]string, 0, len(responses)) for _, r := range responses { @@ -272,8 +275,11 @@ func (rm *RequestManager) processResponses(p peer.ID, responses []gsmsg.GraphSyn defer span.End() filteredResponses := rm.processExtensions(responses, p) filteredResponses = rm.filterResponsesForPeer(filteredResponses, p) + responseMetadata := make(map[graphsync.RequestID]graphsync.LinkMetadata, len(responses)) + for _, response := range responses { + responseMetadata[response.RequestID()] = response.Metadata() + } rm.updateLastResponses(filteredResponses) - responseMetadata := metadataForResponses(filteredResponses) rm.asyncLoader.ProcessResponse(ctx, responseMetadata, blks) rm.processTerminations(filteredResponses) log.Debugf("end processing responses for peer %s", p) diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index 0c52877c..7f33fe4c 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -6,6 +6,7 @@ import ( "testing" blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" peer "github.com/libp2p/go-libp2p-core/peer" @@ -35,7 +36,7 @@ type storeKey struct { type FakeAsyncLoader struct { responseChannelsLk sync.RWMutex responseChannels map[requestKey]chan types.AsyncLoadResult - responses chan map[graphsync.RequestID]metadata.Metadata + responses chan map[graphsync.RequestID]graphsync.LinkMetadata blks chan []blocks.Block storesRequestedLk sync.RWMutex storesRequested map[storeKey]struct{} @@ -46,7 +47,7 @@ type FakeAsyncLoader struct { func NewFakeAsyncLoader() *FakeAsyncLoader { return &FakeAsyncLoader{ responseChannels: make(map[requestKey]chan types.AsyncLoadResult), - responses: make(chan map[graphsync.RequestID]metadata.Metadata, 10), + responses: make(chan map[graphsync.RequestID]graphsync.LinkMetadata, 10), blks: make(chan []blocks.Block, 10), storesRequested: make(map[storeKey]struct{}), } @@ -61,7 +62,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str } // ProcessResponse just records values passed to verify expectations later -func (fal *FakeAsyncLoader) ProcessResponse(_ context.Context, responses map[graphsync.RequestID]metadata.Metadata, +func (fal *FakeAsyncLoader) ProcessResponse(_ context.Context, responses map[graphsync.RequestID]graphsync.LinkMetadata, blks []blocks.Block) { fal.responses <- responses fal.blks <- blks @@ -81,9 +82,16 @@ func (fal *FakeAsyncLoader) VerifyLastProcessedBlocks(ctx context.Context, t *te func (fal *FakeAsyncLoader) VerifyLastProcessedResponses(ctx context.Context, t *testing.T, expectedResponses map[graphsync.RequestID]metadata.Metadata) { t.Helper() - var responses map[graphsync.RequestID]metadata.Metadata + var responses map[graphsync.RequestID]graphsync.LinkMetadata testutil.AssertReceive(ctx, t, fal.responses, &responses, "did not process responses") - require.Equal(t, expectedResponses, responses, "did not process correct responses") + actualResponses := make(map[graphsync.RequestID]metadata.Metadata) + for rid, lm := range responses { + actualResponses[rid] = make(metadata.Metadata, 0) + lm.Iterate(func(c cid.Cid, la graphsync.LinkAction) { + actualResponses[rid] = append(actualResponses[rid], metadata.Item{c, la == graphsync.LinkActionPresent}) + }) + } + require.Equal(t, expectedResponses, actualResponses, "did not process correct responses") } // VerifyNoRemainingData verifies no outstanding response channels are open for the given diff --git a/requestmanager/utils.go b/requestmanager/utils.go deleted file mode 100644 index 384f2e0c..00000000 --- a/requestmanager/utils.go +++ /dev/null @@ -1,31 +0,0 @@ -package requestmanager - -import ( - "github.com/ipfs/go-peertaskqueue/peertask" - - "github.com/ipfs/go-graphsync" - gsmsg "github.com/ipfs/go-graphsync/message" - "github.com/ipfs/go-graphsync/metadata" -) - -func metadataForResponses(responses []gsmsg.GraphSyncResponse) map[graphsync.RequestID]metadata.Metadata { - responseMetadata := make(map[graphsync.RequestID]metadata.Metadata, len(responses)) - for _, response := range responses { - mdRaw, found := response.Extension(graphsync.ExtensionMetadata) - if !found { - log.Warnf("Unable to decode metadata in response for request id: %s", response.RequestID().String()) - continue - } - md, err := metadata.DecodeMetadata(mdRaw) - if err != nil { - continue - } - responseMetadata[response.RequestID()] = md - } - return responseMetadata -} - -// RequestIDFromTaskTopic extracts a request ID from a given peer task topic -func RequestIDFromTaskTopic(topic peertask.Topic) graphsync.RequestID { - return topic.(graphsync.RequestID) -}