Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add commands to list pending deals and force publish #5538

Merged
merged 7 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/chain/types"
Expand Down Expand Up @@ -105,10 +106,12 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
// MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
// MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error)
MarketPublishPendingDeals(ctx context.Context) error

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error)
Expand Down Expand Up @@ -236,3 +239,11 @@ type AddressConfig struct {
CommitControl []address.Address
TerminateControl []address.Address
}

// PendingDealInfo has info about pending deals and when they are due to be
// published
type PendingDealInfo struct {
Deals []market.ClientDealProposal
PublishPeriodStart time.Time
PublishPeriod time.Duration
}
14 changes: 12 additions & 2 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,10 @@ type StorageMinerStruct struct {
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -1506,6 +1508,14 @@ func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, trans
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return c.Internal.MarketPendingDeals(ctx)
}

func (c *StorageMinerStruct) MarketPublishPendingDeals(ctx context.Context) error {
return c.Internal.MarketPublishPendingDeals(ctx)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
50 changes: 49 additions & 1 deletion cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (

tm "github.com/buger/goterm"
"github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"

Expand Down Expand Up @@ -341,6 +342,7 @@ var storageDealsCmd = &cli.Command{
getBlocklistCmd,
resetBlocklistCmd,
setSealDurationCmd,
dealsPendingPublish,
},
}

Expand Down Expand Up @@ -825,3 +827,49 @@ var transfersListCmd = &cli.Command{
return nil
},
}

var dealsPendingPublish = &cli.Command{
Name: "pending-publish",
Usage: "list deals waiting in publish queue",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "publish-now",
Usage: "send a publish message now",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

if cctx.Bool("publish-now") {
if err := api.MarketPublishPendingDeals(ctx); err != nil {
return xerrors.Errorf("publishing deals: %w", err)
}
fmt.Println("triggered deal publishing")
return nil
}

pending, err := api.MarketPendingDeals(ctx)
if err != nil {
return xerrors.Errorf("getting pending deals: %w", err)
}

w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(w, "ProposalCID\tClient\tSize\n")

for _, deal := range pending.Deals {
proposalNd, err := cborutil.AsIpld(&deal) // nolint
if err != nil {
return err
}

_, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", proposalNd.Cid(), deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)))
}

return w.Flush()
},
}
35 changes: 31 additions & 4 deletions documentation/en/api-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
* [MarketListDeals](#MarketListDeals)
* [MarketListIncompleteDeals](#MarketListIncompleteDeals)
* [MarketListRetrievalDeals](#MarketListRetrievalDeals)
* [MarketPendingDeals](#MarketPendingDeals)
* [MarketPublishPendingDeals](#MarketPublishPendingDeals)
* [MarketRestartDataTransfer](#MarketRestartDataTransfer)
* [MarketSetAsk](#MarketSetAsk)
* [MarketSetRetrievalAsk](#MarketSetRetrievalAsk)
Expand Down Expand Up @@ -524,10 +526,10 @@ Response: `{}`


### MarketCancelDataTransfer
ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer


Perms: read
Perms: write

Inputs:
```json
Expand Down Expand Up @@ -725,11 +727,36 @@ Inputs: `null`

Response: `null`

### MarketPendingDeals
There are not yet any comments for this method.

Perms: write

Inputs: `null`

Response:
```json
{
"Deals": null,
"PublishPeriodStart": "0001-01-01T00:00:00Z",
"PublishPeriod": 60000000000
}
```

### MarketPublishPendingDeals
There are not yet any comments for this method.

Perms: admin

Inputs: `null`

Response: `{}`

### MarketRestartDataTransfer
MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer


Perms: read
Perms: write

Inputs:
```json
Expand Down
35 changes: 35 additions & 0 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ func newDealPublisher(
}
}

// PendingDeals returns the list of deals that are queued up to be published
func (p *DealPublisher) PendingDeals() api.PendingDealInfo {
p.lk.Lock()
defer p.lk.Unlock()

// Filter out deals whose context has been cancelled
deals := make([]*pendingDeal, 0, len(p.pending))
for _, dl := range p.pending {
if dl.ctx.Err() == nil {
deals = append(deals, dl)
}
}

pending := make([]market2.ClientDealProposal, len(deals))
for i, deal := range deals {
pending[i] = deal.deal
}

return api.PendingDealInfo{
Deals: pending,
PublishPeriodStart: p.publishPeriodStart,
PublishPeriod: p.publishPeriod,
}
}

// ForcePublishPendingDeals publishes all pending deals without waiting for
// the publish period to elapse
func (p *DealPublisher) ForcePublishPendingDeals() {
p.lk.Lock()
defer p.lk.Unlock()

log.Infof("force publishing deals")
p.publishAllDeals()
}

func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) {
pdeal := newPendingDeal(ctx, deal)

Expand Down
Loading