Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Add Provide RPC #37

Merged
merged 11 commits into from
Aug 10, 2022
5 changes: 5 additions & 0 deletions client/contentrouting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -45,6 +46,10 @@ func (t TestDelegatedRoutingClient) PutIPNSAsync(ctx context.Context, id []byte,
panic("not supported")
}

func (t TestDelegatedRoutingClient) Provide(ctx context.Context, key cid.Cid, provider peer.AddrInfo, ttl time.Duration) (<-chan time.Duration, error) {
panic("not supported")
}

// TestContentRoutingFindProvidersUnlimitedResults is testing that ContentRoutingClient.FindProvidersAsync
// correctly wraps DelegatedRoutingClient.FindProvidersAsync in the regime when the former allows for unlimited results.
// This is a test of async semantics only. This is why values are not checked for validity.
Expand Down
20 changes: 20 additions & 0 deletions client/findproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package client

import (
"context"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/edelweiss/values"
"github.com/libp2p/go-libp2p-core/peer"
record "github.com/libp2p/go-libp2p-record"
"github.com/multiformats/go-multiaddr"
Expand All @@ -21,6 +23,7 @@ type DelegatedRoutingClient interface {
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key cid.Cid, provider peer.AddrInfo, ttl time.Duration) (<-chan time.Duration, error)
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
}

type Client struct {
Expand Down Expand Up @@ -142,5 +145,22 @@ func ParseNodeAddresses(n *proto.Peer) []peer.AddrInfo {
}
infos = append(infos, peer.AddrInfo{ID: peerID, Addrs: []multiaddr.Multiaddr{ma}})
}
if len(n.Multiaddresses) == 0 {
infos = append(infos, peer.AddrInfo{ID: peerID})
}
return infos
}

// ToProtoPeer creates a protocol Peer structure from address info.
func ToProtoPeer(ai peer.AddrInfo) *proto.Peer {
p := proto.Peer{
ID: values.Bytes(ai.ID),
Multiaddresses: make(proto.AnonList20, 0),
}

for _, addr := range ai.Addrs {
p.Multiaddresses = append(p.Multiaddresses, addr.Bytes())
}

return &p
}
307 changes: 307 additions & 0 deletions client/provide.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
package client

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-delegated-routing/gen/proto"
"github.com/ipld/edelweiss/values"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/polydawn/refmt/cbor"
)

// Provider represents the source publishing one or more CIDs
type Provider struct {
Peer peer.AddrInfo
ProviderProto []TransferProtocol
}

// ToProto convers a provider into the wire proto form
func (p *Provider) ToProto() *proto.Provider {
pp := proto.Provider{
ProviderNode: proto.Node{
Peer: ToProtoPeer(p.Peer),
},
ProviderProto: proto.TransferProtocolList{},
}
for _, tp := range p.ProviderProto {
pp.ProviderProto = append(pp.ProviderProto, tp.ToProto())
}
return &pp
}

// TransferProtocol represents a data transfer protocol
type TransferProtocol struct {
Codec multicodec.Code
Payload []byte
}

// GraphSyncFILv1 is the current filecoin storage provider protocol.
type GraphSyncFILv1 struct {
PieceCID cid.Cid
VerifiedDeal bool
FastRetrieval bool
}

// ToProto converts a TransferProtocol to the wire representation
func (tp *TransferProtocol) ToProto() proto.TransferProtocol {
if tp.Codec == multicodec.TransportBitswap {
return proto.TransferProtocol{
Bitswap: &proto.BitswapProtocol{},
}
} else if tp.Codec == multicodec.TransportGraphsyncFilecoinv1 {
into := GraphSyncFILv1{}
if err := cbor.Unmarshal(cbor.DecodeOptions{}, tp.Payload, &into); err != nil {
return proto.TransferProtocol{}
}
return proto.TransferProtocol{
GraphSyncFILv1: &proto.GraphSyncFILv1Protocol{
PieceCID: proto.LinkToAny(into.PieceCID),
VerifiedDeal: values.Bool(into.VerifiedDeal),
FastRetrieval: values.Bool(into.FastRetrieval),
},
}
} else {
return proto.TransferProtocol{}
}
}

func parseProtocol(tp *proto.TransferProtocol) TransferProtocol {
if tp.Bitswap != nil {
return TransferProtocol{Codec: multicodec.TransportBitswap}
} else if tp.GraphSyncFILv1 != nil {
pl := GraphSyncFILv1{
PieceCID: cid.Cid(tp.GraphSyncFILv1.PieceCID),
VerifiedDeal: bool(tp.GraphSyncFILv1.VerifiedDeal),
FastRetrieval: bool(tp.GraphSyncFILv1.FastRetrieval),
}
plBytes, err := cbor.Marshal(&pl)
if err != nil {
willscott marked this conversation as resolved.
Show resolved Hide resolved
return TransferProtocol{}
}
return TransferProtocol{
Codec: multicodec.TransportGraphsyncFilecoinv1,
Payload: plBytes,
}
}
return TransferProtocol{}
}

