Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Query Protocol to Spec V0 #25

Merged
merged 8 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ jobs:
- run: sudo apt-get update
- restore_cache:
name: restore go mod cache
key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go-fil-components/go.mod" }}
key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go-fil-markets/go.mod" }}
- run:
command: make build
- store_artifacts:
path: go-fil-components
path: go-fil-markets
- store_artifacts:
path: go-fil-components
path: go-fil-markets

test: &test
description: |
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# go-fil-markets
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![CircleCI](https://circleci.com/gh/filecoin-project/go-fil-components.svg?style=svg)](https://circleci.com/gh/filecoin-project/go-fil-components)
[![codecov](https://codecov.io/gh/filecoin-project/go-fil-components/branch/master/graph/badge.svg)](https://codecov.io/gh/filecoin-project/go-fil-components)
[![CircleCI](https://circleci.com/gh/filecoin-project/go-fil-markets.svg?style=svg)](https://circleci.com/gh/filecoin-project/go-fil-markets)
[![codecov](https://codecov.io/gh/filecoin-project/go-fil-markets/branch/master/graph/badge.svg)](https://codecov.io/gh/filecoin-project/go-fil-markets)

This repository contains modular implementations of the storage and retrieval market subsystems of Filecoin. These modules are guided by the [v1.0 and 1.1 Filecoin specification updates](https://filecoin-project.github.io/specs/#intro__changelog).

Expand All @@ -14,7 +14,7 @@ Separating an implementation into a blockchain component and one or more mining
## Contributing
PRs are welcome! Please first read the design docs and look over the current code. PRs against
master require approval of at least two maintainers. For the rest, please see our
[CONTRIBUTING](.go-fil-components/CONTRIBUTING.md) guide.
[CONTRIBUTING](.go-fil-markets/CONTRIBUTING.md) guide.

## Project-level documentation
The filecoin-project has a [community repo](https://github.com/filecoin-project/community) that documents in more detail our policies and guidelines, such as discussion forums and chat rooms and [Code of Conduct](https://github.com/filecoin-project/community/blob/master/CODE_OF_CONDUCT.md).
Expand Down
64 changes: 15 additions & 49 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,22 @@ import (
"reflect"
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/params"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
)

var log = logging.Logger("retrieval")

type client struct {
h host.Host
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
network rmnet.RetrievalMarketNetwork
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.Mutex
Expand All @@ -39,8 +30,8 @@ type client struct {
}

// NewClient creates a new retrieval client
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{network: network, bs: bs, node: node}
}

// V0
Expand All @@ -54,39 +45,22 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer
// TODO: Update to match spec for V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
cid, err := cid.Cast(pieceCID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
s, err := c.network.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
defer s.Close()

err = cborutil.WriteCborRPC(s, &OldQuery{
Piece: cid,
err = s.WriteQuery(retrievalmarket.Query{
PieceCID: pieceCID,
})
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

var oldResp OldQueryResponse
if err := oldResp.UnmarshalCBOR(s); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
return s.ReadQueryResponse()
}

// TODO: Update to match spec for V0 Epic:
Expand All @@ -111,16 +85,6 @@ func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrieval

go func() {
evt := retrievalmarket.ClientEventError
converted, err := cid.Cast(pieceCID)

if err == nil {
err = c.retrieveUnixfs(ctx, converted, tokenamount.Div(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet)
if err == nil {
evt = retrievalmarket.ClientEventComplete
dealState.Status = retrievalmarket.DealStatusCompleted
}
}

c.notifySubscribers(evt, dealState)
}()

Expand Down Expand Up @@ -178,6 +142,7 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe
panic("not implemented")
}

/*
type clientStream struct {
node retrievalmarket.RetrievalClientNode
stream network.Stream
Expand All @@ -196,7 +161,7 @@ type clientStream struct {
verifier BlockVerifier
bs blockstore.Blockstore
}

*/
/* This is the old retrieval code that is NOT spec compliant */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same re: commented out code

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not delete in this case -- it's the whole old implementation -- need it for reference


// C > S
Expand All @@ -211,7 +176,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
/*func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
if err != nil {
return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err)
Expand Down Expand Up @@ -390,3 +355,4 @@ func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.To
Vouchers: []*types.SignedVoucher{sv},
}, nil
}
*/
129 changes: 129 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package retrievalimpl_test

import (
"context"
"testing"

"github.com/filecoin-project/go-address"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)

func TestClient_Query(t *testing.T) {
ctx := context.Background()

bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))

pcid := []byte(string("applesauce"))
expectedPeer := peer.ID("somevalue")
rpeer := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: expectedPeer,
}

expectedQuery := retrievalmarket.Query{
PieceCID: pcid,
}

expectedQueryResponse := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseAvailable,
Size: 1234,
PaymentAddress: address.TestAddress,
MinPricePerByte: tokenamount.FromInt(5678),
MaxPaymentInterval: 4321,
MaxPaymentIntervalIncrease: 0,
}

t.Run("it works", func(t *testing.T) {
var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
return tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
Writer: tut.ExpectQueryWriter(t, expectedQuery, "queries should match"),
RespReader: tut.StubbedQueryResponseReader(expectedQueryResponse),
}), nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expectedQueryResponse, resp)
})

t.Run("when the stream returns error, returns error", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
})

t.Run("when WriteQuery fails, returns error", func(t *testing.T) {

qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
Writer: tut.FailQueryWriter,
})
return newStream, nil
}

net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})

t.Run("when ReadQueryResponse fails, returns error", func(t *testing.T) {
qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
RespReader: tut.FailResponseReader,
})
return newStream, nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

type testRetrievalNode struct {
}

func (t *testRetrievalNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable tokenamount.TokenAmount) (address.Address, error) {
return address.Address{}, nil
}

func (t *testRetrievalNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
return 0, nil
}

func (t *testRetrievalNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount tokenamount.TokenAmount, lane uint64) (*types.SignedVoucher, error) {
return nil, nil
}
Loading