Skip to content

Commit

Permalink
Tentative acceptance protocol (#244)
Browse files Browse the repository at this point in the history
* New provider and client states
State tests, rebase madness
linting
Fix erroneous state!
remove debugging statement
Remove dead function

WIP PR feedback

Fix client-side manual transfer transitions

Remove dead code

Refactor client state test fake environment initialization

Refactor client state test

Remove redundant Provider state

Allow ClientEventDataTransferFailed event to transition from StorageDealTransferring!

* Integrating changes from master

- Remove unused event
- Re-re-refactor client states test

* refactor(storagemarket): minor changes after rebase

add one state test and fix compile error

Co-authored-by: hannahhoward <[email protected]>
  • Loading branch information
ingar and hannahhoward authored May 20, 2020
1 parent 6a1508a commit 095b388
Show file tree
Hide file tree
Showing 17 changed files with 901 additions and 621 deletions.
10 changes: 9 additions & 1 deletion retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ func TestStorageRetrieval(t *testing.T) {
case storageProviderSeenDeal = <-providerDealChan:
case storageClientSeenDeal = <-clientDealChan:
case <-ctxTimeout.Done():
t.Fatalf("never saw completed: %d, %d", storageClientSeenDeal.State, storageProviderSeenDeal.State)
t.Fatalf("never saw completed deal, client deal state: %s (%d), provider deal state: %s (%d)",
storagemarket.DealStates[storageClientSeenDeal.State],
storageClientSeenDeal.State,
storagemarket.DealStates[storageProviderSeenDeal.State],
storageProviderSeenDeal.State,
)
}
}

Expand Down Expand Up @@ -221,7 +226,10 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness {
&clientNode,
)
require.NoError(t, err)

dt2 := graphsyncimpl.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2, td.DTStoredCounter2)
require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{}))

storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr)
require.NoError(t, err)
provider, err := stormkt.NewProvider(
Expand Down
14 changes: 10 additions & 4 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,20 @@ func MakeTestClientDealProposal() *market.ClientDealProposal {
}

// MakeTestDataRef returns a storage market data ref
func MakeTestDataRef() *storagemarket.DataRef {
return &storagemarket.DataRef{
func MakeTestDataRef(manualXfer bool) *storagemarket.DataRef {
out := &storagemarket.DataRef{
Root: GenerateCids(1)[0],
}

if manualXfer {
out.TransferType = storagemarket.TTManual
}

return out
}

// MakeTestClientDeal returns a storage market client deal
func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal) (*storagemarket.ClientDeal, error) {
func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal, manualXfer bool) (*storagemarket.ClientDeal, error) {
proposalNd, err := cborutil.AsIpld(clientDealProposal)

if err != nil {
Expand All @@ -165,7 +171,7 @@ func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposa
State: state,
Miner: p,
MinerWorker: address.TestAddress2,
DataRef: MakeTestDataRef(),
DataRef: MakeTestDataRef(manualXfer),
}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions shared_testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/jbenet/go-random"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
)

var blockGenerator = blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -94,3 +97,12 @@ func TestVoucherEquality(t *testing.T, a, b *paych.SignedVoucher) {
require.NoError(t, err)
require.True(t, bytes.Equal(aB, bB))
}

// AssertDealState asserts equality of StorageDealStatus but with better error messaging
func AssertDealState(t *testing.T, expected storagemarket.StorageDealStatus, actual storagemarket.StorageDealStatus) {
assert.Equal(t, expected, actual,
"Unexpected deal status\nexpected: %s (%d)\nactual : %s (%d)",
storagemarket.DealStates[expected], expected,
storagemarket.DealStates[actual], actual,
)
}
16 changes: 10 additions & 6 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand All @@ -39,11 +41,6 @@ var _ storagemarket.StorageClient = &Client{}
type Client struct {
net network.StorageMarketNetwork

// dataTransfer
// TODO: once the data transfer module is complete, the
// client will listen to events on the data transfer module
// Because we are using only a fake DAGService
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
pio pieceio.PieceIO
Expand Down Expand Up @@ -89,6 +86,10 @@ func NewClient(
return nil, err
}
c.statemachines = statemachines

// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines))

return c, nil
}

Expand Down Expand Up @@ -385,4 +386,7 @@ func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error {
return c.c.conns.Disconnect(proposalCid)
}

var _ clientstates.ClientDealEnvironment = &clientDealEnvironment{}
func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
_, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector)
return err
}
43 changes: 27 additions & 16 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,35 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDealProposed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventDealStreamLookupErrored).
FromAny().To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("miner connection error: %w", err).Error()
return nil
}),
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForDataRequest),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "unable to verify signature on deal response"
return nil
}),
fsm.Event(storagemarket.ClientEventUnexpectedDealState).
From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, status storagemarket.StorageDealStatus) error {
deal.Message = xerrors.Errorf("unexpected deal status while waiting for data request: %d", status).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferFailed).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealTransferring).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferInitiated).
From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealTransferring),
fsm.Event(storagemarket.ClientEventDataTransferComplete).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventResponseDealDidNotMatch).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, responseCid cid.Cid, proposalCid cid.Cid) error {
Expand Down Expand Up @@ -104,11 +114,12 @@ var ClientEvents = fsm.Events{

// ClientStateEntryFuncs are the handlers for different states in a storage client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds,
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds,
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealWaitingForDataRequest: WaitingForDataRequest,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
}
53 changes: 52 additions & 1 deletion storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package clientstates

import (
"context"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand All @@ -23,6 +29,7 @@ type ClientDealEnvironment interface {
TagConnection(proposalCid cid.Cid) error
ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error)
CloseStream(proposalCid cid.Cid) error
StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error
}

// ClientStateEntryFunc is the type for all state entry functions on a storage client
Expand Down Expand Up @@ -83,6 +90,50 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag
return ctx.Trigger(storagemarket.ClientEventDealProposed)
}

func WaitingForDataRequest(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
resp, err := environment.ReadDealResponse(deal.ProposalCid)
if err != nil {
return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err)
}

tok, _, err := environment.Node().GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed)
}

if err := clientutils.VerifyResponse(ctx.Context(), resp, deal.MinerWorker, tok, environment.Node().VerifySignature); err != nil {
return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed)
}

if resp.Response.State != storagemarket.StorageDealWaitingForData {
return ctx.Trigger(storagemarket.ClientEventUnexpectedDealState, resp.Response.State)
}

if deal.DataRef.TransferType == storagemarket.TTManual {
log.Infof("manual data transfer for deal %s", deal.ProposalCid)

// Temporary, we will move to a query/response protocol to check on deal status
return ctx.Trigger(storagemarket.ClientEventDataTransferComplete)
}

log.Infof("sending data for a deal %s", deal.ProposalCid)

// initiate a push data transfer. This will complete asynchronously and the
// completion of the data transfer will trigger a change in deal state
err = environment.StartDataTransfer(ctx.Context(),
deal.Miner,
&requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid},
deal.DataRef.Root,
shared.AllSelector(),
)

if err != nil {
return ctx.Trigger(storagemarket.ClientEventDataTransferFailed, xerrors.Errorf("failed to open push data channel: %w", err))
}

return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated)
}

// VerifyDealResponse reads and verifies the response from the provider to the proposed deal
func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

Expand Down
Loading

0 comments on commit 095b388

Please sign in to comment.