From 947695985053738ed95c95eb680acd7a507f8d85 Mon Sep 17 00:00:00 2001 From: Shannon Wells Date: Mon, 4 May 2020 15:19:18 -0700 Subject: [PATCH] ppl can sub to storage client evts (#217) --- shared_testutil/test_ipld_tree.go | 2 +- storagemarket/impl/client.go | 54 +++++++++++++++++++++++++--- storagemarket/impl/provider.go | 18 +++++----- storagemarket/integration_test.go | 60 ++++++++++++++++++++++--------- storagemarket/types.go | 3 ++ 5 files changed, 106 insertions(+), 31 deletions(-) diff --git a/shared_testutil/test_ipld_tree.go b/shared_testutil/test_ipld_tree.go index 949947b4..3e568a15 100644 --- a/shared_testutil/test_ipld_tree.go +++ b/shared_testutil/test_ipld_tree.go @@ -139,7 +139,7 @@ func (tt TestIPLDTree) DumpToCar(out io.Writer, userOnNewCarBlocks ...car.OnNewC node := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() ctx := context.Background() sc := car.NewSelectiveCar(ctx, tt, []car.Dag{ - car.Dag{ + { Root: tt.RootNodeLnk.(cidlink.Link).Cid, Selector: node, }, diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index 71d9120b..841c2e77 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" + "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -22,6 +23,7 @@ import ( "github.com/filecoin-project/go-fil-markets/pieceio/cario" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery" + "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" @@ -46,8 +48,8 @@ type Client struct { pio pieceio.PieceIO discovery *discovery.Local - node storagemarket.StorageClientNode - + node storagemarket.StorageClientNode + pubSub *pubsub.PubSub statemachines fsm.Group conns *connmanager.ConnManager } @@ -70,8 +72,8 @@ func NewClient( pio: pio, discovery: discovery, node: scn, - - conns: connmanager.NewConnManager(), + pubSub: pubsub.New(clientDispatcher), + conns: connmanager.NewConnManager(), } statemachines, err := fsm.New(ds, fsm.Parameters{ @@ -80,6 +82,7 @@ func NewClient( StateKeyField: "State", Events: clientstates.ClientEvents, StateEntryFuncs: clientstates.ClientStateEntryFuncs, + Notifier: c.dispatch, }) if err != nil { return nil, err @@ -299,6 +302,49 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo return <-done } +func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) shared.Unsubscribe { + return shared.Unsubscribe(c.pubSub.Subscribe(subscriber)) +} + +func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) { + evt, ok := eventName.(storagemarket.ClientEvent) + if !ok { + log.Errorf("dropped bad event %s", eventName) + } + realDeal, ok := deal.(storagemarket.ClientDeal) + if !ok { + log.Errorf("not a ClientDeal %v", deal) + } + pubSubEvt := internalClientEvent{evt, realDeal} + + if err := c.pubSub.Publish(pubSubEvt); err != nil { + log.Errorf("failed to publish event %d", evt) + } +} + +type internalClientEvent struct { + evt storagemarket.ClientEvent + deal storagemarket.ClientDeal +} + +func clientDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error { + ie, ok := evt.(internalClientEvent) + if !ok { + return xerrors.New("wrong type of event") + } + cb, ok := fn.(storagemarket.ClientSubscriber) + if !ok { + return xerrors.New("wrong type of event") + } + log.Infof("clientDispatcher called with valid evt %d", ie.evt) + cb(ie.evt, ie.deal) + return nil +} + +// ------- +// clientDealEnvironment +// ------- + type clientDealEnvironment struct { c *Client } diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index 10df72c1..61c3ddd8 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -98,7 +98,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo actor: minerAddress, dataTransfer: dataTransfer, dealAcceptanceBuffer: DefaultDealAcceptanceBuffer, - pubSub: pubsub.New(dispatcher), + pubSub: pubsub.New(providerDispatcher), } deals, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix)), fsm.Parameters{ @@ -338,30 +338,30 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) { } realDeal, ok := deal.(storagemarket.MinerDeal) if !ok { - log.Errorf("not a deal %v", deal) + log.Errorf("not a MinerDeal %v", deal) } - pubSubEvt := internalEvent{evt, realDeal} + pubSubEvt := internalProviderEvent{evt, realDeal} if err := p.pubSub.Publish(pubSubEvt); err != nil { log.Errorf("failed to publish event %d", evt) } } -type internalEvent struct { +type internalProviderEvent struct { evt storagemarket.ProviderEvent deal storagemarket.MinerDeal } -func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error { - ie, ok := evt.(internalEvent) +func providerDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error { + ie, ok := evt.(internalProviderEvent) if !ok { return xerrors.New("wrong type of event") } - cb, ok := subscriberFn.(storagemarket.ProviderSubscriber) + cb, ok := fn.(storagemarket.ProviderSubscriber) if !ok { - return xerrors.New("wrong type of event") + return xerrors.New("wrong type of callback") } - log.Infof("dispatcher called with valid evt %d", ie.evt) + log.Infof("providerDispatcher called with valid evt %d", ie.evt) cb(ie.evt, ie.deal) return nil } diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index f5c1dbb3..c16ebb40 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -44,23 +44,33 @@ func TestMakeDeal(t *testing.T) { assert.NoError(t, err) // set up a subscriber - dealChan := make(chan storagemarket.MinerDeal) + providerDealChan := make(chan storagemarket.MinerDeal) var checkedUnmarshalling bool subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { if !checkedUnmarshalling { // test that deal created can marshall and unmarshalled jsonBytes, err := json.Marshal(deal) require.NoError(t, err) - var unmarhalledDeal storagemarket.MinerDeal - err = json.Unmarshal(jsonBytes, &unmarhalledDeal) + var unmDeal storagemarket.MinerDeal + err = json.Unmarshal(jsonBytes, &unmDeal) require.NoError(t, err) - require.Equal(t, deal, unmarhalledDeal) + require.Equal(t, deal, unmDeal) checkedUnmarshalling = true } - dealChan <- deal + providerDealChan <- deal } _ = h.Provider.SubscribeToEvents(subscriber) + clientDealChan := make(chan storagemarket.ClientDeal) + clientSubscriber := func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + clientDealChan <- deal + } + _ = h.Client.SubscribeToEvents(clientSubscriber) + + // set ask price where we'll accept any price + err = h.Provider.AddAsk(big.NewInt(0), 50_000) + assert.NoError(t, err) + result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}) proposalCid := result.ProposalCid @@ -68,18 +78,22 @@ func TestMakeDeal(t *testing.T) { ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond) defer canc() - var seenDeal storagemarket.MinerDeal - var actualStates []storagemarket.StorageDealStatus - for seenDeal.State != storagemarket.StorageDealCompleted { + var providerSeenDeal storagemarket.MinerDeal + var clientSeenDeal storagemarket.ClientDeal + var providerstates, clientstates []storagemarket.StorageDealStatus + for providerSeenDeal.State != storagemarket.StorageDealCompleted || + clientSeenDeal.State != storagemarket.StorageDealActive { select { - case seenDeal = <-dealChan: - actualStates = append(actualStates, seenDeal.State) + case clientSeenDeal = <-clientDealChan: + clientstates = append(clientstates, clientSeenDeal.State) + case providerSeenDeal = <-providerDealChan: + providerstates = append(providerstates, providerSeenDeal.State) case <-ctx.Done(): - t.Fatalf("never saw event") + t.Fatalf("never saw all events: %d, %d", clientSeenDeal.State, providerSeenDeal.State) } } - expectedStates := []storagemarket.StorageDealStatus{ + expProviderStates := []storagemarket.StorageDealStatus{ storagemarket.StorageDealValidating, storagemarket.StorageDealProposalAccepted, storagemarket.StorageDealTransferring, @@ -92,13 +106,25 @@ func TestMakeDeal(t *testing.T) { storagemarket.StorageDealActive, storagemarket.StorageDealCompleted, } - assert.Equal(t, expectedStates, actualStates) + + expClientStates := []storagemarket.StorageDealStatus{ + storagemarket.StorageDealEnsureClientFunds, + //storagemarket.StorageDealClientFunding, // skipped because funds available + storagemarket.StorageDealFundsEnsured, + storagemarket.StorageDealValidating, + storagemarket.StorageDealProposalAccepted, + storagemarket.StorageDealSealing, + storagemarket.StorageDealActive, + } + + assert.Equal(t, expProviderStates, providerstates) + assert.Equal(t, expClientStates, clientstates) // check a couple of things to make sure we're getting the whole deal - assert.Equal(t, h.TestData.Host1.ID(), seenDeal.Client) - assert.Empty(t, seenDeal.Message) - assert.Equal(t, proposalCid, seenDeal.ProposalCid) - assert.Equal(t, h.ProviderAddr, seenDeal.ClientDealProposal.Proposal.Provider) + assert.Equal(t, h.TestData.Host1.ID(), providerSeenDeal.Client) + assert.Empty(t, providerSeenDeal.Message) + assert.Equal(t, proposalCid, providerSeenDeal.ProposalCid) + assert.Equal(t, h.ProviderAddr, providerSeenDeal.ClientDealProposal.Proposal.Provider) cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) diff --git a/storagemarket/types.go b/storagemarket/types.go index 43297334..c8c64f84 100644 --- a/storagemarket/types.go +++ b/storagemarket/types.go @@ -316,6 +316,7 @@ type MessagePublishedCallback func(mcid cid.Cid, err error) // Subscriber is a callback that is called when events are emitted type ProviderSubscriber func(event ProviderEvent, deal MinerDeal) +type ClientSubscriber func(event ClientEvent, deal ClientDeal) // StorageProvider is the interface provided for storage providers type StorageProvider interface { @@ -485,4 +486,6 @@ type StorageClient interface { // AddStorageCollateral adds storage collateral AddPaymentEscrow(ctx context.Context, addr address.Address, amount abi.TokenAmount) error + + SubscribeToEvents(subscriber ClientSubscriber) shared.Unsubscribe }