Skip to content

Commit

Permalink
ppl can sub to storage client evts (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells authored May 4, 2020
1 parent 5e9a9cb commit 9476959
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 31 deletions.
2 changes: 1 addition & 1 deletion shared_testutil/test_ipld_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (tt TestIPLDTree) DumpToCar(out io.Writer, userOnNewCarBlocks ...car.OnNewC
node := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
ctx := context.Background()
sc := car.NewSelectiveCar(ctx, tt, []car.Dag{
car.Dag{
{
Root: tt.RootNodeLnk.(cidlink.Link).Cid,
Selector: node,
},
Expand Down
54 changes: 50 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
Expand All @@ -46,8 +48,8 @@ type Client struct {
pio pieceio.PieceIO
discovery *discovery.Local

node storagemarket.StorageClientNode

node storagemarket.StorageClientNode
pubSub *pubsub.PubSub
statemachines fsm.Group
conns *connmanager.ConnManager
}
Expand All @@ -70,8 +72,8 @@ func NewClient(
pio: pio,
discovery: discovery,
node: scn,

conns: connmanager.NewConnManager(),
pubSub: pubsub.New(clientDispatcher),
conns: connmanager.NewConnManager(),
}

statemachines, err := fsm.New(ds, fsm.Parameters{
Expand All @@ -80,6 +82,7 @@ func NewClient(
StateKeyField: "State",
Events: clientstates.ClientEvents,
StateEntryFuncs: clientstates.ClientStateEntryFuncs,
Notifier: c.dispatch,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -299,6 +302,49 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo
return <-done
}

func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) shared.Unsubscribe {
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
log.Errorf("dropped bad event %s", eventName)
}
realDeal, ok := deal.(storagemarket.ClientDeal)
if !ok {
log.Errorf("not a ClientDeal %v", deal)
}
pubSubEvt := internalClientEvent{evt, realDeal}

if err := c.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalClientEvent struct {
evt storagemarket.ClientEvent
deal storagemarket.ClientDeal
}

func clientDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
ie, ok := evt.(internalClientEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := fn.(storagemarket.ClientSubscriber)
if !ok {
return xerrors.New("wrong type of event")
}
log.Infof("clientDispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}

// -------
// clientDealEnvironment
// -------

type clientDealEnvironment struct {
c *Client
}
Expand Down
18 changes: 9 additions & 9 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
actor: minerAddress,
dataTransfer: dataTransfer,
dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
pubSub: pubsub.New(dispatcher),
pubSub: pubsub.New(providerDispatcher),
}

deals, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix)), fsm.Parameters{
Expand Down Expand Up @@ -338,30 +338,30 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
}
realDeal, ok := deal.(storagemarket.MinerDeal)
if !ok {
log.Errorf("not a deal %v", deal)
log.Errorf("not a MinerDeal %v", deal)
}
pubSubEvt := internalEvent{evt, realDeal}
pubSubEvt := internalProviderEvent{evt, realDeal}

if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalEvent struct {
type internalProviderEvent struct {
evt storagemarket.ProviderEvent
deal storagemarket.MinerDeal
}

func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie, ok := evt.(internalEvent)
func providerDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
ie, ok := evt.(internalProviderEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := subscriberFn.(storagemarket.ProviderSubscriber)
cb, ok := fn.(storagemarket.ProviderSubscriber)
if !ok {
return xerrors.New("wrong type of event")
return xerrors.New("wrong type of callback")
}
log.Infof("dispatcher called with valid evt %d", ie.evt)
log.Infof("providerDispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}
Expand Down
60 changes: 43 additions & 17 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,56 @@ func TestMakeDeal(t *testing.T) {
assert.NoError(t, err)

// set up a subscriber
dealChan := make(chan storagemarket.MinerDeal)
providerDealChan := make(chan storagemarket.MinerDeal)
var checkedUnmarshalling bool
subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if !checkedUnmarshalling {
// test that deal created can marshall and unmarshalled
jsonBytes, err := json.Marshal(deal)
require.NoError(t, err)
var unmarhalledDeal storagemarket.MinerDeal
err = json.Unmarshal(jsonBytes, &unmarhalledDeal)
var unmDeal storagemarket.MinerDeal
err = json.Unmarshal(jsonBytes, &unmDeal)
require.NoError(t, err)
require.Equal(t, deal, unmarhalledDeal)
require.Equal(t, deal, unmDeal)
checkedUnmarshalling = true
}
dealChan <- deal
providerDealChan <- deal
}
_ = h.Provider.SubscribeToEvents(subscriber)

clientDealChan := make(chan storagemarket.ClientDeal)
clientSubscriber := func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
clientDealChan <- deal
}
_ = h.Client.SubscribeToEvents(clientSubscriber)

// set ask price where we'll accept any price
err = h.Provider.AddAsk(big.NewInt(0), 50_000)
assert.NoError(t, err)

result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid})
proposalCid := result.ProposalCid

time.Sleep(time.Millisecond * 200)

ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
var seenDeal storagemarket.MinerDeal
var actualStates []storagemarket.StorageDealStatus
for seenDeal.State != storagemarket.StorageDealCompleted {
var providerSeenDeal storagemarket.MinerDeal
var clientSeenDeal storagemarket.ClientDeal
var providerstates, clientstates []storagemarket.StorageDealStatus
for providerSeenDeal.State != storagemarket.StorageDealCompleted ||
clientSeenDeal.State != storagemarket.StorageDealActive {
select {
case seenDeal = <-dealChan:
actualStates = append(actualStates, seenDeal.State)
case clientSeenDeal = <-clientDealChan:
clientstates = append(clientstates, clientSeenDeal.State)
case providerSeenDeal = <-providerDealChan:
providerstates = append(providerstates, providerSeenDeal.State)
case <-ctx.Done():
t.Fatalf("never saw event")
t.Fatalf("never saw all events: %d, %d", clientSeenDeal.State, providerSeenDeal.State)
}
}

expectedStates := []storagemarket.StorageDealStatus{
expProviderStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealTransferring,
Expand All @@ -92,13 +106,25 @@ func TestMakeDeal(t *testing.T) {
storagemarket.StorageDealActive,
storagemarket.StorageDealCompleted,
}
assert.Equal(t, expectedStates, actualStates)

expClientStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealEnsureClientFunds,
//storagemarket.StorageDealClientFunding, // skipped because funds available
storagemarket.StorageDealFundsEnsured,
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealSealing,
storagemarket.StorageDealActive,
}

assert.Equal(t, expProviderStates, providerstates)
assert.Equal(t, expClientStates, clientstates)

// check a couple of things to make sure we're getting the whole deal
assert.Equal(t, h.TestData.Host1.ID(), seenDeal.Client)
assert.Empty(t, seenDeal.Message)
assert.Equal(t, proposalCid, seenDeal.ProposalCid)
assert.Equal(t, h.ProviderAddr, seenDeal.ClientDealProposal.Proposal.Provider)
assert.Equal(t, h.TestData.Host1.ID(), providerSeenDeal.Client)
assert.Empty(t, providerSeenDeal.Message)
assert.Equal(t, proposalCid, providerSeenDeal.ProposalCid)
assert.Equal(t, h.ProviderAddr, providerSeenDeal.ClientDealProposal.Proposal.Provider)

cd, err := h.Client.GetLocalDeal(ctx, proposalCid)
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ type MessagePublishedCallback func(mcid cid.Cid, err error)

// Subscriber is a callback that is called when events are emitted
type ProviderSubscriber func(event ProviderEvent, deal MinerDeal)
type ClientSubscriber func(event ClientEvent, deal ClientDeal)

// StorageProvider is the interface provided for storage providers
type StorageProvider interface {
Expand Down Expand Up @@ -485,4 +486,6 @@ type StorageClient interface {

// AddStorageCollateral adds storage collateral
AddPaymentEscrow(ctx context.Context, addr address.Address, amount abi.TokenAmount) error

SubscribeToEvents(subscriber ClientSubscriber) shared.Unsubscribe
}

0 comments on commit 9476959

Please sign in to comment.