Skip to content

Commit

Permalink
pass miner deal value, no goroutine, consume subscriber channel
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Apr 24, 2020
1 parent 2060caa commit fe99358
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 17 deletions.
1 change: 0 additions & 1 deletion storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Client struct {
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.Manager
bs blockstore.Blockstore
//fs filestore.FileStore
pio pieceio.PieceIO
discovery *discovery.Local

Expand Down
12 changes: 5 additions & 7 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,16 @@ func (p *Provider) dispatch(eventName fsm.EventName, deal fsm.StateType) {
if !ok {
log.Errorf("not a deal %v", deal)
}
pubSubEvt := internalEvent{evt, &realDeal}
pubSubEvt := internalEvent{evt, realDeal}

go func() {
if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}()
if err := p.pubSub.Publish(pubSubEvt); err != nil {
log.Errorf("failed to publish event %d", evt)
}
}

type internalEvent struct {
evt storagemarket.ProviderEvent
deal *storagemarket.MinerDeal
deal storagemarket.MinerDeal
}

func dispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
Expand Down
32 changes: 24 additions & 8 deletions storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func TestMakeDeal(t *testing.T) {
assert.NoError(t, err)

// set up a subscriber
dealChan := make(chan *storagemarket.MinerDeal)
subscriber := func(event storagemarket.ProviderEvent, deal *storagemarket.MinerDeal) {
dealChan := make(chan storagemarket.MinerDeal)
subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
dealChan <- deal
}
_ = provider.SubscribeToEvents(subscriber)
Expand Down Expand Up @@ -124,15 +124,31 @@ func TestMakeDeal(t *testing.T) {

ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
var seenDeal *storagemarket.MinerDeal
select {
case seenDeal = <-dealChan:
case <-ctx.Done():
t.Fatalf("never saw event")
var seenDeal storagemarket.MinerDeal
var actualStates []storagemarket.StorageDealStatus
for seenDeal.State != storagemarket.StorageDealCompleted {
select {
case seenDeal = <-dealChan:
actualStates = append(actualStates, seenDeal.State)
case <-ctx.Done():
t.Fatalf("never saw event")
}
}

expectedStates := []storagemarket.StorageDealStatus{
storagemarket.StorageDealValidating,
storagemarket.StorageDealProposalAccepted,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealVerifyData,
storagemarket.StorageDealPublishing,
storagemarket.StorageDealStaged,
storagemarket.StorageDealSealing,
storagemarket.StorageDealActive,
storagemarket.StorageDealCompleted,
}
assert.Equal(t, expectedStates, actualStates)

// check a couple of things to make sure we're getting the whole deal
assert.Equal(t, storagemarket.StorageDealValidating, seenDeal.State)
assert.Equal(t, td.Host1.ID(), seenDeal.Client)
assert.Empty(t, seenDeal.Message)
assert.Equal(t, proposalCid, seenDeal.ProposalCid)
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ type StorageDeal struct {
}

// Subscriber is a callback that is called when events are emitted
type ProviderSubscriber func(event ProviderEvent, deal *MinerDeal)
type ProviderSubscriber func(event ProviderEvent, deal MinerDeal)

// StorageProvider is the interface provided for storage providers
type StorageProvider interface {
Expand Down

0 comments on commit fe99358

Please sign in to comment.