Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow concurrent retrieval deals for same peer/cid #493

Merged
merged 1 commit into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/retrievalclient.mmd
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ stateDiagram-v2
note left of 24 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived


note left of 25 : The following events only record in this state.<br><br>ClientEventProviderCancelled
note left of 25 : The following events only record in this state.<br><br>ClientEventDealProposed<br>ClientEventProviderCancelled


note left of 28 : The following events only record in this state.<br><br>ClientEventLastPaymentRequested<br>ClientEventPaymentRequested<br>ClientEventAllBlocksReceived<br>ClientEventBlocksReceived
Expand Down
Binary file modified docs/retrievalclient.mmd.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 3 additions & 3 deletions docs/retrievalclient.mmd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
42 changes: 41 additions & 1 deletion retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package retrievalimpl
import (
"context"
"errors"
"fmt"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -44,6 +46,9 @@ type Client struct {
resolver discovery.PeerResolver
stateMachines fsm.Group
migrateStateMachines func(context.Context) error

// Guards concurrent access to Retrieve method
retrieveLk sync.Mutex
}

type internalEvent struct {
Expand Down Expand Up @@ -231,7 +236,17 @@ From then on, the statemachine controls the deal flow in the client. Other compo
Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates
*/
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, p retrievalmarket.RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
err := c.addMultiaddrs(ctx, p)
c.retrieveLk.Lock()
defer c.retrieveLk.Unlock()

// Check if there's already an active retrieval deal with the same peer
// for the same payload CID
err := c.checkForActiveDeal(payloadCID, p.ID)
if err != nil {
return 0, err
}

err = c.addMultiaddrs(ctx, p)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -281,6 +296,31 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
return dealID, nil
}

// Check if there's already an active retrieval deal with the same peer
// for the same payload CID
func (c *Client) checkForActiveDeal(payloadCID cid.Cid, pid peer.ID) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be more idiomatic to return (dealID, bool) here, and create the error in the caller.

var deals []retrievalmarket.ClientDealState
err := c.stateMachines.List(&deals)
if err != nil {
return err
}

for _, deal := range deals {
match := deal.Sender == pid && deal.PayloadCID == payloadCID
active := !clientstates.IsFinalityState(deal.Status)
if match && active {
msg := fmt.Sprintf("there is an active retrieval deal with peer %s ", pid)
msg += fmt.Sprintf("for payload CID %s ", payloadCID)
msg += fmt.Sprintf("(retrieval deal ID %d, state %s) - ",
deal.ID, retrievalmarket.DealStatuses[deal.Status])
msg += "existing deal must be cancelled before starting a new retrieval deal"
err := xerrors.Errorf(msg)
return err
}
}
return nil
}

func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType) {
evt := eventName.(retrievalmarket.ClientEvent)
ds := state.(retrievalmarket.ClientDealState)
Expand Down
139 changes: 139 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,145 @@ func TestClient_FindProviders(t *testing.T) {
})
}

// TestClient_DuplicateRetrieve verifies that it's not possible to make a
// retrieval deal for the same payload CID with the same peer as an existing
// active deal
func TestClient_DuplicateRetrieve(t *testing.T) {
bgCtx := context.Background()
ctx, cancel := context.WithCancel(bgCtx)
defer cancel()
payChAddr := address.TestAddress

payloadCIDs := tut.GenerateCids(2)
rpeer1 := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: peer.ID("p1"),
}
rpeer2 := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: peer.ID("p2"),
}

testCases := []struct {
name string
payloadCid1 cid.Cid
payloadCid2 cid.Cid
rpeer1 retrievalmarket.RetrievalPeer
rpeer2 retrievalmarket.RetrievalPeer
expectError bool
cancelled bool
}{{
name: "different payload CID",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[1],
rpeer1: rpeer1,
rpeer2: rpeer1,
}, {
name: "different peer",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer2,
}, {
name: "same peer and payload CID",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer1,
expectError: true,
}, {
name: "same peer and payload CID as cancelled deal",
payloadCid1: payloadCIDs[0],
payloadCid2: payloadCIDs[0],
rpeer1: rpeer1,
rpeer2: rpeer1,
cancelled: true,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set up a retrieval client node with mocks
ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
multiStore, err := multistore.NewMultiDstore(ds)
require.NoError(t, err)
dt := tut.NewTestDataTransfer()
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(tc.rpeer1, nil)
node.ExpectKnownAddresses(tc.rpeer2, nil)

// Create the client
client, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

// Start the client and wait till it's ready
err = client.Start(ctx)
require.NoError(t, err)

ready := make(chan struct{})
go func() {
client.OnReady(func(err error) {
close(ready)
})
}()
select {
case <-ready:
case <-time.After(100 * time.Millisecond):
}

// Retrieve first payload CID from first peer
params := retrievalmarket.Params{
Selector: nil,
PieceCID: &tut.GenerateCids(1)[0],
PricePerByte: abi.NewTokenAmount(1),
PaymentInterval: 1,
PaymentIntervalIncrease: 0,
UnsealPrice: abi.NewTokenAmount(0),
}

dealID1, err := client.Retrieve(ctx, tc.payloadCid1, params, abi.NewTokenAmount(10), tc.rpeer1, payChAddr, tc.rpeer1.Address, nil)
assert.NoError(t, err)

// If the deal should be cancelled
if tc.cancelled {
done := make(chan struct{})
go func() {
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
if state.Status == retrievalmarket.DealStatusCancelled {
close(done)
}
})
}()

// Cancel deal and wait for it to complete cancelling
err = client.CancelDeal(dealID1)
require.NoError(t, err)

select {
case <-done:
case <-time.After(100 * time.Millisecond):
}
}

// Retrieve second payload CID from second peer
_, err = client.Retrieve(ctx, tc.payloadCid2, params, abi.NewTokenAmount(10), tc.rpeer2, payChAddr, tc.rpeer2.Address, nil)
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestMigrations(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
10 changes: 10 additions & 0 deletions retrievalmarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var ClientEvents = fsm.Events{
fsm.Event(rm.ClientEventDealProposed).
From(rm.DealStatusNew).To(rm.DealStatusWaitForAcceptance).
From(rm.DealStatusRetryLegacy).To(rm.DealStatusWaitForAcceptanceLegacy).
From(rm.DealStatusCancelling).ToJustRecord().
Action(func(deal *rm.ClientDealState, channelID datatransfer.ChannelID) error {
deal.ChannelID = &channelID
deal.Message = ""
Expand Down Expand Up @@ -331,6 +332,15 @@ var ClientFinalityStates = []fsm.StateKey{
rm.DealStatusDealNotFound,
}

func IsFinalityState(st fsm.StateKey) bool {
for _, state := range ClientFinalityStates {
if st == state {
return true
}
}
return false
}

// ClientStateEntryFuncs are the handlers for different states in a retrieval client
var ClientStateEntryFuncs = fsm.StateEntryFuncs{
rm.DealStatusNew: ProposeDeal,
Expand Down
10 changes: 8 additions & 2 deletions retrievalmarket/migrations/maptypes/maptypes_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.