Skip to content

Commit

Permalink
WIP PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ingar committed May 13, 2020
1 parent f716ea2 commit c243211
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 244 deletions.
3 changes: 2 additions & 1 deletion shared_testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/jbenet/go-random"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestVoucherEquality(t *testing.T, a, b *paych.SignedVoucher) {

// AssertDealState asserts equality of StorageDealStatus but with better error messaging
func AssertDealState(t *testing.T, expected storagemarket.StorageDealStatus, actual storagemarket.StorageDealStatus) {
require.Equal(t, expected, actual,
assert.Equal(t, expected, actual,
"Unexpected deal status\nexpected: %s (%d)\nactual : %s (%d)",
storagemarket.DealStates[expected], expected,
storagemarket.DealStates[actual], actual,
Expand Down
5 changes: 5 additions & 0 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
)
Expand Down Expand Up @@ -86,6 +87,10 @@ func NewClient(
return nil, err
}
c.statemachines = statemachines

// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines))

return c, nil
}

Expand Down
1 change: 0 additions & 1 deletion storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ var ClientStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealClientFunding: WaitForFunding,
storagemarket.StorageDealFundsEnsured: ProposeDeal,
storagemarket.StorageDealWaitingForDataRequest: WaitingForDataRequest,
storagemarket.StorageDealTransferring: TransferringData,
storagemarket.StorageDealValidating: VerifyDealResponse,
storagemarket.StorageDealProposalAccepted: ValidateDealPublished,
storagemarket.StorageDealSealing: VerifyDealActivated,
Expand Down
7 changes: 0 additions & 7 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,6 @@ func WaitingForDataRequest(ctx fsm.Context, environment ClientDealEnvironment, d
return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated)
}

func TransferringData(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
environment.OnDataTransferEvent(ctx.Context(), datatransfer.Complete, func(event datatransfer.Event) error {
return ctx.Trigger(storagemarket.ClientEventDataTransferComplete)
})
return nil
}

// VerifyDealResponse reads and verifies the response from the provider to the proposed deal
func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

Expand Down
15 changes: 0 additions & 15 deletions storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,6 @@ func TestWaitingForDataRequest(t *testing.T) {
})
}

func TestTransferringData(t *testing.T) {
ctx := context.Background()
eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents)
require.NoError(t, err)
clientDealProposal := tut.MakeTestClientDealProposal()
runTransferringData := makeExecutor(ctx, eventProcessor, clientstates.TransferringData, storagemarket.StorageDealTransferring, clientDealProposal)

t.Run("adds an event listener", func(t *testing.T) {
runTransferringData(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State)
require.NotNil(t, env.dtCallbacks[datatransfer.Complete])
})
})
}

func TestVerifyResponse(t *testing.T) {
ctx := context.Background()
eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents)
Expand Down
90 changes: 90 additions & 0 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Package dtutils provides go-data-transfer related types and functionality for
// client and provider FSMs
package dtutils

import (
"errors"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
)

var log = logging.Logger("storagemarket_impl")

var (
// ErrDataTransferFailed means a data transfer for a deal failed
ErrDataTransferFailed = errors.New("deal data transfer failed")
)

// EventReceiver is any thing that can receive FSM events
type EventReceiver interface {
Send(id interface{}, name fsm.EventName, args ...interface{}) (err error)
}

// DataTransferSubscriber is the function called when an event occurs in a data
// transfer -- it reads the voucher to verify this even occurred in a storage
// market deal, then, based on the data transfer event that occurred, it generates
// and update message for the deal -- either moving to staged for a completion
// event or moving to error if a data transfer error occurs
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher)
// if this event is for a transfer not related to storage, ignore
if !ok {
return
}

// data transfer events for progress do not affect deal state
switch event.Code {
case datatransfer.Open:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Complete:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Error:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
default:
}
}
}

// DataTransferSubscriber is the function called when an event occurs in a data
// transfer -- it reads the voucher to verify this even occurred in a storage
// market deal, then, based on the data transfer event that occurred, it dispatches
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher)
// if this event is for a transfer not related to storage, ignore
if !ok {
return
}

// data transfer events for progress do not affect deal state
switch event.Code {
case datatransfer.Complete:
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Error:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
default:
}
}
}
103 changes: 103 additions & 0 deletions storagemarket/impl/dtutils/dtutils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package dtutils_test

