Skip to content

Commit

Permalink
feat(retrievalmarket): upgrade query protocol to spec v0
Browse files Browse the repository at this point in the history
Implements spec v0 of retrieval query protocol. Also defines mocks for network
  • Loading branch information
hannahhoward authored and shannonwells committed Jan 8, 2020
1 parent 6e5e0a8 commit 27813e5
Show file tree
Hide file tree
Showing 10 changed files with 897 additions and 93 deletions.
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoA
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4NEnLCYZoAgK6fqwc7NH88nM8haFSxKGUGIG3vA=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
58 changes: 17 additions & 41 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,22 @@ import (
"reflect"
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-components/retrievalmarket"
"github.com/filecoin-project/go-fil-components/shared/params"
rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network"
"github.com/filecoin-project/go-fil-components/shared/tokenamount"
"github.com/filecoin-project/go-fil-components/shared/types"
)

var log = logging.Logger("retrieval")

type client struct {
h host.Host
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
network rmnet.RetrievalMarketNetwork
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.Mutex
Expand All @@ -38,8 +29,8 @@ type client struct {
}

// NewClient creates a new retrieval client
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{network: network, bs: bs, node: node}
}

// V0
Expand All @@ -53,39 +44,22 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer
// TODO: Update to match spec for V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
cid, err := cid.Cast(pieceCID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
s, err := c.network.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
defer s.Close()

err = cborutil.WriteCborRPC(s, &OldQuery{
Piece: cid,
err = s.WriteQuery(retrievalmarket.Query{
PieceCID: pieceCID,
})
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

var oldResp OldQueryResponse
if err := oldResp.UnmarshalCBOR(s); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
return s.ReadQueryResponse()
}

// TODO: Update to match spec for V0 Epic:
Expand All @@ -110,15 +84,15 @@ func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrieval

go func() {
evt := retrievalmarket.ClientEventError
converted, err := cid.Cast(pieceCID)
//converted, err := cid.Cast(pieceCID)

if err == nil {
/*if err == nil {
err = c.retrieveUnixfs(ctx, converted, tokenamount.Div(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet)
if err == nil {
evt = retrievalmarket.ClientEventComplete
dealState.Status = retrievalmarket.DealStatusCompleted
}
}
}*/

c.notifySubscribers(evt, dealState)
}()
Expand Down Expand Up @@ -170,6 +144,7 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe
panic("not implemented")
}

/*
type clientStream struct {
node retrievalmarket.RetrievalClientNode
stream network.Stream
Expand All @@ -188,7 +163,7 @@ type clientStream struct {
verifier BlockVerifier
bs blockstore.Blockstore
}

*/
/* This is the old retrieval code that is NOT spec compliant */

// C > S
Expand All @@ -203,7 +178,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
/*func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
if err != nil {
return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err)
Expand Down Expand Up @@ -382,3 +357,4 @@ func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.To
Vouchers: []*types.SignedVoucher{sv},
}, nil
}
*/
129 changes: 129 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package retrievalimpl_test

import (
"context"
"testing"

"github.com/filecoin-project/go-address"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-components/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl"
"github.com/filecoin-project/go-fil-components/retrievalmarket/network"
rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network"
tut "github.com/filecoin-project/go-fil-components/retrievalmarket/network/testutil"
"github.com/filecoin-project/go-fil-components/shared/tokenamount"
"github.com/filecoin-project/go-fil-components/shared/types"
)

func TestClient_Query(t *testing.T) {
ctx := context.Background()

bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))

pcid := []byte(string("applesauce"))
expectedPeer := peer.ID("somevalue")
rpeer := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: expectedPeer,
}

expectedQuery := retrievalmarket.Query{
PieceCID: pcid,
}

expectedQueryResponse := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseAvailable,
Size: 1234,
PaymentAddress: address.TestAddress,
MinPricePerByte: tokenamount.FromInt(5678),
MaxPaymentInterval: 4321,
MaxPaymentIntervalIncrease: 0,
}

t.Run("it works", func(t *testing.T) {
var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
return tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
Writer: tut.ExpectQueryWriter(t, expectedQuery, "queries should match"),
RespReader: tut.StubbedQueryResponseReader(expectedQueryResponse),
}), nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expectedQueryResponse, resp)
})

t.Run("when the stream returns error, returns error", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
})

t.Run("when WriteQuery fails, returns error", func(t *testing.T) {

qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
Writer: tut.FailQueryWriter,
})
return newStream, nil
}

net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})

t.Run("when ReadQueryResponse fails, returns error", func(t *testing.T) {
qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
RespReader: tut.FailResponseReader,
})
return newStream, nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

type testRetrievalNode struct {
}

func (t *testRetrievalNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable tokenamount.TokenAmount) (address.Address, error) {
return address.Address{}, nil
}

func (t *testRetrievalNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
return 0, nil
}

func (t *testRetrievalNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount tokenamount.TokenAmount, lane uint64) (*types.SignedVoucher, error) {
return nil, nil
}
Loading

0 comments on commit 27813e5

Please sign in to comment.