Skip to content

Commit

Permalink
Techdebt/1 block file retrieval test (#51)
Browse files Browse the repository at this point in the history
* < 1 block file retrieval test

* parameterize the voucher amount so the key lookup succeeds for the test provider node

* add a provider subscriber and use a channel to check that the provider sees the completed deal

* please the linter

* mod tidy

* reduce PR footprint
  • Loading branch information
shannonwells authored and hannahhoward committed Jan 22, 2020
1 parent 0321c8c commit db66f33
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 114 deletions.
6 changes: 3 additions & 3 deletions retrievalmarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"

"golang.org/x/xerrors"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"

"golang.org/x/xerrors"
)

// ClientDealEnvironment is a bridge to the environment a client deal is executing in
Expand Down Expand Up @@ -89,7 +89,7 @@ func ProcessPaymentRequested(ctx context.Context, environment ClientDealEnvironm
}

// check that totalReceived - bytesPaidFor >= currentInterval, or fail
if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment{
if (deal.TotalReceived-deal.BytesPaidFor < deal.CurrentInterval) && deal.Status != rm.DealStatusFundsNeededLastPayment {
return errorFunc(xerrors.New("not enough bytes received between payment request"))
}

Expand Down
265 changes: 155 additions & 110 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer/testutil"
"github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,7 +91,6 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
}

func TestClientCanMakeDealWithProvider(t *testing.T) {
log.SetDebugLogging()
bgCtx := context.Background()
clientPaymentChannel, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
Expand All @@ -101,124 +99,171 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {

// -------- SET UP PROVIDER

// Inject a unixFS file on the provider side to its blockstore
// obtained via `ls -laf` on this file

// pieceLink := testData.LoadUnixFSFile(t, "lorem_big.txt", true)
// fileSize := uint64(89359)
pieceLink := testData.LoadUnixFSFile(t, "lorem.txt", true)
fileSize := uint64(19000)


pieceCID := []byte("pieceCID")
providerPaymentAddr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
paymentInterval := uint64(10000)
paymentIntervalIncrease := uint64(1000)
pricePerByte := tokenamount.FromInt(1000)

expectedQR := retrievalmarket.QueryResponse{
Size: 1024,
PaymentAddress: providerPaymentAddr,
MinPricePerByte: pricePerByte,
MaxPaymentInterval: paymentInterval,
MaxPaymentIntervalIncrease: paymentIntervalIncrease,
testCases := []struct {
name string
filename string
filesize uint64
voucherAmts []tokenamount.TokenAmount
}{
{name: "1 block file retrieval succeeds",
filename: "lorem_under_1_block.txt",
filesize: 410,
voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(410000)}},
{name: "multi-block file retrieval succeeds",
filename: "lorem.txt",
filesize: 19000,
voucherAmts: []tokenamount.TokenAmount{tokenamount.FromInt(10136000), tokenamount.FromInt(9784000)}},
}

providerNode := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr )

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),}

expectedVoucher := tut.MakeTestSignedVoucher()

// just make sure there is enough to cover the transfer
expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(fileSize*2))

// this is just pulled from the actual answer so the expected keys in the test node match up.
// later we compare the voucher values.
expectedVoucher.Amount = tokenamount.FromInt(10136000)
proof := []byte("")
require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, expectedVoucher.Amount, expectedVoucher.Amount, nil))

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)

createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData)

dealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
case retrievalmarket.ClientEventComplete:
dealStateChan <- state
case retrievalmarket.ClientEventError:
msg := `
Status: %d
TotalReceived: %d
BytesPaidFor: %d
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
// Inject a unixFS file on the provider side to its blockstore
// obtained via `ls -laf` on this file
pieceLink := testData.LoadUnixFSFile(t, testCase.filename, true)

pieceCID := []byte("pieceCID")
providerPaymentAddr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
paymentInterval := uint64(10000)
paymentIntervalIncrease := uint64(1000)
pricePerByte := tokenamount.FromInt(1000)

expectedQR := retrievalmarket.QueryResponse{
Size: 1024,
PaymentAddress: providerPaymentAddr,
MinPricePerByte: pricePerByte,
MaxPaymentInterval: paymentInterval,
MaxPaymentIntervalIncrease: paymentIntervalIncrease,
}

providerNode, provider := setupProvider(t, testData, pieceCID, expectedQR, providerPaymentAddr)

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID(),}

expectedVoucher := tut.MakeTestSignedVoucher()

// just make sure there is enough to cover the transfer
expectedTotal := tokenamount.Mul(pricePerByte, tokenamount.FromInt(testCase.filesize*2))

// voucherAmts are pulled from the actual answer so the expected keys in the test node match up.
// later we compare the voucher values. The last voucherAmt is a remainder
proof := []byte("")
for _, voucherAmt := range testCase.voucherAmts {
require.NoError(t, providerNode.ExpectVoucher(clientPaymentChannel, expectedVoucher, proof, voucherAmt, voucherAmt, nil))
}

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)

createdChan, newLaneAddr, createdVoucher, client := setupClient(clientPaymentChannel, expectedVoucher, nw1, testData)

clientDealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
case retrievalmarket.ClientEventComplete:
clientDealStateChan <- state
case retrievalmarket.ClientEventError:
msg := `
Status: %d
TotalReceived: %d
BytesPaidFor: %d
CurrentInterval: %d
TotalFunds: %s
TotalFunds: %s
`
t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval,state.TotalFunds.String(),)
}
})

// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

c, ok := pieceLink.(cidlink.Link)
require.True(t, ok)
payloadCID := c.Cid

rmParams := retrievalmarket.Params{
PricePerByte: pricePerByte,
PaymentInterval: paymentInterval,
PaymentIntervalIncrease: paymentIntervalIncrease,
PayloadCID: payloadCID,
t.Logf(msg, state.Status, state.TotalReceived, state.BytesPaidFor, state.CurrentInterval, state.TotalFunds.String(), )
}
})

providerDealStateChan := make(chan retrievalmarket.ProviderDealState)
provider.SubscribeToEvents(func(event retrievalmarket.ProviderEvent, state retrievalmarket.ProviderDealState) {
switch event {
case retrievalmarket.ProviderEventComplete:
providerDealStateChan <- state
case retrievalmarket.ProviderEventError:
msg := `
Status: %d
TotalSent: %d
FundsReceived: %s
Message: %s
CurrentInterval: %d
`
t.Logf(msg, state.Status, state.TotalSent, state.FundsReceived.String(), state.Message, state.CurrentInterval)
}
})

// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, pieceCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

c, ok := pieceLink.(cidlink.Link)
require.True(t, ok)
payloadCID := c.Cid

rmParams := retrievalmarket.Params{
PricePerByte: pricePerByte,
PaymentInterval: paymentInterval,
PaymentIntervalIncrease: paymentIntervalIncrease,
PayloadCID: payloadCID,
}

// *** Retrieve the piece
did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address)
assert.Equal(t, did, retrievalmarket.DealID(1))

ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()

// verify that client subscribers will be notified of state changes
var clientDealState retrievalmarket.ClientDealState
select {
case <-ctx.Done():
t.Error("deal never completed")
t.FailNow()
case clientDealState = <-clientDealStateChan:
}
assert.Equal(t, clientDealState.Lane, expectedVoucher.Lane)
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
assert.True(t, createdVoucher.Equals(expectedVoucher))

ctx, cancel = context.WithTimeout(bgCtx, 10*time.Second)
defer cancel()
var providerDealState retrievalmarket.ProviderDealState
select {
case <-ctx.Done():
t.Error("provider never saw completed deal")
t.FailNow()
case providerDealState = <- providerDealStateChan:
}

require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status)

// verify that the provider saved the same voucher values
providerNode.VerifyExpectations(t)
testData.VerifyFileTransferred(t, pieceLink, false)
})
}

// *** Retrieve the piece
did := client.Retrieve(bgCtx, pieceCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address)
assert.Equal(t, did, retrievalmarket.DealID(1))

ctx , cancel := context.WithTimeout(bgCtx, 20*time.Second)
defer cancel()

var dealState retrievalmarket.ClientDealState
select {
case <- ctx.Done():
t.Error("deal never completed")
t.FailNow()
case dealState = <- dealStateChan:
}
assert.Equal(t, dealState.Lane, expectedVoucher.Lane)
require.NotNil(t, createdChan)
require.Equal(t, expectedTotal, createdChan.amt)
require.Equal(t, clientPaymentChannel, *newLaneAddr)
// verify that the voucher was saved/seen by the client with correct values
require.NotNil(t, createdVoucher)
assert.True(t, createdVoucher.Equals(expectedVoucher))
// // verify that the provider saved the same voucher values
providerNode.VerifyExpectations(t)
testData.VerifyFileTransferred(t, pieceLink, false)
}

func setupClient(
clientPaymentChannel address.Address,
expectedVoucher *types.SignedVoucher,
nw1 rmnet.RetrievalMarketNetwork,
testData *tut.Libp2pTestData) ( *pmtChan,
*address.Address,
*types.SignedVoucher,
retrievalmarket.RetrievalClient) {
var createdChan pmtChan
clientPaymentChannel address.Address,
expectedVoucher *types.SignedVoucher,
nw1 rmnet.RetrievalMarketNetwork,
testData *tut.Libp2pTestData) (*pmtChan,
*address.Address,
*types.SignedVoucher,
retrievalmarket.RetrievalClient) {
var createdChan pmtChan
paymentChannelRecorder := func(client, miner address.Address, amt tokenamount.TokenAmount) {
createdChan = pmtChan{client, miner, amt}
}

var newLaneAddr address.Address
var newLaneAddr address.Address
laneRecorder := func(paymentChannel address.Address) {
newLaneAddr = paymentChannel
}
Expand All @@ -239,7 +284,7 @@ func setupClient(
return &createdChan, &newLaneAddr, &createdVoucher, client
}

func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) *testnodes.TestRetrievalProviderNode {
func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte, expectedQR retrievalmarket.QueryResponse, providerPaymentAddr address.Address) (*testnodes.TestRetrievalProviderNode, retrievalmarket.RetrievalProvider) {
nw2 := rmnet.NewFromLibp2pHost(testData.Host2)
providerNode := testnodes.NewTestRetrievalProviderNode()
providerNode.SetBlockstore(testData.Bs2)
Expand All @@ -248,7 +293,7 @@ func setupProvider(t *testing.T, testData *tut.Libp2pTestData, pieceCID []byte,
provider.SetPaymentInterval(expectedQR.MaxPaymentInterval, expectedQR.MaxPaymentIntervalIncrease)
provider.SetPricePerByte(expectedQR.MinPricePerByte)
require.NoError(t, provider.Start())
return providerNode
return providerNode, provider
}

type pmtChan struct {
Expand Down
3 changes: 2 additions & 1 deletion retrievalmarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package providerstates
import (
"context"

"golang.org/x/xerrors"

rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"golang.org/x/xerrors"
)

// ProviderDealEnvironment is a bridge to the environment a provider deal is executing in
Expand Down

0 comments on commit db66f33

Please sign in to comment.