// ProvideRequest is a message indicating a provider can provide a Key for a given TTL
type ProvideRequest struct {
Key cid.Cid
Provider
Timestamp int64
AdvisoryTTL time.Duration
Signature []byte
}

var provideSchema, _ = ipld.LoadSchemaBytes([]byte(`
willscott marked this conversation as resolved.
Show resolved Hide resolved
type ProvideRequest struct {
Key &Any
Provider Provider
Timestamp Int
AdvisoryTTL Int
Signature Bytes
}
type Provider struct {
Peer Peer
ProviderProto [TransferProtocol]
}
type Peer struct {
ID String
Multiaddresses [Bytes]
}
type TransferProtocol struct {
Codec Int
Payload Bytes
}
`))

func bytesToMA(b []byte) (interface{}, error) {
return multiaddr.NewMultiaddrBytes(b)
}
func maToBytes(iface interface{}) ([]byte, error) {
if ma, ok := iface.(multiaddr.Multiaddr); ok {
return ma.Bytes(), nil
}
return nil, fmt.Errorf("did not get expected MA type")
}

// Sign a provide request
func (pr *ProvideRequest) Sign(key crypto.PrivKey) error {
if pr.IsSigned() {
return errors.New("already Signed")
}
pr.Timestamp = time.Now().Unix()
pr.Signature = []byte{}

sid, err := peer.IDFromPrivateKey(key)
if err != nil {
return err
}
if sid != pr.Provider.Peer.ID {
return errors.New("not the correct signing key")
}

ma, _ := multiaddr.NewMultiaddr("/")
opts := []bindnode.Option{
bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes),
}

node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...)
nodeRepr := node.Representation()
outBuf := bytes.NewBuffer(nil)
if err = dagjson.Encode(nodeRepr, outBuf); err != nil {
return err
}
hash := sha256.New().Sum(outBuf.Bytes())
sig, err := key.Sign(hash)
if err != nil {
return err
}
pr.Signature = sig
return nil
}

func (pr *ProvideRequest) Verify() error {
if !pr.IsSigned() {
return errors.New("not signed")
}
sig := pr.Signature
pr.Signature = []byte{}
defer func() {
pr.Signature = sig
}()

ma, _ := multiaddr.NewMultiaddr("/")
opts := []bindnode.Option{
bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes),
}

node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...)
nodeRepr := node.Representation()
outBuf := bytes.NewBuffer(nil)
if err := dagjson.Encode(nodeRepr, outBuf); err != nil {
return err
}
hash := sha256.New().Sum(outBuf.Bytes())

pk, err := pr.Peer.ID.ExtractPublicKey()
if err != nil {
return err
}

ok, err := pk.Verify(hash, sig)
if err != nil {
return err
}
if !ok {
return errors.New("signature failed to verify")
}

return nil
}

// IsSigned indicates if the ProvideRequest has been signed
func (pr *ProvideRequest) IsSigned() bool {
return pr.Signature != nil
}

func ParseProvideRequest(req *proto.ProvideRequest) (*ProvideRequest, error) {
pr := ProvideRequest{
Key: cid.Cid(req.Key),
Provider: parseProvider(&req.Provider),
AdvisoryTTL: time.Duration(req.AdvisoryTTL),
Timestamp: int64(req.Timestamp),
Signature: req.Signature,
}

if err := pr.Verify(); err != nil {
return nil, err
}
return &pr, nil
}

func parseProvider(p *proto.Provider) Provider {
prov := Provider{
Peer: parseProtoNodeToAddrInfo(p.ProviderNode)[0],
ProviderProto: make([]TransferProtocol, 0),
}
for _, tp := range p.ProviderProto {
prov.ProviderProto = append(prov.ProviderProto, parseProtocol(&tp))
}
return prov
}

type ProvideAsyncResult struct {
AdvisoryTTL time.Duration
Err error
}

// Provide makes a provide request to a delegated router
func (fp *Client) Provide(ctx context.Context, req *ProvideRequest) (<-chan ProvideAsyncResult, error) {
if !req.IsSigned() {
return nil, errors.New("request is not signed")
}
ch0, err := fp.client.Provide_Async(ctx, &proto.ProvideRequest{
Key: proto.LinkToAny(req.Key),
Provider: *req.Provider.ToProto(),
Timestamp: values.Int(req.Timestamp),
AdvisoryTTL: values.Int(req.AdvisoryTTL),
Signature: req.Signature,
})
if err != nil {
return nil, err
}
ch1 := make(chan ProvideAsyncResult, 1)
go func() {
defer close(ch1)
for {
select {
case <-ctx.Done():
return
case r0, ok := <-ch0:
if !ok {
return
}

var r1 ProvideAsyncResult

if r0.Err != nil {
r1.Err = r0.Err
select {
case <-ctx.Done():
return
case ch1 <- r1:
}
continue
}

if r0.Resp == nil {
continue
}

r1.AdvisoryTTL = time.Duration(r0.Resp.AdvisoryTTL)

select {
case <-ctx.Done():
return
case ch1 <- r1:
}
}
}
}()
return ch1, nil
}
Loading