From a99592f8c326de3d9c22d680ddb15a1b55f7c4a3 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 15 Jul 2024 19:50:24 +0400 Subject: [PATCH 1/5] fix!: sealer: handle initialisation error without panic storage/pipeline.NewPreCommitBatcher and storage/pipeline.New now have an additional error return to deal with errors arising from fetching the sealing config. --- itests/api_test.go | 3 +++ node/modules/storageminer.go | 5 ++++- storage/pipeline/precommit_batch.go | 18 +++++++++--------- storage/pipeline/sealing.go | 14 +++++++++----- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/itests/api_test.go b/itests/api_test.go index e3a41256e34..ceed874f428 100644 --- a/itests/api_test.go +++ b/itests/api_test.go @@ -172,6 +172,8 @@ func (ts *apiSuite) testOutOfGasError(t *testing.T) { build.BlockGasLimit = originalLimit }() + t.Logf("BlockGasLimit changed: %d", buildconstants.BlockGasLimit) + msg := &types.Message{ From: senderAddr, To: senderAddr, @@ -288,6 +290,7 @@ func (ts *apiSuite) testNonGenesisMiner(t *testing.T) { ctx := context.Background() full, genesisMiner, ens := kit.EnsembleMinimal(t, append(ts.opts, kit.MockProofs())...) + ens.InterconnectAll().BeginMining(4 * time.Millisecond) time.Sleep(1 * time.Second) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 04a0a53768c..b244b4a28ca 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -255,7 +255,10 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams provingBuffer := md.WPoStProvingPeriod * 2 pcp := sealing.NewBasicPreCommitPolicy(api, gsd, provingBuffer) - pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, verif, prover, &pcp, gsd, j, as) + pipeline, err := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, verif, prover, &pcp, gsd, j, as) + if err != nil { + return nil, xerrors.Errorf("creating sealing pipeline: %w", err) + } lc.Append(fx.Hook{ OnStart: func(context.Context) error { diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 55bead59037..0efe446bdd9 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -67,7 +67,7 @@ type PreCommitBatcher struct { lk sync.Mutex } -func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *PreCommitBatcher { +func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) (*PreCommitBatcher, error) { b := &PreCommitBatcher{ api: api, maddr: maddr, @@ -86,20 +86,20 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom stopped: make(chan struct{}), } - go b.run() + cfg, err := b.getConfig() + if err != nil { + return nil, xerrors.Errorf("failed to get sealer config: %w", err) + } + + go b.run(cfg) - return b + return b, nil } -func (b *PreCommitBatcher) run() { +func (b *PreCommitBatcher) run(cfg sealiface.Config) { var forceRes chan []sealiface.PreCommitBatchRes var lastRes []sealiface.PreCommitBatchRes - cfg, err := b.getConfig() - if err != nil { - panic(err) - } - timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack)) for { if forceRes != nil { diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 75791fae8c0..c111565efea 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -230,7 +230,7 @@ type pendingPiece struct { accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } -func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing { +func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) (*Sealing, error) { s := &Sealing{ Api: sapi, DealInfo: &CurrentDealInfoManager{sapi}, @@ -257,9 +257,8 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events addrSel: addrSel, - terminator: NewTerminationBatcher(mctx, maddr, sapi, addrSel, fc, gc), - precommiter: NewPreCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc), - commiter: NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov), + terminator: NewTerminationBatcher(mctx, maddr, sapi, addrSel, fc, gc), + commiter: NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov), getConfig: gc, @@ -270,6 +269,11 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events byState: map[SectorState]int64{}, }, } + pc, err := NewPreCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc) + if err != nil { + return nil, err + } + s.precommiter = pc s.notifee = func(before, after SectorInfo) { s.journal.RecordEvent(s.sealingEvtType, func() interface{} { @@ -287,7 +291,7 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) - return s + return s, nil } func (m *Sealing) Run(ctx context.Context) { From 8f891371fbbc350e1c34614dfc4427dddc4529f4 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 16 Jul 2024 10:03:41 +0400 Subject: [PATCH 2/5] add breaking API upgrade warning to the ChangeLog From bd8432d26b986ee5b11ad2e456da84d882399ff0 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 16 Jul 2024 12:26:53 +0400 Subject: [PATCH 3/5] NewCommitBatcher now has an additional error return to deal with errors arising from fetching the sealing config. --- storage/pipeline/commit_batch.go | 18 +++++++++--------- storage/pipeline/sealing.go | 10 +++++++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index a1431fbcca5..2e3f1a644dd 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -81,7 +81,7 @@ type CommitBatcher struct { lk sync.Mutex } -func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { +func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) (*CommitBatcher, error) { b := &CommitBatcher{ api: api, maddr: maddr, @@ -101,20 +101,20 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat stopped: make(chan struct{}), } - go b.run() + cfg, err := b.getConfig() + if err != nil { + return nil, err + } + + go b.run(cfg) - return b + return b, nil } -func (b *CommitBatcher) run() { +func (b *CommitBatcher) run(cfg sealiface.Config) { var forceRes chan []sealiface.CommitBatchRes var lastMsg []sealiface.CommitBatchRes - cfg, err := b.getConfig() - if err != nil { - panic(err) - } - timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)) for { if forceRes != nil { diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index c111565efea..fe25e15e64b 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -258,9 +258,7 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events addrSel: addrSel, terminator: NewTerminationBatcher(mctx, maddr, sapi, addrSel, fc, gc), - commiter: NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov), - - getConfig: gc, + getConfig: gc, legacySc: storedcounter.New(ds, datastore.NewKey(StorageCounterDSPrefix)), @@ -275,6 +273,12 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events } s.precommiter = pc + cc, err := NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov) + if err != nil { + return nil, err + } + s.commiter = cc + s.notifee = func(before, after SectorInfo) { s.journal.RecordEvent(s.sealingEvtType, func() interface{} { return SealingStateEvt{ From c1331c7d2f257d360138a42544796f00ecb3c592 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 16 Jul 2024 12:31:51 +0400 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Rod Vagg From 191d88eab84ff41dd4a690622d096e3bdc0a93df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 13 Aug 2024 14:34:08 +0200 Subject: [PATCH 5/5] fix: miner: Fix DDO pledge math (#12341) * Power is units of Space * Time so multiply by deal duration * fix: miner: Fix DDO pledge math * appease the changelog checker * Fix gen --------- Co-authored-by: zenground0 --- CHANGELOG.md | 1 - storage/pipeline/commit_batch.go | 21 +++++++++++++------ storage/pipeline/mocks/mock_commit_batcher.go | 15 ------------- storage/pipeline/pledge.go | 13 +++++++++--- storage/pipeline/sealing.go | 2 +- storage/pipeline/states_replica_update.go | 7 +++++++ storage/pipeline/states_sealing.go | 15 +++++++++++-- 7 files changed, 46 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f7d9f41c8b..a389a1ec3c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,6 @@ # Lotus changelog # 1.28.2 / 2024-08-15 - This is a Lotus patch release v1.28.2 for Node operators and Storage Providers. For node operators, this patch release fixes an issue where excessive bandwidth usage was caused by a routing loop in pubsub, where small "manifest" messages were cycling repeatedly around the network due to an ineffective routing loop prevention mechanism. diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 2e3f1a644dd..ad767fea1cc 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -42,7 +42,6 @@ type CommitBatcherApi interface { ChainHead(ctx context.Context) (*types.TipSet, error) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) - StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes.AllocationId, tsk types.TipSetKey) (*verifregtypes.Allocation, error) @@ -54,10 +53,16 @@ type CommitBatcherApi interface { StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) } +type PledgeApi interface { + sectorWeight(ctx context.Context, sector SectorInfo, expiration abi.ChainEpoch) (abi.StoragePower, error) + pledgeForPower(ctx context.Context, addedPower abi.StoragePower) (abi.TokenAmount, error) +} + type AggregateInput struct { - Spt abi.RegisteredSealProof - Info proof.AggregateSealVerifyInfo - Proof []byte + Spt abi.RegisteredSealProof + Info proof.AggregateSealVerifyInfo + Proof []byte + Weight abi.StoragePower ActivationManifest miner.SectorActivationManifest DealIDPrecommit bool @@ -65,6 +70,7 @@ type AggregateInput struct { type CommitBatcher struct { api CommitBatcherApi + pledgeApi PledgeApi maddr address.Address mctx context.Context addrSel AddressSelector @@ -81,7 +87,7 @@ type CommitBatcher struct { lk sync.Mutex } -func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) (*CommitBatcher, error) { +func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover, pa PledgeApi) (*CommitBatcher, error) { b := &CommitBatcher{ api: api, maddr: maddr, @@ -90,6 +96,7 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat feeCfg: feeCfg, getConfig: getConfig, prover: prov, + pledgeApi: pa, cutoffs: map[abi.SectorNumber]time.Time{}, todo: map[abi.SectorNumber]AggregateInput{}, @@ -606,7 +613,7 @@ func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tsk types.TipSe return big.Zero(), xerrors.Errorf("precommit info not found on chain") } - collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tsk) + collateral, err := b.pledgeApi.pledgeForPower(b.mctx, b.todo[sn].Weight) // b.maddr, pci.Info, tsk if err != nil { return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err) } @@ -616,6 +623,8 @@ func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tsk types.TipSe collateral = big.Zero() } + log.Infow("getSectorCollateral", "collateral", types.FIL(collateral), "sn", sn, "precommit", types.FIL(pci.PreCommitDeposit), "pledge", types.FIL(collateral), "weight", b.todo[sn].Weight) + return collateral, nil } func (b *CommitBatcher) aggregateProofType(nv network.Version) (abi.RegisteredAggregationProof, error) { diff --git a/storage/pipeline/mocks/mock_commit_batcher.go b/storage/pipeline/mocks/mock_commit_batcher.go index e2fce1eca01..aad1c332fb9 100644 --- a/storage/pipeline/mocks/mock_commit_batcher.go +++ b/storage/pipeline/mocks/mock_commit_batcher.go @@ -164,21 +164,6 @@ func (mr *MockCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2) } -// StateMinerInitialPledgeCollateral mocks base method. -func (m *MockCommitBatcherApi) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner.SectorPreCommitInfo, arg3 types.TipSetKey) (big.Int, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(big.Int) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral. -func (mr *MockCommitBatcherApiMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3) -} - // StateNetworkVersion mocks base method. func (m *MockCommitBatcherApi) StateNetworkVersion(arg0 context.Context, arg1 types.TipSetKey) (network.Version, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/pledge.go b/storage/pipeline/pledge.go index 04567fca1b2..7cdda9fa4cd 100644 --- a/storage/pipeline/pledge.go +++ b/storage/pipeline/pledge.go @@ -91,6 +91,11 @@ func (m *Sealing) sectorWeight(ctx context.Context, sector SectorInfo, expiratio // get verified deal infos var w, vw = big.Zero(), big.Zero() + now, err := m.Api.ChainHead(ctx) + if err != nil { + return abi.NewStoragePower(0), err + } + sectorDuration := big.NewInt(int64(expiration - now.Height())) for _, piece := range sector.Pieces { if !piece.HasDealInfo() { // todo StateMinerInitialPledgeCollateral doesn't add cc/padding to non-verified weight, is that correct? @@ -99,14 +104,16 @@ func (m *Sealing) sectorWeight(ctx context.Context, sector SectorInfo, expiratio alloc, err := piece.GetAllocation(ctx, m.Api, ts.Key()) if err != nil || alloc == nil { - w = big.Add(w, abi.NewStoragePower(int64(piece.Piece().Size))) + if err == nil { + log.Errorw("failed to get allocation", "error", err) + } + w = big.Add(w, big.Mul(sectorDuration, abi.NewStoragePower(int64(piece.Piece().Size)))) continue } - vw = big.Add(vw, abi.NewStoragePower(int64(piece.Piece().Size))) + vw = big.Add(vw, big.Mul(sectorDuration, abi.NewStoragePower(int64(piece.Piece().Size)))) } - // load market actor duration := expiration - ts.Height() sectorWeight := builtin.QAPowerForWeight(ssize, duration, w, vw) diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index fe25e15e64b..4ef0323ff1f 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -273,7 +273,7 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events } s.precommiter = pc - cc, err := NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov) + cc, err := NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov, s) if err != nil { return nil, err } diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 0c743f9db0a..f94c867c3e3 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -143,6 +143,13 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } + log.Infow("submitting replica update", + "sector", sector.SectorNumber, + "weight", types.FIL(weightUpdate), + "totalPledge", types.FIL(collateral), + "initialPledge", types.FIL(onChainInfo.InitialPledge), + "toPledge", types.FIL(big.Sub(collateral, onChainInfo.InitialPledge))) + collateral = big.Sub(collateral, onChainInfo.InitialPledge) if collateral.LessThan(big.Zero()) { collateral = big.Zero() diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index 5d6928fba88..d166ee890d3 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -780,6 +780,16 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return err } + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting precommit info: %w", err) + } + + weight, err := m.sectorWeight(ctx.Context(), sector, pci.Info.Expiration) + if err != nil { + return xerrors.Errorf("getting sector weight: %w", err) + } + res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ Info: proof.AggregateSealVerifyInfo{ Number: sector.SectorNumber, @@ -788,8 +798,9 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S SealedCID: *sector.CommR, UnsealedCID: *sector.CommD, }, - Proof: sector.Proof, - Spt: sector.SectorType, + Proof: sector.Proof, + Spt: sector.SectorType, + Weight: weight, ActivationManifest: miner2.SectorActivationManifest{ SectorNumber: sector.SectorNumber,