Skip to content

Commit

Permalink
feat: new LinkMetadata iface, integrate metadata into Response type
Browse files Browse the repository at this point in the history
* LinkMetadata wrapper around existing metadata type to allow for easier
  backward-compat upgrade path
* integrate metadata directly into GraphSyncResponse type, moving it from an
  optional extension
* still deal with metadata as an extension for now—further work for v2 protocol
  will move it into the core message schema

Ref: #335
  • Loading branch information
rvagg committed Feb 2, 2022
1 parent 6b86c3c commit 84d6385
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 198 deletions.
19 changes: 19 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,25 @@ type ResponseData interface {
// Extension returns the content for an extension on a response, or errors
// if extension is not present
Extension(name ExtensionName) (datamodel.Node, bool)

// Metadata returns a copy of the link metadata contained in this response
Metadata() LinkMetadata
}

// TODO: docs for these new bits

type LinkAction string

const (
LinkActionPresent = LinkAction("present")

LinkActionMissing = LinkAction("missing")
)

type LinkMetadataIterator func(cid.Cid, LinkAction)

type LinkMetadata interface {
Iterate(LinkMetadataIterator)
}

// BlockData gives information about a block included in a graphsync response
Expand Down
7 changes: 1 addition & 6 deletions message/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,8 @@ func (b *Builder) ScrubResponses(requestIDs []graphsync.RequestID) uint64 {
func (b *Builder) Build() (GraphSyncMessage, error) {
responses := make(map[graphsync.RequestID]GraphSyncResponse, len(b.outgoingResponses))
for requestID, linkMap := range b.outgoingResponses {
mdRaw := metadata.EncodeMetadata(linkMap)
b.extensions[requestID] = append(b.extensions[requestID], graphsync.ExtensionData{
Name: graphsync.ExtensionMetadata,
Data: mdRaw,
})
status, isComplete := b.completedResponses[requestID]
responses[requestID] = NewResponse(requestID, responseCode(status, isComplete), b.extensions[requestID]...)
responses[requestID] = NewResponse(requestID, responseCode(status, isComplete), linkMap, b.extensions[requestID]...)
}
return GraphSyncMessage{
b.requests, responses, b.outgoingBlocks,
Expand Down
59 changes: 48 additions & 11 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-msgio"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
)

type MessageHandler interface {
Expand Down Expand Up @@ -64,12 +65,18 @@ func (gsr GraphSyncRequest) String() string {
type GraphSyncResponse struct {
requestID graphsync.RequestID
status graphsync.ResponseStatusCode
metadata metadata.Metadata
extensions map[string]datamodel.Node
}

type GraphSyncLinkMetadata struct {
linkMetadata metadata.Metadata
}

// String returns a human-readable form of a GraphSyncResponse
func (gsr GraphSyncResponse) String() string {
extStr := strings.Builder{}
// TODO: metadata
for _, name := range gsr.ExtensionNames() {
extStr.WriteString(string(name))
extStr.WriteString("|")
Expand Down Expand Up @@ -101,8 +108,7 @@ func NewMessage(
// its contents
func (gsm GraphSyncMessage) String() string {
cts := make([]string, 0)
for i, req := range gsm.requests {
fmt.Printf("req.String(%v)\n", i)
for _, req := range gsm.requests {
cts = append(cts, req.String())
}
for _, resp := range gsm.responses {
Expand Down Expand Up @@ -134,6 +140,10 @@ func NewUpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionD
return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
}

func NewGraphSyncLinkMetadata(md metadata.Metadata) GraphSyncLinkMetadata {
return GraphSyncLinkMetadata{md}
}

func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string]datamodel.Node) {
if len(extensions) > 0 {
extensionsMap = make(map[string]datamodel.Node, len(extensions))
Expand Down Expand Up @@ -165,15 +175,21 @@ func newRequest(id graphsync.RequestID,
// NewResponse builds a new Graphsync response
func NewResponse(requestID graphsync.RequestID,
status graphsync.ResponseStatusCode,
md metadata.Metadata,
extensions ...graphsync.ExtensionData) GraphSyncResponse {
return newResponse(requestID, status, toExtensionsMap(extensions))

return newResponse(requestID, status, md, toExtensionsMap(extensions))
}

func newResponse(requestID graphsync.RequestID,
status graphsync.ResponseStatusCode, extensions map[string]datamodel.Node) GraphSyncResponse {
status graphsync.ResponseStatusCode,
responseMetadata metadata.Metadata,
extensions map[string]datamodel.Node) GraphSyncResponse {

return GraphSyncResponse{
requestID: requestID,
status: status,
metadata: responseMetadata,
extensions: extensions,
}
}
Expand Down Expand Up @@ -282,14 +298,18 @@ func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.
// Extension returns the content for an extension on a response, or errors
// if extension is not present
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) (datamodel.Node, bool) {
if gsr.extensions == nil {
return nil, false
}
val, ok := gsr.extensions[string(name)]
if !ok {
return nil, false
if name == graphsync.ExtensionMetadata {
return metadata.EncodeMetadata(gsr.metadata), true
} else {
if gsr.extensions == nil {
return nil, false
}
val, ok := gsr.extensions[string(name)]
if !ok {
return nil, false
}
return val, true
}
return val, true
}

// ExtensionNames returns the names of the extensions included in this request
Expand All @@ -298,9 +318,26 @@ func (gsr GraphSyncResponse) ExtensionNames() []graphsync.ExtensionName {
for ext := range gsr.extensions {
extNames = append(extNames, graphsync.ExtensionName(ext))
}
if len(gsr.metadata) > 0 {
extNames = append(extNames, graphsync.ExtensionMetadata)
}
return extNames
}

func (gsr GraphSyncResponse) Metadata() graphsync.LinkMetadata {
return GraphSyncLinkMetadata{gsr.metadata}
}

func (gslm GraphSyncLinkMetadata) Iterate(iter graphsync.LinkMetadataIterator) {
for _, md := range gslm.linkMetadata {
action := graphsync.LinkActionPresent
if !md.BlockPresent {
action = graphsync.LinkActionMissing
}
iter(md.Link, action)
}
}

// ReplaceExtensions merges the extensions given extensions into the request to create a new request,
// but always uses new data
func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest {
Expand Down
17 changes: 16 additions & 1 deletion message/v1/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/datamodel"
pool "github.com/libp2p/go-buffer-pool"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -19,8 +20,11 @@ import (
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/message"
pb "github.com/ipfs/go-graphsync/message/pb"
"github.com/ipfs/go-graphsync/metadata"
)

var log = logging.Logger("graphsync")

type MessagePartWithExtensions interface {
ExtensionNames() []graphsync.ExtensionName
Extension(name graphsync.ExtensionName) (datamodel.Node, bool)
Expand Down Expand Up @@ -215,7 +219,18 @@ func (mh *MessageHandler) newMessageFromProto(p peer.ID, pbm *pb.Message) (messa
if err != nil {
return message.GraphSyncMessage{}, err
}
responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), exts...)
var md metadata.Metadata
for _, ext := range exts {
if ext.Name == graphsync.ExtensionMetadata {
var err error
md, err = metadata.DecodeMetadata(ext.Data)
if err != nil {
log.Warnf("Unable to decode metadata in response for request id %d: %w", id, err)
}
}
break
}
responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), md, exts...)
}

blks := make(map[cid.Cid]blocks.Block, len(pbm.GetData()))
Expand Down
16 changes: 14 additions & 2 deletions message/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/node/bindnode"
Expand All @@ -17,8 +18,11 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/message/ipldbind"
"github.com/ipfs/go-graphsync/metadata"
)

var log = logging.Logger("graphsync")

type MessageHandler struct{}

func NewMessageHandler() *MessageHandler {
Expand Down Expand Up @@ -159,12 +163,20 @@ func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessage) (message.Grap

responses := make(map[graphsync.RequestID]message.GraphSyncResponse, len(ibm.Responses))
for _, res := range ibm.Responses {
// exts := res.Extensions
id, err := graphsync.ParseRequestID(res.Id)
if err != nil {
return message.GraphSyncMessage{}, err
}
responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), res.Extensions.ToExtensionsList()...)
mdRaw := res.Extensions.Values[string(graphsync.ExtensionMetadata)]
var md metadata.Metadata
if mdRaw != nil {
md, err = metadata.DecodeMetadata(mdRaw)
if err != nil {
log.Warnf("Unable to decode metadata in response for request id %d: %w", id, err)
}
}

responses[id] = message.NewResponse(id, graphsync.ResponseStatusCode(res.Status), md, res.Extensions.ToExtensionsList()...)
}

blks := make(map[cid.Cid]blocks.Block, len(ibm.Blocks))
Expand Down
5 changes: 2 additions & 3 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp
// neccesary
func (al *AsyncLoader) ProcessResponse(
ctx context.Context,
responses map[graphsync.RequestID]metadata.Metadata,
responses map[graphsync.RequestID]graphsync.LinkMetadata,
blks []blocks.Block) {

requestIds := make([]string, 0, len(responses))
Expand All @@ -130,7 +129,7 @@ func (al *AsyncLoader) ProcessResponse(
for queue, requestIDs := range byQueue {
loadAttemptQueue := al.getLoadAttemptQueue(queue)
responseCache := al.getResponseCache(queue)
queueResponses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
queueResponses := make(map[graphsync.RequestID]graphsync.LinkMetadata, len(requestIDs))
for _, requestID := range requestIDs {
queueResponses[requestID] = responses[requestID]
}
Expand Down
Loading

0 comments on commit 84d6385

Please sign in to comment.