Skip to content

Commit

Permalink
fix: disallow concurrent retrieval deals for same peer/cid
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Feb 19, 2021
1 parent 1002d0c commit f9e8df5
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 5 deletions.
2 changes: 1 addition & 1 deletion docs/retrievalclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ stateDiagram-v2
note left of 24 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived


note left of 25 : The following events only record in this state.<br><br>ClientEventProviderCancelled
note left of 25 : The following events only record in this state.<br><br>ClientEventDealProposed<br>ClientEventProviderCancelled


note left of 28 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived
Expand Down
Binary file modified docs/retrievalclient.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
42 changes: 41 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package retrievalimpl
import (
"context"
"errors"
"fmt"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -45,6 +47,9 @@ type Client struct {
resolver discovery.PeerResolver
stateMachines fsm.Group
migrateStateMachines func(context.Context) error

// Guards concurrent access to Retrieve method
retrieveLk sync.Mutex
}

type internalEvent struct {
Expand Down Expand Up @@ -232,7 +237,17 @@ From then on, the statemachine controls the deal flow in the client. Other compo
Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates
*/
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, p retrievalmarket.RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
err := c.addMultiaddrs(ctx, p)
c.retrieveLk.Lock()
defer c.retrieveLk.Unlock()

// Check if there's already an active retrieval deal with the same peer
// for the same payload CID
err := c.checkForActiveDeal(payloadCID, p.ID)
if err != nil {
return 0, err
}

err = c.addMultiaddrs(ctx, p)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -282,6 +297,31 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
return dealID, nil
}

// Check if there's already an active retrieval deal with the same peer
// for the same payload CID
func (c *Client) checkForActiveDeal(payloadCID cid.Cid, pid peer.ID) error {
var deals []retrievalmarket.ClientDealState
err := c.stateMachines.List(&deals)
if err != nil {
return err
}

for _, deal := range deals {
match := deal.Sender == pid && deal.PayloadCID == payloadCID
active := !clientstates.IsFinalityState(deal.Status)
if match && active {
msg := fmt.Sprintf("there is an active retrieval deal with peer %s ", pid)
msg += fmt.Sprintf("for payload CID %s ", payloadCID)
msg += fmt.Sprintf("(retrieval deal ID %d, state %s) - ",
deal.ID, retrievalmarket.DealStatuses[deal.Status])
msg += "existing deal must be cancelled before starting a new retrieval deal"
err := xerrors.Errorf(msg)
return err
}
}
return nil
}

func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
evt := eventName.(retrievalmarket.ClientEvent)
ds := state.(retrievalmarket.ClientDealState)
Expand Down
139 changes: 139 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,145 @@ func TestClient_FindProviders(t *testing.T) {
})
}

// TestClient_DuplicateRetrieve verifies that it's not possible to make a
// retrieval deal for the same payload CID with the same peer as an existing
// active deal
func TestClient_DuplicateRetrieve(t *testing.T) {
bgCtx := context.Background()
ctx, cancel := context.WithCancel(bgCtx)
defer cancel()
payChAddr := address.TestAddress

payloadCIDs := tut.GenerateCids(2)
rpeer1 := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: peer.ID("p1"),
}
rpeer2 := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: peer.ID("p2"),
}

testCases := []struct {
name string
payloadCid1 cid.Cid
payloadCid2 cid.Cid
rpeer1 retrievalmarket.RetrievalPeer
rpeer2 retrievalmarket.RetrievalPeer
expectError bool
cancelled bool
}{{
name: "different payload CID",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[1],
rpeer1: rpeer1,
rpeer2: rpeer1,
}, {
name: "different peer",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer2,
}, {
name: "same peer and payload CID",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer1,
expectError: true,
}, {
name: "same peer and payload CID as cancelled deal",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer1,
cancelled: true,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up a retrieval client node with mocks
ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
multiStore, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
dt := tut.NewTestDataTransfer()
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(tc.rpeer1, nil)
node.ExpectKnownAddresses(tc.rpeer2, nil)

// Create the client
client, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

// Start the client and wait till it's ready
err = client.Start(ctx)
require.NoError(t, err)

ready := make(chan struct{})
go func() {
client.OnReady(func(err error) {
close(ready)
})
}()
select {
case <-ready:
case <-time.After(100 * time.Millisecond):
}

// Retrieve first payload CID from first peer
params := retrievalmarket.Params{
Selector: nil,
PieceCID: &tut.GenerateCids(1)[0],
PricePerByte: abi.NewTokenAmount(1),
PaymentInterval: 1,
PaymentIntervalIncrease: 0,
UnsealPrice: abi.NewTokenAmount(0),
}

dealID1, err := client.Retrieve(ctx, tc.payloadCid1, params, abi.NewTokenAmount(10), tc.rpeer1, payChAddr, tc.rpeer1.Address, nil)
assert.NoError(t, err)

// If the deal should be cancelled
if tc.cancelled {
done := make(chan struct{})
go func() {
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
if state.Status == retrievalmarket.DealStatusCancelled {
close(done)
}
})
}()

// Cancel deal and wait for it to complete cancelling
err = client.CancelDeal(dealID1)
require.NoError(t, err)

select {
case <-done:
case <-time.After(100 * time.Millisecond):
}
}

// Retrieve second payload CID from second peer
_, err = client.Retrieve(ctx, tc.payloadCid2, params, abi.NewTokenAmount(10), tc.rpeer2, payChAddr, tc.rpeer2.Address, nil)
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestMigrations(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
10 changes: 10 additions & 0 deletions retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var ClientEvents = fsm.Events{
fsm.Event(rm.ClientEventDealProposed).
From(rm.DealStatusNew).To(rm.DealStatusWaitForAcceptance).
From(rm.DealStatusRetryLegacy).To(rm.DealStatusWaitForAcceptanceLegacy).
From(rm.DealStatusCancelling).ToJustRecord().
Action(func(deal *rm.ClientDealState, channelID datatransfer.ChannelID) error {
deal.ChannelID = channelID
deal.Message = ""
Expand Down Expand Up @@ -316,6 +317,15 @@ var ClientFinalityStates = []fsm.StateKey{
rm.DealStatusDealNotFound,
}

func IsFinalityState(st fsm.StateKey) bool {
for _, state := range ClientFinalityStates {
if st == state {
return true
}
}
return false
}

// ClientStateEntryFuncs are the handlers for different states in a retrieval client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
rm.DealStatusNew: ProposeDeal,
Expand Down

0 comments on commit f9e8df5

Please sign in to comment.