Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storagemarket/provider allows subscription to events #202

Merged
merged 3 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/filecoin-project/sector-storage v0.0.0-20200411000242-61616264b16d
github.com/filecoin-project/specs-actors v1.0.0
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c h1:+MSf4
github.com/hannahhoward/cbor-gen-for v0.0.0-20191216214420-3e450425c40c/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW42RRM5LoM/1K5dK940VipLqpH8lEVGrMz+mNjU=
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
Expand Down
3 changes: 3 additions & 0 deletions shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ package shared

// TipSetToken is the implementation-nonspecific identity for a tipset.
type TipSetToken []byte

// Unsubscribe is a function that gets called to unsubscribe from (storage|retrieval)market events
type Unsubscribe func()
2 changes: 0 additions & 2 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
Expand All @@ -43,7 +42,6 @@ type Client struct {
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
fs filestore.FileStore
pio pieceio.PieceIO
discovery *discovery.Local

Expand Down
50 changes: 50 additions & 0 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
Expand Down Expand Up @@ -49,6 +51,7 @@ type Provider struct {
dataTransfer datatransfer.Manager
universalRetrievalEnabled bool
dealAcceptanceBuffer abi.ChainEpoch
pubSub *pubsub.PubSub

deals fsm.Group
}
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
actor: minerAddress,
dataTransfer: dataTransfer,
dealAcceptanceBuffer: DefaultDealAcceptanceBuffer,
pubSub: pubsub.New(dispatcher),
}

deals, err := fsm.New(namespace.Wrap(ds, datastore.NewKey(ProviderDsPrefix)), fsm.Parameters{
Expand All @@ -102,6 +106,7 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo
StateKeyField: "State",
Events: providerstates.ProviderEvents,
StateEntryFuncs: providerstates.ProviderStateEntryFuncs,
Notifier: h.dispatch,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -298,6 +303,51 @@ func (p *Provider) UniversalRetrievalEnabled() bool {
return p.universalRetrievalEnabled
}

func (p *Provider) SubscribeToEvents(subscriber storagemarket.ProviderSubscriber) shared.Unsubscribe {
return shared.Unsubscribe(p.pubSub.Subscribe(subscriber))
}

// dispatch puts the fsm event into a form that pubSub can consume,
// then publishes the event
func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
evt, ok := eventName.(storagemarket.ProviderEvent)
if !ok {
log.Errorf("dropped bad event %s", eventName)
}
realDeal, ok := deal.(storagemarket.MinerDeal)
if !ok {
log.Errorf("not a deal %v", deal)
}
pubSubEvt := internalEvent{evt, realDeal}

if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalEvent struct {
evt storagemarket.ProviderEvent
deal storagemarket.MinerDeal
}

func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie, ok := evt.(internalEvent)
if !ok {
return xerrors.New("wrong type of event")
}
cb, ok := subscriberFn.(storagemarket.ProviderSubscriber)
if !ok {
return xerrors.New("wrong type of event")
}
log.Infof("dispatcher called with valid evt %d", ie.evt)
cb(ie.evt, ie.deal)
return nil
}

// -------
// providerDealEnvironment
// -------
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept paging too far down looking at stuff, so I figured a comment might help


type providerDealEnvironment struct {
p *Provider
}
Expand Down
45 changes: 42 additions & 3 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storagemarket_test
import (
"bytes"
"context"
"github.com/filecoin-project/go-fil-markets/pieceio"
"io/ioutil"
"reflect"
"testing"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
Expand Down Expand Up @@ -84,6 +84,13 @@ func TestMakeDeal(t *testing.T) {
)
assert.NoError(t, err)

// set up a subscriber
dealChan := make(chan storagemarket.MinerDeal)
subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
dealChan <- deal
}
_ = provider.SubscribeToEvents(subscriber)

// set ask price where we'll accept any price
err = provider.AddAsk(big.NewInt(0), 50_000)
assert.NoError(t, err)
Expand Down Expand Up @@ -115,16 +122,48 @@ func TestMakeDeal(t *testing.T) {

time.Sleep(time.Millisecond * 100)

ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
var seenDeal storagemarket.MinerDeal
var actualStates []storagemarket.StorageDealStatus
for seenDeal.State != storagemarket.StorageDealCompleted {
select {
case seenDeal = <-dealChan:
actualStates = append(actualStates, seenDeal.State)
case <-ctx.Done():
t.Fatalf("never saw event")
}
}

expectedStates := []storagemarket.StorageDealStatus{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this check!

storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealVerifyData,
storagemarket.StorageDealPublishing,
storagemarket.StorageDealStaged,
storagemarket.StorageDealSealing,
storagemarket.StorageDealActive,
storagemarket.StorageDealCompleted,
}
assert.Equal(t, expectedStates, actualStates)

// check a couple of things to make sure we're getting the whole deal
assert.Equal(t, td.Host1.ID(), seenDeal.Client)
assert.Empty(t, seenDeal.Message)
assert.Equal(t, proposalCid, seenDeal.ProposalCid)
assert.Equal(t, providerAddr, seenDeal.ClientDealProposal.Proposal.Provider)

cd, err := client.GetLocalDeal(ctx, proposalCid)
assert.NoError(t, err)
assert.Equal(t, cd.State, storagemarket.StorageDealActive)
assert.Equal(t, int(storagemarket.StorageDealActive), int(cd.State))

providerDeals, err := provider.ListLocalDeals()
assert.NoError(t, err)

pd := providerDeals[0]
assert.True(t, pd.ProposalCid.Equals(proposalCid))
assert.Equal(t, pd.State, storagemarket.StorageDealCompleted)
assert.Equal(t, int(storagemarket.StorageDealCompleted), int(pd.State))
}

func TestMakeDealOffline(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ type StorageDeal struct {
market.DealState
}

// Subscriber is a callback that is called when events are emitted
type ProviderSubscriber func(event ProviderEvent, deal MinerDeal)

// StorageProvider is the interface provided for storage providers
type StorageProvider interface {
Start(ctx context.Context) error
Expand All @@ -301,6 +304,8 @@ type StorageProvider interface {
GetStorageCollateral(ctx context.Context) (Balance, error)

ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error

SubscribeToEvents(subscriber ProviderSubscriber) shared.Unsubscribe
}

// Node dependencies for a StorageProvider
Expand Down