Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ppl can sub to storage client evts #217

Merged
merged 2 commits into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion shared_testutil/test_ipld_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (tt TestIPLDTree) DumpToCar(out io.Writer, userOnNewCarBlocks ...car.OnNewC
node := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
ctx := context.Background()
sc := car.NewSelectiveCar(ctx, tt, []car.Dag{
car.Dag{
{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the linter keeps changing this. I'm just gonna leave it.

Root: tt.RootNodeLnk.(cidlink.Link).Cid,
Selector: node,
},
Expand Down
54 changes: 50 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
Expand All @@ -46,8 +48,8 @@ type Client struct {
pio pieceio.PieceIO
discovery *discovery.Local

node storagemarket.StorageClientNode

node storagemarket.StorageClientNode
pubSub *pubsub.PubSub
statemachines fsm.Group
conns *connmanager.ConnManager
}
Expand All @@ -70,8 +72,8 @@ func NewClient(
pio: pio,
discovery: discovery,
node: scn,

conns: connmanager.NewConnManager(),
pubSub: pubsub.New(clientDispatcher),
conns: connmanager.NewConnManager(),
}

statemachines, err := fsm.New(ds, fsm.Parameters{
Expand All @@ -80,6 +82,7 @@ func NewClient(
StateKeyField: "State",
Events: clientstates.ClientEvents,
StateEntryFuncs: clientstates.ClientStateEntryFuncs,
Notifier: c.dispatch,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -299,6 +302,49 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo
return <-done
}

func (c *Client) SubscribeToEvents(subscriber storagemarket.ClientSubscriber) shared.Unsubscribe {
return shared.Unsubscribe(c.pubSub.Subscribe(subscriber))
}

func (c *Client) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ClientEvent)
if !ok {
log.Errorf("dropped bad event %s", eventName)
}
realDeal, ok := deal.(storagemarket.ClientDeal)
if !ok {
log.Errorf("not a ClientDeal %v", deal)
}
pubSubEvt := internalClientEvent{evt, realDeal}

if err := c.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalClientEvent struct {
evt storagemarket.ClientEvent
deal storagemarket.ClientDeal
}

func clientDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
ie, ok := evt.(internalClientEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := fn.(storagemarket.ClientSubscriber)
if !ok {
return xerrors.New("wrong type of event")
}
log.Infof("clientDispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}

// -------
// clientDealEnvironment
// -------

type clientDealEnvironment struct {
c *Client
}
Expand Down
18 changes: 9 additions & 9 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
actor: minerAddress,
dataTransfer: dataTransfer,
dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
pubSub: pubsub.New(dispatcher),
pubSub: pubsub.New(providerDispatcher),
}

deals, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix)), fsm.Parameters{
Expand Down Expand Up @@ -338,30 +338,30 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
}
realDeal, ok := deal.(storagemarket.MinerDeal)
if !ok {
log.Errorf("not a deal %v", deal)
log.Errorf("not a MinerDeal %v", deal)
}
pubSubEvt := internalEvent{evt, realDeal}
pubSubEvt := internalProviderEvent{evt, realDeal}

if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalEvent struct {
type internalProviderEvent struct {
evt storagemarket.ProviderEvent
deal storagemarket.MinerDeal
}

func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie, ok := evt.(internalEvent)
func providerDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error {
ie, ok := evt.(internalProviderEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := subscriberFn.(storagemarket.ProviderSubscriber)
cb, ok := fn.(storagemarket.ProviderSubscriber)
if !ok {
return xerrors.New("wrong type of event")
return xerrors.New("wrong type of callback")
}
log.Infof("dispatcher called with valid evt %d", ie.evt)
log.Infof("providerDispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}
Expand Down
60 changes: 43 additions & 17 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,56 @@ func TestMakeDeal(t *testing.T) {
assert.NoError(t, err)

// set up a subscriber
dealChan := make(chan storagemarket.MinerDeal)
providerDealChan := make(chan storagemarket.MinerDeal)
var checkedUnmarshalling bool
subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if !checkedUnmarshalling {
// test that deal created can marshall and unmarshalled
jsonBytes, err := json.Marshal(deal)
require.NoError(t, err)
var unmarhalledDeal storagemarket.MinerDeal
err = json.Unmarshal(jsonBytes, &unmarhalledDeal)
var unmDeal storagemarket.MinerDeal
err = json.Unmarshal(jsonBytes, &unmDeal)
require.NoError(t, err)
require.Equal(t, deal, unmarhalledDeal)
require.Equal(t, deal, unmDeal)
checkedUnmarshalling = true
}
dealChan <- deal
providerDealChan <- deal
}
_ = h.Provider.SubscribeToEvents(subscriber)

clientDealChan := make(chan storagemarket.ClientDeal)
clientSubscriber := func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
clientDealChan <- deal
}
_ = h.Client.SubscribeToEvents(clientSubscriber)

// set ask price where we'll accept any price
err = h.Provider.AddAsk(big.NewInt(0), 50_000)
assert.NoError(t, err)

result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid})
proposalCid := result.ProposalCid

time.Sleep(time.Millisecond * 200)

ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
var seenDeal storagemarket.MinerDeal
var actualStates []storagemarket.StorageDealStatus
for seenDeal.State != storagemarket.StorageDealCompleted {
var providerSeenDeal storagemarket.MinerDeal
var clientSeenDeal storagemarket.ClientDeal
var providerstates, clientstates []storagemarket.StorageDealStatus
for providerSeenDeal.State != storagemarket.StorageDealCompleted ||
clientSeenDeal.State != storagemarket.StorageDealActive {
select {
case seenDeal = <-dealChan:
actualStates = append(actualStates, seenDeal.State)
case clientSeenDeal = <-clientDealChan:
clientstates = append(clientstates, clientSeenDeal.State)
case providerSeenDeal = <-providerDealChan:
providerstates = append(providerstates, providerSeenDeal.State)
case <-ctx.Done():
t.Fatalf("never saw event")
t.Fatalf("never saw all events: %d, %d", clientSeenDeal.State, providerSeenDeal.State)
}
}

expectedStates := []storagemarket.StorageDealStatus{
expProviderStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealTransferring,
Expand All @@ -92,13 +106,25 @@ func TestMakeDeal(t *testing.T) {
storagemarket.StorageDealActive,
storagemarket.StorageDealCompleted,
}
assert.Equal(t, expectedStates, actualStates)

expClientStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealEnsureClientFunds,
//storagemarket.StorageDealClientFunding, // skipped because funds available
storagemarket.StorageDealFundsEnsured,
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealSealing,
storagemarket.StorageDealActive,
}

assert.Equal(t, expProviderStates, providerstates)
assert.Equal(t, expClientStates, clientstates)

// check a couple of things to make sure we're getting the whole deal
assert.Equal(t, h.TestData.Host1.ID(), seenDeal.Client)
assert.Empty(t, seenDeal.Message)
assert.Equal(t, proposalCid, seenDeal.ProposalCid)
assert.Equal(t, h.ProviderAddr, seenDeal.ClientDealProposal.Proposal.Provider)
assert.Equal(t, h.TestData.Host1.ID(), providerSeenDeal.Client)
assert.Empty(t, providerSeenDeal.Message)
assert.Equal(t, proposalCid, providerSeenDeal.ProposalCid)
assert.Equal(t, h.ProviderAddr, providerSeenDeal.ClientDealProposal.Proposal.Provider)

cd, err := h.Client.GetLocalDeal(ctx, proposalCid)
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ type MessagePublishedCallback func(mcid cid.Cid, err error)

// Subscriber is a callback that is called when events are emitted
type ProviderSubscriber func(event ProviderEvent, deal MinerDeal)
type ClientSubscriber func(event ClientEvent, deal ClientDeal)

// StorageProvider is the interface provided for storage providers
type StorageProvider interface {
Expand Down Expand Up @@ -485,4 +486,6 @@ type StorageClient interface {

// AddStorageCollateral adds storage collateral
AddPaymentEscrow(ctx context.Context, addr address.Address, amount abi.TokenAmount) error

SubscribeToEvents(subscriber ClientSubscriber) shared.Unsubscribe
}