Skip to content

Commit

Permalink
New provider and client states
Browse files Browse the repository at this point in the history
State tests, rebase madness
linting
Fix erroneous state!
remove debugging statement
Remove dead function

WIP PR feedback

Fix client-side manual transfer transitions

Remove dead code

Refactor client state test fake environment initialization

Refactor client state test

Remove redundant Provider state

Allow ClientEventDataTransferFailed event to transition from StorageDealTransferring!
  • Loading branch information
ingar authored and hannahhoward committed May 20, 2020
1 parent 6a1508a commit 383c016
Show file tree
Hide file tree
Showing 17 changed files with 802 additions and 540 deletions.
10 changes: 9 additions & 1 deletion retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ func TestStorageRetrieval(t *testing.T) {
case storageProviderSeenDeal = <-providerDealChan:
case storageClientSeenDeal = <-clientDealChan:
case <-ctxTimeout.Done():
t.Fatalf("never saw completed: %d, %d", storageClientSeenDeal.State, storageProviderSeenDeal.State)
t.Fatalf("never saw completed deal, client deal state: %s (%d), provider deal state: %s (%d)",
storagemarket.DealStates[storageClientSeenDeal.State],
storageClientSeenDeal.State,
storagemarket.DealStates[storageProviderSeenDeal.State],
storageProviderSeenDeal.State,
)
}
}

Expand Down Expand Up @@ -221,7 +226,10 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness {
&clientNode,
)
require.NoError(t, err)

dt2 := graphsyncimpl.NewGraphSyncDataTransfer(td.Host2, td.GraphSync2, td.DTStoredCounter2)
require.NoError(t, dt2.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, &fakeDTValidator{}))

storedAsk, err := storedask.NewStoredAsk(td.Ds2, datastore.NewKey("latest-ask"), providerNode, providerAddr)
require.NoError(t, err)
provider, err := stormkt.NewProvider(
Expand Down
14 changes: 10 additions & 4 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,20 @@ func MakeTestClientDealProposal() *market.ClientDealProposal {
}

// MakeTestDataRef returns a storage market data ref
func MakeTestDataRef() *storagemarket.DataRef {
return &storagemarket.DataRef{
func MakeTestDataRef(manualXfer bool) *storagemarket.DataRef {
out := &storagemarket.DataRef{
Root: GenerateCids(1)[0],
}

if manualXfer {
out.TransferType = storagemarket.TTManual
}

return out
}

// MakeTestClientDeal returns a storage market client deal
func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal) (*storagemarket.ClientDeal, error) {
func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposal *market.ClientDealProposal, manualXfer bool) (*storagemarket.ClientDeal, error) {
proposalNd, err := cborutil.AsIpld(clientDealProposal)

if err != nil {
Expand All @@ -165,7 +171,7 @@ func MakeTestClientDeal(state storagemarket.StorageDealStatus, clientDealProposa
State: state,
Miner: p,
MinerWorker: address.TestAddress2,
DataRef: MakeTestDataRef(),
DataRef: MakeTestDataRef(manualXfer),
}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions shared_testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/jbenet/go-random"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
)

var blockGenerator = blocksutil.NewBlockGenerator()
Expand Down Expand Up @@ -94,3 +97,12 @@ func TestVoucherEquality(t *testing.T, a, b *paych.SignedVoucher) {
require.NoError(t, err)
require.True(t, bytes.Equal(aB, bB))
}

// AssertDealState asserts equality of StorageDealStatus but with better error messaging
func AssertDealState(t *testing.T, expected storagemarket.StorageDealStatus, actual storagemarket.StorageDealStatus) {
assert.Equal(t, expected, actual,
"Unexpected deal status\nexpected: %s (%d)\nactual : %s (%d)",
storagemarket.DealStates[expected], expected,
storagemarket.DealStates[actual], actual,
)
}
16 changes: 10 additions & 6 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand All @@ -39,11 +41,6 @@ var _ storagemarket.StorageClient = &Client{}
type Client struct {
net network.StorageMarketNetwork

// dataTransfer
// TODO: once the data transfer module is complete, the
// client will listen to events on the data transfer module
// Because we are using only a fake DAGService
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
pio pieceio.PieceIO
Expand Down Expand Up @@ -89,6 +86,10 @@ func NewClient(
return nil, err
}
c.statemachines = statemachines

// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines))

return c, nil
}

Expand Down Expand Up @@ -385,4 +386,7 @@ func (c *clientDealEnvironment) CloseStream(proposalCid cid.Cid) error {
return c.c.conns.Disconnect(proposalCid)
}

var _ clientstates.ClientDealEnvironment = &clientDealEnvironment{}
func (c *clientDealEnvironment) StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error {
_, err := c.c.dataTransfer.OpenPushDataChannel(ctx, to, voucher, baseCid, selector)
return err
}
37 changes: 27 additions & 10 deletions storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,41 @@ var ClientEvents = fsm.Events{
return nil
}),
fsm.Event(storagemarket.ClientEventDealProposed).
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealValidating),
From(storagemarket.StorageDealFundsEnsured).To(storagemarket.StorageDealWaitingForDataRequest),
fsm.Event(storagemarket.ClientEventDealStreamLookupErrored).
FromAny().To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("miner connection error: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventReadResponseFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealError).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("error reading Response message: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventResponseVerificationFailed).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "unable to verify signature on deal response"
return nil
}),
fsm.Event(storagemarket.ClientEventUnexpectedDealState).
From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, status storagemarket.StorageDealStatus) error {
deal.Message = xerrors.Errorf("unexpected deal status while waiting for data request: %d", status).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferFailed).
FromMany(storagemarket.StorageDealWaitingForDataRequest, storagemarket.StorageDealTransferring).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
return nil
}),
fsm.Event(storagemarket.ClientEventDataTransferInitiated).
From(storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealTransferring),
fsm.Event(storagemarket.ClientEventDataTransferComplete).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealWaitingForDataRequest).To(storagemarket.StorageDealValidating),
fsm.Event(storagemarket.ClientEventResponseDealDidNotMatch).
From(storagemarket.StorageDealValidating).To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, responseCid cid.Cid, proposalCid cid.Cid) error {
Expand Down Expand Up @@ -104,11 +120,12 @@ var ClientEvents = fsm.Events{

// ClientStateEntryFuncs are the handlers for different states in a storage client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds,
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
storagemarket.StorageDealEnsureClientFunds: EnsureClientFunds,
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealWaitingForDataRequest: WaitingForDataRequest,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
storagemarket.StorageDealFailing: FailDeal,
}
53 changes: 52 additions & 1 deletion storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package clientstates

import (
"context"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)

Expand All @@ -23,6 +29,7 @@ type ClientDealEnvironment interface {
TagConnection(proposalCid cid.Cid) error
ReadDealResponse(proposalCid cid.Cid) (network.SignedResponse, error)
CloseStream(proposalCid cid.Cid) error
StartDataTransfer(ctx context.Context, to peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) error
}

// ClientStateEntryFunc is the type for all state entry functions on a storage client
Expand Down Expand Up @@ -83,6 +90,50 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag
return ctx.Trigger(storagemarket.ClientEventDealProposed)
}

