Skip to content

Commit

Permalink
Add back connection tagging (#362)
Browse files Browse the repository at this point in the history
* Add peer tagging to Client flow

* Protect connections during data transfer (from the Provider's perspective)

* fix(imports): run imports check

Co-authored-by: Ingar Shu <[email protected]>
  • Loading branch information
hannahhoward and ingar committed Aug 12, 2020
1 parent fb07f57 commit a2d7bb5
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 32 deletions.
38 changes: 19 additions & 19 deletions shared_testutil/test_network_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ type TestStorageDealStream struct {
proposalWriter StorageDealProposalWriter
responseReader StorageDealResponseReader
responseWriter StorageDealResponseWriter
tags map[string]struct{}

CloseCount int
CloseError error
Expand All @@ -431,7 +430,6 @@ func NewTestStorageDealStream(params TestStorageDealStreamParams) *TestStorageDe
proposalWriter: TrivialStorageDealProposalWriter,
responseReader: TrivialStorageDealResponseReader,
responseWriter: TrivialStorageDealResponseWriter,
tags: make(map[string]struct{}),
}
if params.ProposalReader != nil {
stream.proposalReader = params.ProposalReader
Expand Down Expand Up @@ -477,23 +475,6 @@ func (tsds *TestStorageDealStream) Close() error {
return tsds.CloseError
}

// TagProtectedConnection preserves this connection as higher priority than others
func (tsds TestStorageDealStream) TagProtectedConnection(identifier string) {
tsds.tags[identifier] = struct{}{}
}

// UntagProtectedConnection removes the given tag on this connection, increasing
// the likelyhood it will be cleaned up
func (tsds TestStorageDealStream) UntagProtectedConnection(identifier string) {
delete(tsds.tags, identifier)
}

// AssertConnectionTagged verifies a connection was tagged with the given identifier
func (tsds TestStorageDealStream) AssertConnectionTagged(t *testing.T, identifier string) {
_, ok := tsds.tags[identifier]
require.True(t, ok)
}

// TrivialStorageDealProposalReader succeeds trivially, returning an empty proposal.
func TrivialStorageDealProposalReader() (smnet.Proposal, error) {
return smnet.Proposal{}, nil
Expand Down Expand Up @@ -559,3 +540,22 @@ func (tpr TestPeerResolver) GetPeers(cid.Cid) ([]rm.RetrievalPeer, error) {
}

var _ rm.PeerResolver = &TestPeerResolver{}

type TestPeerTagger struct {
TagCalls []peer.ID
UntagCalls []peer.ID
}

func NewTestPeerTagger() *TestPeerTagger {
return &TestPeerTagger{}
}

func (pt *TestPeerTagger) TagPeer(id peer.ID, _ string) {
pt.TagCalls = append(pt.TagCalls, id)
}

func (pt *TestPeerTagger) UntagPeer(id peer.ID, _ string) {
pt.UntagCalls = append(pt.UntagCalls, id)
}

var _ smnet.PeerTagger = &TestPeerTagger{}
8 changes: 8 additions & 0 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,14 @@ func (csg *clientStoreGetter) Get(proposalCid cid.Cid) (*multistore.Store, error
return csg.c.multiStore.Get(*deal.StoreID)
}

func (c *clientDealEnvironment) TagPeer(peer peer.ID, tag string) {
c.c.net.TagPeer(peer, tag)
}

func (c *clientDealEnvironment) UntagPeer(peer peer.ID, tag string) {
c.c.net.UntagPeer(peer, tag)
}

// ClientFSMParameterSpec is a valid set of parameters for a client deal FSM - used in doc generation
var ClientFSMParameterSpec = fsm.Parameters{
Environment: &clientDealEnvironment{},
Expand Down
9 changes: 9 additions & 0 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClientDealEnvironment interface {
GetProviderDealState(ctx context.Context, proposalCid cid.Cid) (*storagemarket.ProviderDealState, error)
PollingInterval() time.Duration
DealFunds() funds.DealFunds
network.PeerTagger
}

// ClientStateEntryFunc is the type for all state entry functions on a storage client
Expand Down Expand Up @@ -103,6 +104,8 @@ func ProposeDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storag
return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err)
}

environment.TagPeer(deal.Miner, deal.ProposalCid.String())

if err := s.WriteDealProposal(proposal); err != nil {
return ctx.Trigger(storagemarket.ClientEventWriteProposalFailed, err)
}
Expand Down Expand Up @@ -160,6 +163,7 @@ func InitiateDataTransfer(ctx fsm.Context, environment ClientDealEnvironment, de

// CheckForDealAcceptance is run until the deal is sealed and published by the provider, or errors
func CheckForDealAcceptance(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {

dealState, err := environment.GetProviderDealState(ctx.Context(), deal.ProposalCid)
if err != nil {
log.Warnf("error when querying provider deal state: %w", err) // TODO: at what point do we fail the deal?
Expand Down Expand Up @@ -214,6 +218,9 @@ func ValidateDealPublished(ctx fsm.Context, environment ClientDealEnvironment, d
_ = ctx.Trigger(storagemarket.ClientEventFundsReleased, deal.FundsReserved)
}

// at this point data transfer is complete, so unprotect peer connection
environment.UntagPeer(deal.Miner, deal.ProposalCid.String())

return ctx.Trigger(storagemarket.ClientEventDealPublished, dealID)
}

Expand Down Expand Up @@ -277,6 +284,8 @@ func FailDeal(ctx fsm.Context, environment ClientDealEnvironment, deal storagema
// TODO: store in some sort of audit log
log.Errorf("deal %s failed: %s", deal.ProposalCid, deal.Message)

environment.UntagPeer(deal.Miner, deal.ProposalCid.String())

return ctx.Trigger(storagemarket.ClientEventFailed)
}

Expand Down
19 changes: 17 additions & 2 deletions storagemarket/impl/clientstates/client_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestWaitForFunding(t *testing.T) {
}

func TestProposeDeal(t *testing.T) {
t.Run("succeeds and closes stream", func(t *testing.T) {
t.Run("succeeds, closes stream, and tags connection", func(t *testing.T) {
ds := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{
ResponseReader: testResponseReader(t, responseParams{
state: storagemarket.StorageDealWaitingForData,
Expand All @@ -115,6 +115,8 @@ func TestProposeDeal(t *testing.T) {
inspector: func(deal storagemarket.ClientDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealStartDataTransfer, deal.State)
assert.Equal(t, 1, env.dealStream.CloseCount)
assert.Len(t, env.peerTagger.TagCalls, 1)
assert.Equal(t, deal.Miner, env.peerTagger.TagCalls[0])
},
})
})
Expand All @@ -141,7 +143,6 @@ func TestProposeDeal(t *testing.T) {
},
})
})

t.Run("write proposal fails fails", func(t *testing.T) {
ds := tut.NewTestStorageDealStream(tut.TestStorageDealStreamParams{
ProposalWriter: tut.FailStorageProposalWriter,
Expand Down Expand Up @@ -369,6 +370,8 @@ func TestValidateDealPublished(t *testing.T) {
assert.Equal(t, abi.DealID(5), deal.DealID)
assert.Equal(t, env.dealFunds.ReleaseCalls[0], deal.Proposal.ClientBalanceRequirement())
assert.True(t, deal.FundsReserved.Nil() || deal.FundsReserved.IsZero())
assert.Len(t, env.peerTagger.UntagCalls, 1)
assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0])
},
})
})
Expand All @@ -379,6 +382,8 @@ func TestValidateDealPublished(t *testing.T) {
tut.AssertDealState(t, storagemarket.StorageDealSealing, deal.State)
assert.Equal(t, abi.DealID(5), deal.DealID)
assert.Len(t, env.dealFunds.ReleaseCalls, 0)
assert.Len(t, env.peerTagger.UntagCalls, 1)
assert.Equal(t, deal.Miner, env.peerTagger.UntagCalls[0])
},
})
})
Expand Down Expand Up @@ -546,6 +551,7 @@ func makeExecutor(ctx context.Context,
getDealStatusErr: envParams.getDealStatusErr,
pollingInterval: envParams.pollingInterval,
dealFunds: tut.NewTestDealFunds(),
peerTagger: tut.NewTestPeerTagger(),
}

