Skip to content

Commit

Permalink
feat: cancel transfer should cause deal to fail
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Nov 27, 2020
1 parent f672e07 commit 9523133
Show file tree
Hide file tree
Showing 13 changed files with 354 additions and 69 deletions.
3 changes: 3 additions & 0 deletions docs/storageclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ stateDiagram-v2
16 --> 17 : ClientEventDataTransferInitiated
16 --> 17 : ClientEventDataTransferRestarted
28 --> 17 : ClientEventDataTransferRestarted
16 --> 11 : ClientEventDataTransferCancelled
17 --> 11 : ClientEventDataTransferCancelled
28 --> 11 : ClientEventDataTransferCancelled
16 --> 13 : ClientEventDataTransferComplete
17 --> 13 : ClientEventDataTransferComplete
13 --> 13 : ClientEventWaitForDealState
Expand Down
3 changes: 3 additions & 0 deletions docs/storageprovider.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ stateDiagram-v2
27 --> 11 : ProviderEventDataTransferRestartFailed
18 --> 17 : ProviderEventDataTransferRestarted
27 --> 17 : ProviderEventDataTransferRestarted
17 --> 11 : ProviderEventDataTransferCancelled
18 --> 11 : ProviderEventDataTransferCancelled
27 --> 11 : ProviderEventDataTransferCancelled
17 --> 19 : ProviderEventDataTransferCompleted
19 --> 11 : ProviderEventDataVerificationFailed
18 --> 20 : ProviderEventVerifiedData
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.0.1
github.com/filecoin-project/go-data-transfer v1.1.1-0.20201127133903-9944ad93a52a
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand All @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.4.3
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand All @@ -37,9 +37,9 @@ require (
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
github.com/libp2p/go-libp2p v0.10.0
github.com/libp2p/go-libp2p-core v0.6.0
github.com/multiformats/go-multiaddr v0.2.2
github.com/libp2p/go-libp2p v0.12.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multibase v0.0.3
github.com/stretchr/testify v1.6.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
Expand Down
72 changes: 72 additions & 0 deletions go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA

gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.Loader1, testData.Storer1)
dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1, testData.DTStoredCounter1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTTmpDir1, testData.DTNet1, dtTransport1, testData.DTStoredCounter1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, err)
Expand Down Expand Up @@ -143,7 +143,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA

gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.Loader2, testData.Storer2)
dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2, testData.DTStoredCounter2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTTmpDir2, testData.DTNet2, dtTransport2, testData.DTStoredCounter2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, err)
Expand Down Expand Up @@ -599,7 +599,7 @@ func setupClient(

gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.Loader1, testData.Storer1)
dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1, testData.DTStoredCounter1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTTmpDir1, testData.DTNet1, dtTransport1, testData.DTStoredCounter1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, err)
Expand Down Expand Up @@ -636,7 +636,7 @@ func setupProvider(

gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.Loader2, testData.Storer2)
dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2, testData.DTStoredCounter2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTTmpDir2, testData.DTNet2, dtTransport2, testData.DTStoredCounter2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions shared_testutil/mocknet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -54,6 +55,8 @@ type Libp2pTestData struct {
DTNet2 dtnet.DataTransferNetwork
DTStore1 datastore.Batching
DTStore2 datastore.Batching
DTTmpDir1 string
DTTmpDir2 string
Loader1 ipld.Loader
Loader2 ipld.Loader
Storer1 ipld.Storer
Expand Down Expand Up @@ -149,6 +152,15 @@ func NewLibp2pTestData(ctx context.Context, t *testing.T) *Libp2pTestData {
testData.DTStore1 = namespace.Wrap(testData.Ds1, datastore.NewKey("DataTransfer1"))
testData.DTStore2 = namespace.Wrap(testData.Ds1, datastore.NewKey("DataTransfer2"))

testData.DTTmpDir1, err = ioutil.TempDir("", "dt-tmp-1")
require.NoError(t, err)
testData.DTTmpDir2, err = ioutil.TempDir("", "dt-tmp-2")
require.NoError(t, err)
t.Cleanup(func() {
_ = os.RemoveAll(testData.DTTmpDir1)
_ = os.RemoveAll(testData.DTTmpDir2)
})

testData.MockNet = mn

return testData
Expand Down
8 changes: 8 additions & 0 deletions storagemarket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ const (

// ClientEventDataTransferStalled happens when the clients data transfer experiences a disconnect
ClientEventDataTransferStalled

// ClientEventDataTransferCancelled happens when a data transfer is cancelled
ClientEventDataTransferCancelled
)

// ClientEvents maps client event codes to string names
Expand Down Expand Up @@ -137,6 +140,7 @@ var ClientEvents = map[ClientEvent]string{
ClientEventDataTransferRestarted: "ClientEventDataTransferRestarted",
ClientEventDataTransferRestartFailed: "ClientEventDataTransferRestartFailed",
ClientEventDataTransferStalled: "ClientEventDataTransferStalled",
ClientEventDataTransferCancelled: "ClientEventDataTransferCancelled",
}

// ProviderEvent is an event that happens in the provider's deal state machine
Expand Down Expand Up @@ -265,6 +269,9 @@ const (

// ProviderEventDataTransferStalled happens when the providers data transfer experiences a disconnect
ProviderEventDataTransferStalled

// ProviderEventDataTransferCancelled happens when a data transfer is cancelled
ProviderEventDataTransferCancelled
)

// ProviderEvents maps provider event codes to string names
Expand Down Expand Up @@ -308,4 +315,5 @@ var ProviderEvents = map[ProviderEvent]string{
ProviderEventDataTransferRestarted: "ProviderEventDataTransferRestarted",
ProviderEventDataTransferRestartFailed: "ProviderEventDataTransferRestartFailed",
ProviderEventDataTransferStalled: "ProviderEventDataTransferStalled",
ProviderEventDataTransferCancelled: "ProviderEventDataTransferCancelled",
}
15 changes: 14 additions & 1 deletion storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,21 @@ var ClientEvents = fsm.Events{
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferCancelled).
FromMany(
storagemarket.StorageDealStartDataTransfer,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealClientTransferRestart,
).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal) error {
deal.Message = "data transfer cancelled"
return nil
}),