import (
"testing"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
)

func TestDataTransferSubscriber(t *testing.T) {
expectedProposalCID := shared_testutil.GenerateCids(1)[0]
tests := map[string]struct {
code datatransfer.EventCode
called bool
voucher datatransfer.Voucher
expectedID interface{}
expectedEvent fsm.EventName
expectedArgs []interface{}
}{
"not a storage voucher": {
called: false,
voucher: nil,
},
"open event": {
code: datatransfer.Open,
called: true,
voucher: &requestvalidation.StorageDataTransferVoucher{
Proposal: expectedProposalCID,
},
expectedID: expectedProposalCID,
expectedEvent: storagemarket.ProviderEventDataTransferInitiated,
},
"completion event": {
code: datatransfer.Complete,
called: true,
voucher: &requestvalidation.StorageDataTransferVoucher{
Proposal: expectedProposalCID,
},
expectedID: expectedProposalCID,
expectedEvent: storagemarket.ProviderEventDataTransferCompleted,
},
"error event": {
code: datatransfer.Error,
called: true,
voucher: &requestvalidation.StorageDataTransferVoucher{
Proposal: expectedProposalCID,
},
expectedID: expectedProposalCID,
expectedEvent: storagemarket.ProviderEventDataTransferFailed,
expectedArgs: []interface{}{dtutils.ErrDataTransferFailed},
},
"other event": {
code: datatransfer.Progress,
called: false,
voucher: &requestvalidation.StorageDataTransferVoucher{
Proposal: expectedProposalCID,
},
},
}
for test, data := range tests {
t.Run(test, func(t *testing.T) {
fdg := &fakeDealGroup{}
subscriber := dtutils.ProviderDataTransferSubscriber(fdg)
subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{
Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0),
})
if data.called {
require.True(t, fdg.called)
require.Equal(t, fdg.lastID, data.expectedID)
require.Equal(t, fdg.lastEvent, data.expectedEvent)
require.Equal(t, fdg.lastArgs, data.expectedArgs)
} else {
require.False(t, fdg.called)
}
})
}
}

// TODO: Test Client DT event subscriber

type fakeDealGroup struct {
returnedErr error
called bool
lastID interface{}
lastEvent fsm.EventName
lastArgs []interface{}
}

func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) {
fdg.lastID = id
fdg.lastEvent = name
fdg.lastArgs = args
fdg.called = true
return fdg.returnedErr
}
4 changes: 4 additions & 0 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
Expand Down Expand Up @@ -117,6 +118,9 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo

h.Configure(options...)

// register a data transfer event handler -- this will send events to the state machines based on DT events
dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals))

return h, nil
}

Expand Down
2 changes: 0 additions & 2 deletions storagemarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ var ProviderEvents = fsm.Events{
// ProviderStateEntryFuncs are the handlers for different states in a storage client
var ProviderStateEntryFuncs = fsm.StateEntryFuncs{
storagemarket.StorageDealValidating: ValidateDealProposal,
storagemarket.StorageDealWaitingForData: WaitingForData,
storagemarket.StorageDealTransferring: TransferringData,
storagemarket.StorageDealVerifyData: VerifyData,
storagemarket.StorageDealEnsureProviderFunds: EnsureProviderFunds,
storagemarket.StorageDealProviderFunding: WaitForFunding,
Expand Down
20 changes: 0 additions & 20 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,6 @@ func ValidateDealProposal(ctx fsm.Context, environment ProviderDealEnvironment,
return ctx.Trigger(storagemarket.ProviderEventDataRequested)
}

func WaitingForData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
if deal.Ref.TransferType == storagemarket.TTManual {
return nil
}

environment.OnDataTransferEvent(ctx.Context(), datatransfer.Open, func(event datatransfer.Event) error {
return ctx.Trigger(storagemarket.ProviderEventDataTransferInitiated)
})

return nil
}

func TransferringData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
environment.OnDataTransferEvent(ctx.Context(), datatransfer.Complete, func(event datatransfer.Event) error {
return ctx.Trigger(storagemarket.ProviderEventDataTransferCompleted)
})

return nil
}

// VerifyData verifies that data received for a deal matches the pieceCID
// in the proposal
func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
Expand Down
Loading

0 comments on commit c243211

Please sign in to comment.