diff --git a/shared_testutil/testutil.go b/shared_testutil/testutil.go index 2ebc8e73d..4dd8cfa49 100644 --- a/shared_testutil/testutil.go +++ b/shared_testutil/testutil.go @@ -11,6 +11,7 @@ import ( blocksutil "github.com/ipfs/go-ipfs-blocksutil" "github.com/jbenet/go-random" "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -99,7 +100,7 @@ func TestVoucherEquality(t *testing.T, a, b *paych.SignedVoucher) { // AssertDealState asserts equality of StorageDealStatus but with better error messaging func AssertDealState(t *testing.T, expected storagemarket.StorageDealStatus, actual storagemarket.StorageDealStatus) { - require.Equal(t, expected, actual, + assert.Equal(t, expected, actual, "Unexpected deal status\nexpected: %s (%d)\nactual : %s (%d)", storagemarket.DealStates[expected], expected, storagemarket.DealStates[actual], actual, diff --git a/storagemarket/impl/client.go b/storagemarket/impl/client.go index b20a8a361..da0d3b809 100644 --- a/storagemarket/impl/client.go +++ b/storagemarket/impl/client.go @@ -30,6 +30,7 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/clientutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -86,6 +87,10 @@ func NewClient( return nil, err } c.statemachines = statemachines + + // register a data transfer event handler -- this will send events to the state machines based on DT events + dataTransfer.SubscribeToEvents(dtutils.ClientDataTransferSubscriber(statemachines)) + return c, nil } diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go index 58e42e10b..7ad4e1675 100644 --- a/storagemarket/impl/clientstates/client_fsm.go +++ b/storagemarket/impl/clientstates/client_fsm.go @@ -123,7 +123,6 @@ var ClientStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealClientFunding: WaitForFunding, storagemarket.StorageDealFundsEnsured: ProposeDeal, storagemarket.StorageDealWaitingForDataRequest: WaitingForDataRequest, - storagemarket.StorageDealTransferring: TransferringData, storagemarket.StorageDealValidating: VerifyDealResponse, storagemarket.StorageDealProposalAccepted: ValidateDealPublished, storagemarket.StorageDealSealing: VerifyDealActivated, diff --git a/storagemarket/impl/clientstates/client_states.go b/storagemarket/impl/clientstates/client_states.go index b78a9ffdf..e39587446 100644 --- a/storagemarket/impl/clientstates/client_states.go +++ b/storagemarket/impl/clientstates/client_states.go @@ -129,13 +129,6 @@ func WaitingForDataRequest(ctx fsm.Context, environment ClientDealEnvironment, d return ctx.Trigger(storagemarket.ClientEventDataTransferInitiated) } -func TransferringData(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { - environment.OnDataTransferEvent(ctx.Context(), datatransfer.Complete, func(event datatransfer.Event) error { - return ctx.Trigger(storagemarket.ClientEventDataTransferComplete) - }) - return nil -} - // VerifyDealResponse reads and verifies the response from the provider to the proposed deal func VerifyDealResponse(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error { diff --git a/storagemarket/impl/clientstates/client_states_test.go b/storagemarket/impl/clientstates/client_states_test.go index c3ca6137e..bec35ed15 100644 --- a/storagemarket/impl/clientstates/client_states_test.go +++ b/storagemarket/impl/clientstates/client_states_test.go @@ -165,21 +165,6 @@ func TestWaitingForDataRequest(t *testing.T) { }) } -func TestTransferringData(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) - require.NoError(t, err) - clientDealProposal := tut.MakeTestClientDealProposal() - runTransferringData := makeExecutor(ctx, eventProcessor, clientstates.TransferringData, storagemarket.StorageDealTransferring, clientDealProposal) - - t.Run("adds an event listener", func(t *testing.T) { - runTransferringData(t, makeNode(nodeParams{}), nil, nil, func(deal storagemarket.ClientDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State) - require.NotNil(t, env.dtCallbacks[datatransfer.Complete]) - }) - }) -} - func TestVerifyResponse(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.ClientDeal{}, "State", clientstates.ClientEvents) diff --git a/storagemarket/impl/dtutils/dtutils.go b/storagemarket/impl/dtutils/dtutils.go new file mode 100644 index 000000000..bec77185b --- /dev/null +++ b/storagemarket/impl/dtutils/dtutils.go @@ -0,0 +1,90 @@ +// Package dtutils provides go-data-transfer related types and functionality for +// client and provider FSMs +package dtutils + +import ( + "errors" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-statemachine/fsm" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" +) + +var log = logging.Logger("storagemarket_impl") + +var ( + // ErrDataTransferFailed means a data transfer for a deal failed + ErrDataTransferFailed = errors.New("deal data transfer failed") +) + +// EventReceiver is any thing that can receive FSM events +type EventReceiver interface { + Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) +} + +// DataTransferSubscriber is the function called when an event occurs in a data +// transfer -- it reads the voucher to verify this even occurred in a storage +// market deal, then, based on the data transfer event that occurred, it generates +// and update message for the deal -- either moving to staged for a completion +// event or moving to error if a data transfer error occurs +func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { + return func(event datatransfer.Event, channelState datatransfer.ChannelState) { + voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) + // if this event is for a transfer not related to storage, ignore + if !ok { + return + } + + // data transfer events for progress do not affect deal state + switch event.Code { + case datatransfer.Open: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Complete: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Error: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + default: + } + } +} + +// DataTransferSubscriber is the function called when an event occurs in a data +// transfer -- it reads the voucher to verify this even occurred in a storage +// market deal, then, based on the data transfer event that occurred, it dispatches +// an event to the appropriate state machine +func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { + return func(event datatransfer.Event, channelState datatransfer.ChannelState) { + voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) + // if this event is for a transfer not related to storage, ignore + if !ok { + return + } + + // data transfer events for progress do not affect deal state + switch event.Code { + case datatransfer.Complete: + err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + case datatransfer.Error: + err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed) + if err != nil { + log.Errorf("processing dt event: %w", err) + } + default: + } + } +} diff --git a/storagemarket/impl/dtutils/dtutils_test.go b/storagemarket/impl/dtutils/dtutils_test.go new file mode 100644 index 000000000..aea0331fe --- /dev/null +++ b/storagemarket/impl/dtutils/dtutils_test.go @@ -0,0 +1,103 @@ +package dtutils_test + +import ( + "testing" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-statemachine/fsm" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" +) + +func TestDataTransferSubscriber(t *testing.T) { + expectedProposalCID := shared_testutil.GenerateCids(1)[0] + tests := map[string]struct { + code datatransfer.EventCode + called bool + voucher datatransfer.Voucher + expectedID interface{} + expectedEvent fsm.EventName + expectedArgs []interface{} + }{ + "not a storage voucher": { + called: false, + voucher: nil, + }, + "open event": { + code: datatransfer.Open, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferInitiated, + }, + "completion event": { + code: datatransfer.Complete, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferCompleted, + }, + "error event": { + code: datatransfer.Error, + called: true, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ProviderEventDataTransferFailed, + expectedArgs: []interface{}{dtutils.ErrDataTransferFailed}, + }, + "other event": { + code: datatransfer.Progress, + called: false, + voucher: &requestvalidation.StorageDataTransferVoucher{ + Proposal: expectedProposalCID, + }, + }, + } + for test, data := range tests { + t.Run(test, func(t *testing.T) { + fdg := &fakeDealGroup{} + subscriber := dtutils.ProviderDataTransferSubscriber(fdg) + subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{ + Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0), + }) + if data.called { + require.True(t, fdg.called) + require.Equal(t, fdg.lastID, data.expectedID) + require.Equal(t, fdg.lastEvent, data.expectedEvent) + require.Equal(t, fdg.lastArgs, data.expectedArgs) + } else { + require.False(t, fdg.called) + } + }) + } +} + +// TODO: Test Client DT event subscriber + +type fakeDealGroup struct { + returnedErr error + called bool + lastID interface{} + lastEvent fsm.EventName + lastArgs []interface{} +} + +func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) { + fdg.lastID = id + fdg.lastEvent = name + fdg.lastArgs = args + fdg.called = true + return fdg.returnedErr +} diff --git a/storagemarket/impl/provider.go b/storagemarket/impl/provider.go index b47104c52..76a9dbee9 100644 --- a/storagemarket/impl/provider.go +++ b/storagemarket/impl/provider.go @@ -26,6 +26,7 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/connmanager" + "github.com/filecoin-project/go-fil-markets/storagemarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" @@ -117,6 +118,9 @@ func NewProvider(net network.StorageMarketNetwork, ds datastore.Batching, bs blo h.Configure(options...) + // register a data transfer event handler -- this will send events to the state machines based on DT events + dataTransfer.SubscribeToEvents(dtutils.ProviderDataTransferSubscriber(deals)) + return h, nil } diff --git a/storagemarket/impl/providerstates/provider_fsm.go b/storagemarket/impl/providerstates/provider_fsm.go index 3ca694261..7558a85c4 100644 --- a/storagemarket/impl/providerstates/provider_fsm.go +++ b/storagemarket/impl/providerstates/provider_fsm.go @@ -125,8 +125,6 @@ var ProviderEvents = fsm.Events{ // ProviderStateEntryFuncs are the handlers for different states in a storage client var ProviderStateEntryFuncs = fsm.StateEntryFuncs{ storagemarket.StorageDealValidating: ValidateDealProposal, - storagemarket.StorageDealWaitingForData: WaitingForData, - storagemarket.StorageDealTransferring: TransferringData, storagemarket.StorageDealVerifyData: VerifyData, storagemarket.StorageDealEnsureProviderFunds: EnsureProviderFunds, storagemarket.StorageDealProviderFunding: WaitForFunding, diff --git a/storagemarket/impl/providerstates/provider_states.go b/storagemarket/impl/providerstates/provider_states.go index 4c74abdfb..13a196e05 100644 --- a/storagemarket/impl/providerstates/provider_states.go +++ b/storagemarket/impl/providerstates/provider_states.go @@ -127,26 +127,6 @@ func ValidateDealProposal(ctx fsm.Context, environment ProviderDealEnvironment, return ctx.Trigger(storagemarket.ProviderEventDataRequested) } -func WaitingForData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - if deal.Ref.TransferType == storagemarket.TTManual { - return nil - } - - environment.OnDataTransferEvent(ctx.Context(), datatransfer.Open, func(event datatransfer.Event) error { - return ctx.Trigger(storagemarket.ProviderEventDataTransferInitiated) - }) - - return nil -} - -func TransferringData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { - environment.OnDataTransferEvent(ctx.Context(), datatransfer.Complete, func(event datatransfer.Event) error { - return ctx.Trigger(storagemarket.ProviderEventDataTransferCompleted) - }) - - return nil -} - // VerifyData verifies that data received for a deal matches the pieceCID // in the proposal func VerifyData(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error { diff --git a/storagemarket/impl/providerstates/provider_states_test.go b/storagemarket/impl/providerstates/provider_states_test.go index 870ea57b6..f6a208b46 100644 --- a/storagemarket/impl/providerstates/provider_states_test.go +++ b/storagemarket/impl/providerstates/provider_states_test.go @@ -142,71 +142,6 @@ func TestValidateDealProposal(t *testing.T) { } } -func TestWaitingForData(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) - require.NoError(t, err) - runWaitingForData := makeExecutor(ctx, eventProcessor, providerstates.WaitingForData, storagemarket.StorageDealWaitingForData) - tests := map[string]struct { - nodeParams nodeParams - dealParams dealParams - environmentParams environmentParams - fileStoreParams tut.TestFileStoreParams - pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) - }{ - "succeeds": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { - require.NotNil(t, env.dtCallbacks[datatransfer.Open]) - tut.AssertDealState(t, storagemarket.StorageDealWaitingForData, deal.State) - }, - }, - "does nothing for a manual transfer deal": { - dealParams: dealParams{ - DataRef: &storagemarket.DataRef{ - Root: tut.GenerateCids(1)[0], - TransferType: storagemarket.TTManual, - }, - }, - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { - tut.AssertDealState(t, storagemarket.StorageDealWaitingForData, deal.State) - }, - }, - } - for test, data := range tests { - t.Run(test, func(t *testing.T) { - runWaitingForData(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) - }) - } -} - -func TestTransferringData(t *testing.T) { - ctx := context.Background() - eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) - require.NoError(t, err) - runWaitingForData := makeExecutor(ctx, eventProcessor, providerstates.TransferringData, storagemarket.StorageDealTransferring) - tests := map[string]struct { - nodeParams nodeParams - dealParams dealParams - environmentParams environmentParams - fileStoreParams tut.TestFileStoreParams - pieceStoreParams tut.TestPieceStoreParams - dealInspector func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) - }{ - "sets a listener for the datatransfer.Complete event": { - dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) { - require.NotNil(t, env.dtCallbacks[datatransfer.Complete]) - tut.AssertDealState(t, storagemarket.StorageDealTransferring, deal.State) - }, - }, - } - for test, data := range tests { - t.Run(test, func(t *testing.T) { - runWaitingForData(t, data.nodeParams, data.environmentParams, data.dealParams, data.fileStoreParams, data.pieceStoreParams, data.dealInspector) - }) - } -} - func TestVerifyData(t *testing.T) { ctx := context.Background() eventProcessor, err := fsm.NewEventProcessor(storagemarket.MinerDeal{}, "State", providerstates.ProviderEvents) diff --git a/storagemarket/impl/providerutils/providerutils.go b/storagemarket/impl/providerutils/providerutils.go index 56df3ab54..501cb466e 100644 --- a/storagemarket/impl/providerutils/providerutils.go +++ b/storagemarket/impl/providerutils/providerutils.go @@ -2,12 +2,9 @@ package providerutils import ( "context" - "errors" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -20,16 +17,10 @@ import ( "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/shared" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" ) var log = logging.Logger("storagemarket_impl") -var ( - // ErrDataTransferFailed means a data transfer for a deal failed - ErrDataTransferFailed = errors.New("deal data transfer failed") -) // VerifyFunc is a function that can validate a signature for a given address and bytes type VerifyFunc func(context.Context, crypto.Signature, address.Address, []byte, shared.TipSetToken) (bool, error) @@ -79,41 +70,6 @@ func SignMinerData(ctx context.Context, data interface{}, address address.Addres return sig, nil } -// EventReceiver is any thing that can receive FSM events -type EventReceiver interface { - Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) -} - -// DataTransferSubscriber is the function called when an event occurs in a data -// transfer -- it reads the voucher to verify this even occurred in a storage -// market deal, then, based on the data transfer event that occurred, it generates -// and update message for the deal -- either moving to staged for a completion -// event or moving to error if a data transfer error occurs -func DataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { - return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - voucher, ok := channelState.Voucher().(*requestvalidation.StorageDataTransferVoucher) - // if this event is for a transfer not related to storage, ignore - if !ok { - return - } - - // data transfer events for opening and progress do not affect deal state - switch event.Code { - case datatransfer.Complete: - err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted) - if err != nil { - log.Errorf("processing dt event: %w", err) - } - case datatransfer.Error: - err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, ErrDataTransferFailed) - if err != nil { - log.Errorf("processing dt event: %w", err) - } - default: - } - } -} - // CommPGenerator is a commP generating function that writes to a file type CommPGenerator func(abi.RegisteredProof, cid.Cid, ipld.Node, ...car.OnNewCarBlockFunc) (cid.Cid, filestore.Path, abi.UnpaddedPieceSize, error) diff --git a/storagemarket/impl/providerutils/providerutils_test.go b/storagemarket/impl/providerutils/providerutils_test.go index 44ac3d349..ded46f86f 100644 --- a/storagemarket/impl/providerutils/providerutils_test.go +++ b/storagemarket/impl/providerutils/providerutils_test.go @@ -8,8 +8,6 @@ import ( "testing" "github.com/filecoin-project/go-address" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-statemachine/fsm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -18,16 +16,13 @@ import ( "github.com/ipld/go-ipld-prime" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared_testutil" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/blockrecorder" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/providerutils" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/network" ) @@ -121,82 +116,6 @@ func TestSignMinerData(t *testing.T) { } } -func TestDataTransferSubscriber(t *testing.T) { - expectedProposalCID := shared_testutil.GenerateCids(1)[0] - tests := map[string]struct { - code datatransfer.EventCode - called bool - voucher datatransfer.Voucher - expectedID interface{} - expectedEvent fsm.EventName - expectedArgs []interface{} - }{ - "not a storage voucher": { - called: false, - voucher: nil, - }, - "completion event": { - code: datatransfer.Complete, - called: true, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - expectedID: expectedProposalCID, - expectedEvent: storagemarket.ProviderEventDataTransferCompleted, - }, - "error event": { - code: datatransfer.Error, - called: true, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - expectedID: expectedProposalCID, - expectedEvent: storagemarket.ProviderEventDataTransferFailed, - expectedArgs: []interface{}{providerutils.ErrDataTransferFailed}, - }, - "other event": { - code: datatransfer.Progress, - called: false, - voucher: &requestvalidation.StorageDataTransferVoucher{ - Proposal: expectedProposalCID, - }, - }, - } - for test, data := range tests { - t.Run(test, func(t *testing.T) { - fdg := &fakeDealGroup{} - subscriber := providerutils.DataTransferSubscriber(fdg) - subscriber(datatransfer.Event{Code: data.code}, datatransfer.ChannelState{ - Channel: datatransfer.NewChannel(datatransfer.TransferID(0), cid.Undef, nil, data.voucher, peer.ID(""), peer.ID(""), 0), - }) - if data.called { - require.True(t, fdg.called) - require.Equal(t, fdg.lastID, data.expectedID) - require.Equal(t, fdg.lastEvent, data.expectedEvent) - require.Equal(t, fdg.lastArgs, data.expectedArgs) - } else { - require.False(t, fdg.called) - } - }) - } -} - -type fakeDealGroup struct { - returnedErr error - called bool - lastID interface{} - lastEvent fsm.EventName - lastArgs []interface{} -} - -func (fdg *fakeDealGroup) Send(id interface{}, name fsm.EventName, args ...interface{}) (err error) { - fdg.lastID = id - fdg.lastEvent = name - fdg.lastArgs = args - fdg.called = true - return fdg.returnedErr -} - func TestCommPGenerationWithMetadata(t *testing.T) { tempFilePath := filestore.Path("applesauce.jpg") tempFile := shared_testutil.NewTestFile(shared_testutil.TestFileParams{Path: tempFilePath}) diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index a71570166..76ba80b30 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -136,14 +136,14 @@ func TestMakeDeal(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealActive, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealActive, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) pd := providerDeals[0] assert.Equal(t, pd.ProposalCid, proposalCid) - assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealCompleted, pd.State) } func TestMakeDealOffline(t *testing.T) { @@ -173,14 +173,14 @@ func TestMakeDealOffline(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) pd := providerDeals[0] assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, storagemarket.StorageDealWaitingForData, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealWaitingForData, pd.State) err = cario.NewCarIO().WriteCar(ctx, h.TestData.Bs1, h.PayloadCid, shared.AllSelector(), carBuf) require.NoError(t, err) @@ -191,14 +191,14 @@ func TestMakeDealOffline(t *testing.T) { cd, err = h.Client.GetLocalDeal(ctx, proposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealActive, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealActive, cd.State) providerDeals, err = h.Provider.ListLocalDeals() assert.NoError(t, err) pd = providerDeals[0] assert.True(t, pd.ProposalCid.Equals(proposalCid)) - assert.Equal(t, storagemarket.StorageDealCompleted, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealCompleted, pd.State) } func TestMakeDealNonBlocking(t *testing.T) { @@ -220,7 +220,7 @@ func TestMakeDealNonBlocking(t *testing.T) { cd, err := h.Client.GetLocalDeal(ctx, result.ProposalCid) assert.NoError(t, err) - assert.Equal(t, storagemarket.StorageDealValidating, cd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealValidating, cd.State) providerDeals, err := h.Provider.ListLocalDeals() assert.NoError(t, err) @@ -228,7 +228,7 @@ func TestMakeDealNonBlocking(t *testing.T) { // Provider should be blocking on waiting for funds to appear on chain pd := providerDeals[0] assert.Equal(t, result.ProposalCid, pd.ProposalCid) - assert.Equal(t, storagemarket.StorageDealProviderFunding, pd.State) + shared_testutil.AssertDealState(t, storagemarket.StorageDealProviderFunding, pd.State) } type harness struct {