Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage Market Changes Based On Lotus Integration #223

Merged
merged 3 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ require (
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.1.1
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-cbor v0.0.4
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2Is
github.com/ipfs/go-ipfs-blockstore v0.1.1/go.mod h1:8gZOgIN5e+Xdg2YSGdwTTRbguSVjYyosIDRQCY8E9QM=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
github.com/ipfs/go-ipfs-blockstore v1.0.0 h1:pmFp5sFYsYVvMOp9X01AK3s85usVcLvkBTRsN6SnfUA=
github.com/ipfs/go-ipfs-blockstore v1.0.0/go.mod h1:knLVdhVU9L7CC4T+T4nvGdeUIPAXlnd9zmXfp+9MIjU=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw=
Expand All @@ -300,6 +302,8 @@ github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc=
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-ds-help v1.0.0 h1:bEQ8hMGs80h0sR8O4tfDgV6B01aaF9qeTrujrTLYV3g=
github.com/ipfs/go-ipfs-ds-help v1.0.0/go.mod h1:ujAbkeIgkKAWtxxNkoZHWLCyk5JpPoKnGyCcsoF6ueE=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM=
github.com/ipfs/go-ipfs-exchange-offline v0.0.1 h1:P56jYKZF7lDDOLx5SotVh5KFxoY6C81I1NSHW1FxGew=
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/discovery/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewLocal(ds datastore.Batching) *Local {
}

func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
key := dshelp.CidToDsKey(cid)
key := dshelp.MultihashToDsKey(cid.Hash())
exists, err := l.ds.Has(key)
if err != nil {
return err
Expand Down Expand Up @@ -64,7 +64,7 @@ func hasPeer(peerList []retrievalmarket.RetrievalPeer, peer retrievalmarket.Retr
}

func (l *Local) GetPeers(payloadCID cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
entry, err := l.ds.Get(dshelp.CidToDsKey(payloadCID))
entry, err := l.ds.Get(dshelp.MultihashToDsKey(payloadCID.Hash()))
if err == datastore.ErrNotFound {
return []retrievalmarket.RetrievalPeer{}, nil
}
Expand Down
10 changes: 6 additions & 4 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,13 @@ func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amo
return err
}

err = c.node.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
err = c.node.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
done <- xerrors.Errorf("AddFunds errored: %w", err)
} else if code != exitcode.Ok {
done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String())
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
done <- nil
}
return nil
})
Expand Down
12 changes: 8 additions & 4 deletions storagemarket/impl/clientstates/client_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func EnsureClientFunds(ctx fsm.Context, environment ClientDealEnvironment, deal
func WaitForFunding(ctx fsm.Context, environment ClientDealEnvironment, deal storagemarket.ClientDeal) error {
node := environment.Node()

return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)
return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds err: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %s", code.String()))
}
return ctx.Trigger(storagemarket.ClientEventEnsureFundsFailed, xerrors.Errorf("AddFunds exit code: %w", code))
return ctx.Trigger(storagemarket.ClientEventFundsEnsured)

})
}

Expand Down
10 changes: 6 additions & 4 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,13 @@ func (p *Provider) AddStorageCollateral(ctx context.Context, amount abi.TokenAmo
return err
}

err = p.spn.WaitForMessage(mcid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
done <- nil
err = p.spn.WaitForMessage(ctx, mcid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
done <- xerrors.Errorf("AddFunds errored: %w", err)
} else if code != exitcode.Ok {
done <- xerrors.Errorf("AddFunds error, exit code: %s", code.String())
} else {
done <- xerrors.Errorf("AddFunds error, exit code: %w", code)
done <- nil
}
return nil
})
Expand Down
62 changes: 34 additions & 28 deletions storagemarket/impl/providerstates/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,14 @@ func EnsureProviderFunds(ctx fsm.Context, environment ProviderDealEnvironment, d
func WaitForFunding(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
node := environment.Node()

return node.WaitForMessage(deal.AddFundsCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, bytes []byte) error {
if code == exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventFunded)
return node.WaitForMessage(ctx.Context(), deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String()))
}
return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %w", code))
return ctx.Trigger(storagemarket.ProviderEventFunded)
})
}

