Skip to content

Commit

Permalink
feat(libp2p): add v1.0.0 network compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 11, 2022
1 parent 482dd21 commit f7ce568
Show file tree
Hide file tree
Showing 9 changed files with 924 additions and 40 deletions.
2 changes: 1 addition & 1 deletion impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,7 @@ func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseC
finalStub := tracing.FindSpanByTraceString(fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
require.NotNil(t, finalStub)
if len(testutil.AttributeValueInTraceSpan(t, *finalStub, "requestIDs").AsStringSlice()) == 0 {
return append(traces, fmt.Sprintf("responseMessage(%d)->loaderProcess(0)", responseCount-1))
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)", responseCount-1))
}
return append(traces, fmt.Sprintf("processResponses(%d)->loaderProcess(0)->cacheProcess(0)", responseCount-1))
}
123 changes: 108 additions & 15 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,24 @@ func newResponse(requestID graphsync.RequestID,
}
}

func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) {
func (mh MessageHandler) newMessageFromProto(pbm pbMessage) (GraphSyncMessage, error) {
requests := make(map[graphsync.RequestID]GraphSyncRequest, len(pbm.GetRequests()))
for _, req := range pbm.Requests {
for _, req := range pbm.GetRequests() {
if req == nil {
return GraphSyncMessage{}, errors.New("request is nil")
}
var root cid.Cid
var err error
if !req.Cancel && !req.Update {
root, err = cid.Cast(req.Root)
if !req.GetCancel() && !req.GetUpdate() {
root, err = cid.Cast(req.GetRoot())
if err != nil {
return GraphSyncMessage{}, err
}
}

var selector ipld.Node
if !req.Cancel && !req.Update {
selector, err = ipldutil.DecodeNode(req.Selector)
if !req.GetCancel() && !req.GetUpdate() {
selector, err = ipldutil.DecodeNode(req.GetSelector())
if err != nil {
return GraphSyncMessage{}, err
}
Expand All @@ -161,27 +161,27 @@ func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) {
if exts == nil {
exts = make(map[string][]byte)
}
id, err := graphsync.ParseRequestID(req.Id)
id, err := req.GetId(mh)
if err != nil {
return GraphSyncMessage{}, err
}
requests[id] = newRequest(id, root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts)
requests[id] = newRequest(id, root, selector, graphsync.Priority(req.GetPriority()), req.GetCancel(), req.GetUpdate(), exts)
}

responses := make(map[graphsync.RequestID]GraphSyncResponse, len(pbm.GetResponses()))
for _, res := range pbm.Responses {
for _, res := range pbm.GetResponses() {
if res == nil {
return GraphSyncMessage{}, errors.New("response is nil")
}
exts := res.GetExtensions()
if exts == nil {
exts = make(map[string][]byte)
}
id, err := graphsync.ParseRequestID(res.Id)
id, err := res.GetId(mh)
if err != nil {
return GraphSyncMessage{}, err
}
responses[id] = newResponse(id, graphsync.ResponseStatusCode(res.Status), exts)
responses[id] = newResponse(id, graphsync.ResponseStatusCode(res.GetStatus()), exts)
}

blks := make(map[cid.Cid]blocks.Block, len(pbm.GetData()))
Expand Down Expand Up @@ -249,14 +249,20 @@ func (gsm GraphSyncMessage) Blocks() []blocks.Block {
return bs
}

type MessageHandler struct {
fromV1Map map[int32]graphsync.RequestID
toV1Map map[graphsync.RequestID]int32
nextIntId int32
}

// FromNet can read a network stream to deserialized a GraphSyncMessage
func FromNet(r io.Reader) (GraphSyncMessage, error) {
func (mh MessageHandler) FromNet(r io.Reader) (GraphSyncMessage, error) {
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
return FromMsgReader(reader)
return mh.FromMsgReader(reader)
}

// FromMsgReader can deserialize a protobuf message into a GraphySyncMessage.
func FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
func (mh MessageHandler) FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
msg, err := r.ReadMsg()
if err != nil {
return GraphSyncMessage{}, err
Expand All @@ -269,7 +275,24 @@ func FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
return GraphSyncMessage{}, err
}

return newMessageFromProto(&pb)
return mh.newMessageFromProto(&pbMessageV1_1{&pb})
}

// FromMsgReaderV1 can deserialize a v1.0.0 protobuf message into a GraphySyncMessage.
func (mh MessageHandler) FromMsgReaderV1(r msgio.Reader) (GraphSyncMessage, error) {
msg, err := r.ReadMsg()
if err != nil {
return GraphSyncMessage{}, err
}

var pb pb.Message_V1_0_0
err = proto.Unmarshal(msg, &pb)
r.ReleaseMsg(msg)
if err != nil {
return GraphSyncMessage{}, err
}

return mh.newMessageFromProto(&pbMessageV1_0{&pb})
}

func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) {
Expand Down Expand Up @@ -315,6 +338,57 @@ func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) {
return pbm, nil
}

func (gsm GraphSyncMessage) ToProtoV1(mh MessageHandler) (*pb.Message_V1_0_0, error) {
pbm := new(pb.Message_V1_0_0)
pbm.Requests = make([]*pb.Message_V1_0_0_Request, 0, len(gsm.requests))
for _, request := range gsm.requests {
var selector []byte
var err error
if request.selector != nil {
selector, err = ipldutil.EncodeNode(request.selector)
if err != nil {
return nil, err
}
}
rid, err := bytesIdToInt(mh, request.id.Bytes())
if err != nil {
return nil, err
}
pbm.Requests = append(pbm.Requests, &pb.Message_V1_0_0_Request{
Id: rid,
Root: request.root.Bytes(),
Selector: selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
Update: request.isUpdate,
Extensions: request.extensions,
})
}

pbm.Responses = make([]*pb.Message_V1_0_0_Response, 0, len(gsm.responses))
for _, response := range gsm.responses {
rid, err := bytesIdToInt(mh, response.requestID.Bytes())
if err != nil {
return nil, err
}
pbm.Responses = append(pbm.Responses, &pb.Message_V1_0_0_Response{
Id: rid,
Status: int32(response.status),
Extensions: response.extensions,
})
}

blocks := gsm.Blocks()
pbm.Data = make([]*pb.Message_V1_0_0_Block, 0, len(blocks))
for _, b := range blocks {
pbm.Data = append(pbm.Data, &pb.Message_V1_0_0_Block{
Data: b.RawData(),
Prefix: b.Cid().Prefix().Bytes(),
})
}
return pbm, nil
}

func (gsm GraphSyncMessage) ToNet(w io.Writer) error {
msg, err := gsm.ToProto()
if err != nil {
Expand All @@ -334,6 +408,25 @@ func (gsm GraphSyncMessage) ToNet(w io.Writer) error {
return err
}

func (gsm GraphSyncMessage) ToNetV1(mh MessageHandler, w io.Writer) error {
msg, err := gsm.ToProtoV1(mh)
if err != nil {
return err
}
size := proto.Size(msg)
buf := pool.Get(size + binary.MaxVarintLen64)
defer pool.Put(buf)

n := binary.PutUvarint(buf, uint64(size))

out, err := proto.MarshalOptions{}.MarshalAppend(buf[:n], msg)
if err != nil {
return err
}
_, err = w.Write(out)
return err
}

func (gsm GraphSyncMessage) Loggable() map[string]interface{} {
requests := make([]string, 0, len(gsm.requests))
for _, request := range gsm.requests {
Expand Down
14 changes: 7 additions & 7 deletions message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestAppendingRequests(t *testing.T) {
require.Equal(t, selectorEncoded, pbRequest.Selector)
require.Equal(t, map[string][]byte{"graphsync/awesome": extension.Data}, pbRequest.Extensions)

deserialized, err := newMessageFromProto(pbMessage)
deserialized, err := MessageHandler{}.newMessageFromProto(pbMessageV1_1{pbMessage})
require.NoError(t, err, "deserializing protobuf message errored")
deserializedRequests := deserialized.Requests()
require.Len(t, deserializedRequests, 1, "did not add request to deserialized message")
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestAppendingResponses(t *testing.T) {
require.Equal(t, int32(status), pbResponse.Status)
require.Equal(t, extension.Data, pbResponse.Extensions["graphsync/awesome"])

deserialized, err := newMessageFromProto(pbMessage)
deserialized, err := MessageHandler{}.newMessageFromProto(pbMessageV1_1{pbMessage})
require.NoError(t, err, "deserializing protobuf message errored")
deserializedResponses := deserialized.Responses()
require.Len(t, deserializedResponses, 1, "did not add response to deserialized message")
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestRequestCancel(t *testing.T) {
buf := new(bytes.Buffer)
err = gsm.ToNet(buf)
require.NoError(t, err, "did not serialize protobuf message")
deserialized, err := FromNet(buf)
deserialized, err := MessageHandler{}.FromNet(buf)
require.NoError(t, err, "did not deserialize protobuf message")
deserializedRequests := deserialized.Requests()
require.Len(t, deserializedRequests, 1, "did not add request to deserialized message")
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestRequestUpdate(t *testing.T) {
buf := new(bytes.Buffer)
err = gsm.ToNet(buf)
require.NoError(t, err, "did not serialize protobuf message")
deserialized, err := FromNet(buf)
deserialized, err := MessageHandler{}.FromNet(buf)
require.NoError(t, err, "did not deserialize protobuf message")

deserializedRequests := deserialized.Requests()
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestToNetFromNetEquivalency(t *testing.T) {
buf := new(bytes.Buffer)
err = gsm.ToNet(buf)
require.NoError(t, err, "did not serialize protobuf message")
deserialized, err := FromNet(buf)
deserialized, err := MessageHandler{}.FromNet(buf)
require.NoError(t, err, "did not deserialize protobuf message")

requests := gsm.Requests()
Expand Down Expand Up @@ -404,15 +404,15 @@ func TestKnownFuzzIssues(t *testing.T) {
for _, input := range inputs {
//inputAsBytes, err := hex.DecodeString(input)
///require.NoError(t, err)
msg1, err := FromNet(bytes.NewReader([]byte(input)))
msg1, err := MessageHandler{}.FromNet(bytes.NewReader([]byte(input)))
if err != nil {
continue
}
buf2 := new(bytes.Buffer)
err = msg1.ToNet(buf2)
require.NoError(t, err)

msg2, err := FromNet(buf2)
msg2, err := MessageHandler{}.FromNet(buf2)
require.NoError(t, err)

require.Equal(t, msg1, msg2)
Expand Down
5 changes: 2 additions & 3 deletions message/pb/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f7ce568

Please sign in to comment.