diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go new file mode 100644 index 00000000000..1923fbba0e5 --- /dev/null +++ b/markets/storageadapter/dealpublisher.go @@ -0,0 +1,264 @@ +package storageadapter + +import ( + "context" + "strings" + "sync" + "time" + + "go.uber.org/fx" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/node/config" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type dealPublisherAPI interface { + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) + MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) +} + +// DealPublisher batches deal publishing so that many deals can be included in +// a single publish message. This saves gas for miners that publish deals +// frequently. +// When a deal is submitted, the DealPublisher waits a configurable amount of +// time for other deals to be submitted before sending the publish message. +// There is a configurable maximum number of deals that can be included in one +// message. When the limit is reached the DealPublisher immediately submits a +// publish message with all deals in the queue. +type DealPublisher struct { + api dealPublisherAPI + + ctx context.Context + Shutdown context.CancelFunc + + maxDealsPerPublishMsg uint64 + publishPeriod time.Duration + publishSpec *api.MessageSendSpec + + lk sync.Mutex + pending []*pendingDeal + cancelWaitForMoreDeals context.CancelFunc +} + +func NewDealPublisher( + feeConfig *config.MinerFeeConfig, + publishMsgCfg *config.PublishMsgConfig, +) func(lc fx.Lifecycle, dpapi dealPublisherAPI) *DealPublisher { + return func(lc fx.Lifecycle, dpapi dealPublisherAPI) *DealPublisher { + publishSpec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(feeConfig.MaxPublishDealsFee)} + dp := newDealPublisher(dpapi, publishMsgCfg, publishSpec) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + dp.Shutdown() + return nil + }, + }) + return dp + } +} + +func newDealPublisher( + dpapi dealPublisherAPI, + publishMsgCfg *config.PublishMsgConfig, + publishSpec *api.MessageSendSpec, +) *DealPublisher { + ctx, cancel := context.WithCancel(context.Background()) + return &DealPublisher{ + api: dpapi, + ctx: ctx, + Shutdown: cancel, + maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg, + publishPeriod: time.Duration(publishMsgCfg.PublishPeriod), + publishSpec: publishSpec, + } +} + +func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) { + pdeal := newPendingDeal(ctx, deal) + + // Add the deal to the queue + p.processNewDeal(pdeal) + + // Wait for the deal to be submitted + select { + case <-ctx.Done(): + return cid.Undef, ctx.Err() + case res := <-pdeal.Result: + return res.msgCid, res.err + } +} + +func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { + p.lk.Lock() + defer p.lk.Unlock() + + // Add the deal to the queue + p.pending = append(p.pending, pdeal) + // Filter out any cancelled deals + p.filterCancelledDeals() + + // If the maximum number of deals per message has been reached, + // send a publish message + if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg { + p.publishAllDeals() + return + } + + // Otherwise wait for more deals to arrive or the timeout to be reached + p.waitForMoreDeals() +} + +func (p *DealPublisher) waitForMoreDeals() { + // If we already set the timeout + if p.cancelWaitForMoreDeals != nil { + // If there are some pending deals, wait for the timeout to expire + if len(p.pending) > 0 { + return + } + // If all pending deals have been cancelled, clear the timeout + p.cancelWaitForMoreDeals() + } + + // Set a timeout to wait for more deals to arrive + ctx, cancel := context.WithCancel(p.ctx) + p.cancelWaitForMoreDeals = cancel + + go func() { + select { + case <-ctx.Done(): + case <-time.After(p.publishPeriod): + p.lk.Lock() + defer p.lk.Unlock() + + // The timeout has expired so publish all pending deals + p.publishAllDeals() + } + }() +} + +func (p *DealPublisher) publishAllDeals() { + // If the timeout hasn't yet expired, cancel it + if p.cancelWaitForMoreDeals != nil { + p.cancelWaitForMoreDeals() + p.cancelWaitForMoreDeals = nil + } + + // Filter out any deals that have been cancelled + p.filterCancelledDeals() + deals := p.pending[:] + p.pending = nil + + // Send the publish message + go p.publishReady(deals) +} + +func (p *DealPublisher) publishReady(ready []*pendingDeal) { + if len(ready) == 0 { + return + } + + deals := make([]market2.ClientDealProposal, 0, len(ready)) + for _, pd := range ready { + deals = append(deals, pd.deal) + } + + // Send the publish message + msgCid, err := p.publishDealProposals(deals) + + // Signal that each deal has been published + for _, pd := range ready { + pd := pd + go func() { + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case pd.Result <- res: + } + }() + } +} + +// Sends the publish message +func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { + log.Infof("publishing %d deals with piece CIDs: %s", len(deals), pieceCids(deals)) + + provider := deals[0].Proposal.Provider + mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) + if err != nil { + return cid.Undef, err + } + + params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ + Deals: deals, + }) + + if err != nil { + return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) + } + + smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{ + To: market.Address, + From: mi.Worker, + Value: types.NewInt(0), + Method: market.Methods.PublishStorageDeals, + Params: params, + }, p.publishSpec) + + if err != nil { + return cid.Undef, err + } + return smsg.Cid(), nil +} + +func pieceCids(deals []market2.ClientDealProposal) string { + cids := make([]string, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID.String()) + } + return strings.Join(cids, ", ") +} + +// filter out deals that have been cancelled +func (p *DealPublisher) filterCancelledDeals() { + i := 0 + for _, pd := range p.pending { + if pd.ctx.Err() == nil { + p.pending[i] = pd + i++ + } + } + p.pending = p.pending[:i] +} + +type publishResult struct { + msgCid cid.Cid + err error +} + +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go new file mode 100644 index 00000000000..77e8e65f84e --- /dev/null +++ b/markets/storageadapter/dealpublisher_test.go @@ -0,0 +1,227 @@ +package storageadapter + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/crypto" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + + "github.com/stretchr/testify/require" + + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/config" +) + +func TestDealPublisher(t *testing.T) { + testCases := []struct { + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + expiredWithinPublishPeriod int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int + }{{ + name: "publish one deal within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{1}, + }, { + name: "publish two deals within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{2}, + }, { + name: "publish one deal within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{1, 1}, + }, { + name: "publish deals that exceed max deals per message within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 2, + dealCountWithinPublishPeriod: 3, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1, 1}, + }, { + name: "ignore expired deals", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + expiredWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, + }} + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + client := tutils.NewActorAddr(t, "client") + provider := tutils.NewActorAddr(t, "provider") + worker := tutils.NewActorAddr(t, "worker") + dpapi := newDPAPI(t, worker) + + // Create a deal publisher + dp := newDealPublisher(dpapi, &config.PublishMsgConfig{ + PublishPeriod: config.Duration(tc.publishPeriod), + MaxDealsPerMsg: tc.maxDealsPerMsg, + }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) + + // Keep a record of the deals that were submitted to be published + var dealsToPublish []market.ClientDealProposal + publishDeal := func(expired bool) { + pctx := ctx + var cancel context.CancelFunc + if expired { + pctx, cancel = context.WithCancel(ctx) + cancel() + } + + deal := market.ClientDealProposal{ + Proposal: market0.DealProposal{ + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, + }, + ClientSignature: crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte("signature data"), + }, + } + if !expired { + dealsToPublish = append(dealsToPublish, deal) + } + go func() { + _, err := dp.Publish(pctx, deal) + if expired { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }() + } + + // Publish deals within publish period + for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { + publishDeal(false) + } + for i := 0; i < tc.expiredWithinPublishPeriod; i++ { + publishDeal(true) + } + + // Wait until publish period has elapsed + time.Sleep(2 * tc.publishPeriod) + + // Publish deals after publish period + for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { + publishDeal(false) + } + + // For each message that was expected to be sent + var publishedDeals []market.ClientDealProposal + for _, expectedDealsInMsg := range tc.expectedDealsPerMsg { + // Should have called StateMinerInfo with the provider address + stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls + require.Equal(t, provider, stateMinerInfoAddr) + + // Check the fields of the message that was sent + msg := <-dpapi.pushedMsgs + require.Equal(t, worker, msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, market.Methods.PublishStorageDeals, msg.Method) + + // Check that the expected number of deals was included in the message + var params market2.PublishStorageDealsParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, params.Deals, expectedDealsInMsg) + + // Keep track of the deals that were sent + for _, d := range params.Deals { + publishedDeals = append(publishedDeals, d) + } + } + + // Verify that all deals that were submitted to be published were + // sent out (we do this by ensuring all the piece CIDs are present) + require.True(t, matchPieceCids(publishedDeals, dealsToPublish)) + }) + } +} + +func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { + cidsA := dealPieceCids(sent) + cidsB := dealPieceCids(exp) + + if len(cidsA) != len(cidsB) { + return false + } + + s1 := cid.NewSet() + for _, c := range cidsA { + s1.Add(c) + } + + for _, c := range cidsB { + if !s1.Has(c) { + return false + } + } + + return true +} + +func dealPieceCids(deals []market2.ClientDealProposal) []cid.Cid { + cids := make([]cid.Cid, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID) + } + return cids +} + +type dpAPI struct { + t *testing.T + worker address.Address + + stateMinerInfoCalls chan address.Address + pushedMsgs chan *types.Message +} + +func newDPAPI(t *testing.T, worker address.Address) *dpAPI { + return &dpAPI{ + t: t, + worker: worker, + stateMinerInfoCalls: make(chan address.Address, 128), + pushedMsgs: make(chan *types.Message, 128), + } +} + +func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { + d.stateMinerInfoCalls <- address + return miner.MinerInfo{Worker: d.worker}, nil +} + +func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { + d.pushedMsgs <- msg + return &types.SignedMessage{Message: *msg}, nil +} diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index ab8c3f52fc6..8cc1f83a38c 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -52,6 +52,7 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea } if len(retval.IDs) != 1 { + // TODO: now we send messages with more than one deal // market currently only ever sends messages with 1 deal return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") } diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 4ce32d2bfa0..72a37c1c0ab 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -22,7 +22,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" @@ -48,56 +47,33 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events - publishSpec, addBalanceSpec *api.MessageSendSpec - dsMatcher *dealStateMatcher + dealPublisher *DealPublisher + + addBalanceSpec *api.MessageSendSpec + dsMatcher *dealStateMatcher } -func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { - return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { +func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { + return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { na := &ProviderNodeAdapter{ FullNode: full, - dag: dag, - secb: secb, - ev: events.NewEvents(context.TODO(), full), - dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), + dag: dag, + secb: secb, + ev: events.NewEvents(context.TODO(), full), + dealPublisher: dealPublisher, + dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), } if fc != nil { - na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)} na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)} } + return na } } func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { - log.Info("publishing deal") - - mi, err := n.StateMinerInfo(ctx, deal.Proposal.Provider, types.EmptyTSK) - if err != nil { - return cid.Undef, err - } - - params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ - Deals: []market2.ClientDealProposal{deal.ClientDealProposal}, - }) - - if err != nil { - return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) - } - - // TODO: We may want this to happen after fetching data - smsg, err := n.MpoolPushMessage(ctx, &types.Message{ - To: market.Address, - From: mi.Worker, - Value: types.NewInt(0), - Method: market.Methods.PublishStorageDeals, - Params: params, - }, n.publishSpec) - if err != nil { - return cid.Undef, err - } - return smsg.Cid(), nil + return n.dealPublisher.Publish(ctx, deal.ClientDealProposal) } func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) { diff --git a/node/builder.go b/node/builder.go index 8ee9b367440..5ab0867d9ed 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,6 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), + Override(new(storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -519,6 +520,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), + Override(new(storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), diff --git a/node/config/def.go b/node/config/def.go index 68371c3842a..6301324014a 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -33,6 +33,7 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig + PublishMsg PublishMsgConfig Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig @@ -53,6 +54,15 @@ type DealmakingConfig struct { RetrievalFilter string } +type PublishMsgConfig struct { + // The amount of time to wait for more deals to arrive before + // publishing + PublishPeriod Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerMsg uint64 +} + type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 @@ -208,6 +218,11 @@ func DefaultStorageMiner() *StorageMiner { ExpectedSealDuration: Duration(time.Hour * 24), }, + PublishMsg: PublishMsgConfig{ + PublishPeriod: Duration(time.Hour), + MaxDealsPerMsg: 8, + }, + Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"),