Expand All @@ -220,32 +223,35 @@ func PublishDeal(ctx fsm.Context, environment ProviderDealEnvironment, deal stor

// WaitForPublish waits for the publish message on chain and sends the deal id back to the client
func WaitForPublish(ctx fsm.Context, environment ProviderDealEnvironment, deal storagemarket.MinerDeal) error {
return environment.Node().WaitForMessage(deal.PublishCid, storagemarket.ChainConfidence, func(code exitcode.ExitCode, retBytes []byte) error {
if code == exitcode.Ok {
var retval market.PublishStorageDealsReturn
err := retval.UnmarshalCBOR(bytes.NewReader(retBytes))
if err != nil {
return err
}

err = environment.SendSignedResponse(ctx.Context(), &network.Response{
State: storagemarket.StorageDealProposalAccepted,
Proposal: deal.ProposalCid,
PublishMessage: &deal.PublishCid,
})

if err != nil {
return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)
}

if err := environment.Disconnect(deal.ProposalCid); err != nil {
log.Warnf("closing client connection: %+v", err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])
return environment.Node().WaitForMessage(ctx.Context(), deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err))
}
if code != exitcode.Ok {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String()))
}
var retval market.PublishStorageDealsReturn
err = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
if err != nil {
return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err))
}

err = environment.SendSignedResponse(ctx.Context(), &network.Response{
State: storagemarket.StorageDealProposalAccepted,
Proposal: deal.ProposalCid,
PublishMessage: &deal.PublishCid,
})

if err != nil {
return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %w", code))
if err := environment.Disconnect(deal.ProposalCid); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious about Disconnect. Why is Disconnect called only when is no error? Also, there is a Disconnect call in SendSignedResponse, although in the tests it shows that it's harmless to do it twice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disconnect gets called in the DealFailing handler, so no need to call it here. It probably should not be calling disconnect in SendSignedResponse and I will check that and add a ticket if needed.

log.Warnf("closing client connection: %+v", err)
}

return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0])

})
}

Expand Down
13 changes: 7 additions & 6 deletions storagemarket/testnodes/testnodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ type FakeCommonNode struct {
GetBalanceError error
GetChainHeadError error

WaitForMessageBlocks bool
WaitForMessageError error
WaitForMessageExitCode exitcode.ExitCode
WaitForMessageRetBytes []byte
WaitForMessageBlocks bool
WaitForMessageError error
WaitForMessageExitCode exitcode.ExitCode
WaitForMessageRetBytes []byte
WaitForMessageNodeError error
}

// GetChainHead returns the state id in the storage market state
Expand Down Expand Up @@ -127,7 +128,7 @@ func (n *FakeCommonNode) EnsureFunds(ctx context.Context, addr, wallet address.A
return cid.Undef, n.EnsureFundsError
}

func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error {
func (n *FakeCommonNode) WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error {
if n.WaitForMessageError != nil {
return n.WaitForMessageError
}
Expand All @@ -137,7 +138,7 @@ func (n *FakeCommonNode) WaitForMessage(mcid cid.Cid, confidence int64, onComple
return nil
}

return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes)
return onCompletion(n.WaitForMessageExitCode, n.WaitForMessageRetBytes, n.WaitForMessageNodeError)
}

// GetBalance returns the funds in the storage market state
Expand Down
3 changes: 1 addition & 2 deletions storagemarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

const DealProtocolID = "/fil/storage/mk/1.0.1"
const AskProtocolID = "/fil/storage/ask/1.0.1"
const ChainConfidence = 10

type Balance struct {
Locked abi.TokenAmount
Expand Down Expand Up @@ -361,7 +360,7 @@ type StorageFunds interface {
// Verify a signature against an address + data
VerifySignature(ctx context.Context, signature crypto.Signature, signer address.Address, plaintext []byte, tok shared.TipSetToken) (bool, error)

WaitForMessage(mcid cid.Cid, confidence int64, onCompletion func(exitcode.ExitCode, []byte) error) error
WaitForMessage(ctx context.Context, mcid cid.Cid, onCompletion func(exitcode.ExitCode, []byte, error) error) error
}

// Node dependencies for a StorageProvider
Expand Down