fsm.Event(storagemarket.ClientEventDataTransferComplete).
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealStartDataTransfer).To(storagemarket.StorageDealCheckForAcceptance),
FromMany(storagemarket.StorageDealTransferring, storagemarket.StorageDealStartDataTransfer).
To(storagemarket.StorageDealCheckForAcceptance),
fsm.Event(storagemarket.ClientEventWaitForDealState).
From(storagemarket.StorageDealCheckForAcceptance).ToNoChange().
Action(func(deal *storagemarket.ClientDeal, pollError bool) error {
Expand Down
82 changes: 38 additions & 44 deletions storagemarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,29 +44,26 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
}
}

// data transfer events for progress do not affect deal state
switch event.Code {
case datatransfer.Restart:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferRestarted, channelState.ChannelID())
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Disconnected:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferStalled)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Open:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated, channelState.ChannelID())
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Error:
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
if err != nil {
log.Errorf("processing dt event: %w", err)
// Translate from data transfer events to provider FSM events
// Note: We ignore data transfer progress events (they do not affect deal state)
err := func() error {
switch event.Code {
case datatransfer.Cancel:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCancelled)
case datatransfer.Restart:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferStalled)
case datatransfer.Open:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
default:
return nil
}
default:
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
}
}
}
Expand All @@ -90,29 +87,26 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
}
}

// data transfer events for progress do not affect deal state
switch event.Code {
case datatransfer.Restart:
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID())
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Disconnected:
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled)
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Accept:
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID())
if err != nil {
log.Errorf("processing dt event: %w", err)
}
case datatransfer.Error:
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
if err != nil {
log.Errorf("processing dt event: %w", err)
// Translate from data transfer events to client FSM events
// Note: We ignore data transfer progress events (they do not affect deal state)
err := func() error {
switch event.Code {
case datatransfer.Cancel:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferCancelled)
case datatransfer.Restart:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled)
case datatransfer.Accept:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
default:
return nil
}
default:
}()
if err != nil {
log.Errorf("processing dt event: %w", err)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions storagemarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ var ProviderEvents = fsm.Events{
return nil
}),

fsm.Event(storagemarket.ProviderEventDataTransferCancelled).
FromMany(
storagemarket.StorageDealWaitingForData,
storagemarket.StorageDealTransferring,
storagemarket.StorageDealProviderTransferRestart,
).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.MinerDeal) error {
deal.Message = "data transfer cancelled"
return nil
}),

fsm.Event(storagemarket.ProviderEventDataTransferCompleted).
From(storagemarket.StorageDealTransferring).To(storagemarket.StorageDealVerifyData),
fsm.Event(storagemarket.ProviderEventDataVerificationFailed).
Expand Down
Loading

0 comments on commit 9523133

Please sign in to comment.