func WaitingForDataRequest(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
resp, err := environment.ReadDealResponse(deal.ProposalCid)
if err != nil {
return ctx.Trigger(storagemarket.ClientEventReadResponseFailed, err)
}

tok, _, err := environment.Node().GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed)
}

if err := clientutils.VerifyResponse(ctx.Context(), resp, deal.MinerWorker, tok, environment.Node().VerifySignature); err != nil {
return ctx.Trigger(storagemarket.ClientEventResponseVerificationFailed)
}

if resp.Response.State != storagemarket.StorageDealWaitingForData {
return ctx.Trigger(storagemarket.ClientEventUnexpectedDealState, resp.Response.State)
}

if deal.DataRef.TransferType == storagemarket.TTManual {
log.Infof("manual data transfer for deal %s", deal.ProposalCid)

// Temporary, we will move to a query/response protocol to check on deal status
return ctx.Trigger(storagemarket.ClientEventDataTransferComplete)
}

log.Infof("sending data for a deal %s", deal.ProposalCid)

// initiate a push data transfer. This will complete asynchronously and the
// completion of the data transfer will trigger a change in deal state
err = environment.StartDataTransfer(ctx.Context(),
deal.Miner,
&requestvalidation.StorageDataTransferVoucher{Proposal: deal.ProposalCid},
deal.DataRef.Root,
shared.AllSelector(),
)

if err != nil {
return ctx.Trigger(storagemarket.ClientEventDataTransferFailed, xerrors.Errorf("failed to open push data channel: %w", err))
}

return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated)
}

// VerifyDealResponse reads and verifies the response from the provider to the proposed deal
func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

Expand Down
Loading

0 comments on commit 383c016

Please sign in to comment.