if environment.pollingInterval == 0 {
Expand Down Expand Up @@ -617,6 +623,7 @@ type fakeEnvironment struct {
getDealStatusErr error
pollingInterval time.Duration
dealFunds *tut.TestDealFunds
peerTagger *tut.TestPeerTagger
}

type dataTransferParams struct {
Expand Down Expand Up @@ -663,6 +670,14 @@ func (fe *fakeEnvironment) DealFunds() funds.DealFunds {
return fe.dealFunds
}

func (fe *fakeEnvironment) TagPeer(id peer.ID, ident string) {
fe.peerTagger.TagPeer(id, ident)
}

func (fe *fakeEnvironment) UntagPeer(id peer.ID, ident string) {
fe.peerTagger.UntagPeer(id, ident)
}

var _ clientstates.ClientDealEnvironment = &fakeEnvironment{}

type responseParams struct {
Expand Down
9 changes: 9 additions & 0 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -668,6 +669,14 @@ func (p *providerDealEnvironment) DealFunds() funds.DealFunds {
return p.p.dealFunds
}

func (p *providerDealEnvironment) TagPeer(id peer.ID, s string) {
p.p.net.TagPeer(id, s)
}

func (p *providerDealEnvironment) UntagPeer(id peer.ID, s string) {
p.p.net.UntagPeer(id, s)
}

var _ providerstates.ProviderDealEnvironment = &providerDealEnvironment{}

type providerStoreGetter struct {
Expand Down
9 changes: 8 additions & 1 deletion storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ type ProviderDealEnvironment interface {
PieceStore() piecestore.PieceStore
RunCustomDecisionLogic(context.Context, storagemarket.MinerDeal) (bool, string, error)
DealFunds() funds.DealFunds
network.PeerTagger
}

// ProviderStateEntryFunc is the signature for a StateEntryFunc in the provider FSM
type ProviderStateEntryFunc func(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error

// ValidateDealProposal validates a proposed deal against the provider criteria
func ValidateDealProposal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
environment.TagPeer(deal.Client, deal.ProposalCid.String())

tok, _, err := environment.Node().GetChainHead(ctx.Context())
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err))
Expand Down Expand Up @@ -385,6 +388,9 @@ func VerifyDealActivated(ctx fsm.Context, environment ProviderDealEnvironment, d

// WaitForDealCompletion waits for the deal to be slashed or to expire
func WaitForDealCompletion(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
// At this point we have all the data so we can unprotect the connection
environment.UntagPeer(deal.Client, deal.ProposalCid.String())

node := environment.Node()

// Called when the deal expires
Expand Down Expand Up @@ -433,9 +439,10 @@ func RejectDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stora

// FailDeal cleans up before terminating a deal
func FailDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {

log.Warnf("deal %s failed: %s", deal.ProposalCid, deal.Message)

environment.UntagPeer(deal.Client, deal.ProposalCid.String())

if deal.PiecePath != filestore.Path("") {
err := environment.FileStore().Delete(deal.PiecePath)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions storagemarket/impl/providerstates/provider_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -56,6 +57,8 @@ func TestValidateDealProposal(t *testing.T) {
"succeeds": {
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealAcceptWait, deal.State)
require.Len(t, env.peerTagger.TagCalls, 1)
require.Equal(t, deal.Client, env.peerTagger.TagCalls[0])
},
},
"verify signature fails": {
Expand Down Expand Up @@ -255,6 +258,7 @@ func TestVerifyData(t *testing.T) {
tut.AssertDealState(t, storagemarket.StorageDealEnsureProviderFunds, deal.State)
require.Equal(t, expPath, deal.PiecePath)
require.Equal(t, expMetaPath, deal.MetadataPath)

},
},
"generate piece CID fails": {
Expand Down Expand Up @@ -741,13 +745,17 @@ func TestWaitForDealCompletion(t *testing.T) {
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealSlashed, deal.State)
require.Equal(t, abi.ChainEpoch(5), deal.SlashEpoch)
require.Len(t, env.peerTagger.UntagCalls, 1)
require.Equal(t, deal.Client, env.peerTagger.UntagCalls[0])
},
},
"expiration succeeds": {
// OnDealSlashedEpoch of zero signals to test node to call onDealExpired()
nodeParams: nodeParams{OnDealSlashedEpoch: abi.ChainEpoch(0)},
dealInspector: func(t *testing.T, deal storagemarket.MinerDeal, env *fakeEnvironment) {
tut.AssertDealState(t, storagemarket.StorageDealExpired, deal.State)
require.Len(t, env.peerTagger.UntagCalls, 1)
require.Equal(t, deal.Client, env.peerTagger.UntagCalls[0])
},
},
"slashing fails": {
Expand Down Expand Up @@ -1131,6 +1139,7 @@ func makeExecutor(ctx context.Context,
fs: fs,
pieceStore: pieceStore,
dealFunds: tut.NewTestDealFunds(),
peerTagger: tut.NewTestPeerTagger(),
}
if environment.pieceCid == cid.Undef {
environment.pieceCid = defaultPieceCid
Expand Down Expand Up @@ -1181,6 +1190,7 @@ type fakeEnvironment struct {
expectedTags map[string]struct{}
receivedTags map[string]struct{}
dealFunds *tut.TestDealFunds
peerTagger *tut.TestPeerTagger
}

func (fe *fakeEnvironment) Address() address.Address {
Expand Down Expand Up @@ -1231,3 +1241,13 @@ func (fe *fakeEnvironment) RunCustomDecisionLogic(context.Context, storagemarket
func (fe *fakeEnvironment) DealFunds() funds.DealFunds {
return fe.dealFunds
}

func (fe *fakeEnvironment) TagPeer(id peer.ID, s string) {
fe.peerTagger.TagPeer(id, s)
}

func (fe *fakeEnvironment) UntagPeer(id peer.ID, s string) {
fe.peerTagger.UntagPeer(id, s)
}

var _ providerstates.ProviderDealEnvironment = &fakeEnvironment{}
8 changes: 0 additions & 8 deletions storagemarket/network/deal_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,3 @@ func (d *dealStream) Close() error {
func (d *dealStream) RemotePeer() peer.ID {
return d.p
}

func (d *dealStream) TagProtectedConnection(identifier string) {
d.host.ConnManager().TagPeer(d.p, identifier, TagPriority)
}

func (d *dealStream) UntagProtectedConnection(identifier string) {
d.host.ConnManager().UntagPeer(d.p, identifier)
}
8 changes: 8 additions & 0 deletions storagemarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,11 @@ func (impl *libp2pStorageMarketNetwork) ID() peer.ID {
func (impl *libp2pStorageMarketNetwork) AddAddrs(p peer.ID, addrs []ma.Multiaddr) {
impl.host.Peerstore().AddAddrs(p, addrs, 8*time.Hour)
}

func (impl *libp2pStorageMarketNetwork) TagPeer(p peer.ID, id string) {
impl.host.ConnManager().TagPeer(p, id, TagPriority)
}

func (impl *libp2pStorageMarketNetwork) UntagPeer(p peer.ID, id string) {
impl.host.ConnManager().UntagPeer(p, id)
}
10 changes: 8 additions & 2 deletions storagemarket/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type StorageDealStream interface {
ReadDealResponse() (SignedResponse, error)
WriteDealResponse(SignedResponse) error
RemotePeer() peer.ID
TagProtectedConnection(identifier string)
UntagProtectedConnection(identifier string)
Close() error
}

Expand Down Expand Up @@ -60,4 +58,12 @@ type StorageMarketNetwork interface {
StopHandlingRequests() error
ID() peer.ID
AddAddrs(peer.ID, []ma.Multiaddr)

PeerTagger
}

// PeerTagger implements arbitrary tagging of peers
type PeerTagger interface {
TagPeer(peer.ID, string)
UntagPeer(peer.ID, string)
}

0 comments on commit a2d7bb5

Please sign in to comment.