Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: backport pledge fixes to 1.28.2 #12394

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Lotus changelog

# 1.28.2 / 2024-08-15

This is a Lotus patch release v1.28.2 for Node operators and Storage Providers.

For node operators, this patch release fixes an issue where excessive bandwidth usage was caused by a routing loop in pubsub, where small "manifest" messages were cycling repeatedly around the network due to an ineffective routing loop prevention mechanism.
Expand Down
3 changes: 3 additions & 0 deletions itests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@
build.BlockGasLimit = originalLimit
}()

t.Logf("BlockGasLimit changed: %d", buildconstants.BlockGasLimit)

Check failure on line 175 in itests/api_test.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

undefined: buildconstants (typecheck)

Check failure on line 175 in itests/api_test.go

View workflow job for this annotation

GitHub Actions / Test (itest-api)

undefined: buildconstants

Check failure on line 175 in itests/api_test.go

View workflow job for this annotation

GitHub Actions / Test (itest-api)

undefined: buildconstants

msg := &types.Message{
From: senderAddr,
To: senderAddr,
Expand Down Expand Up @@ -288,6 +290,7 @@
ctx := context.Background()

full, genesisMiner, ens := kit.EnsembleMinimal(t, append(ts.opts, kit.MockProofs())...)

ens.InterconnectAll().BeginMining(4 * time.Millisecond)

time.Sleep(1 * time.Second)
Expand Down
5 changes: 4 additions & 1 deletion node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
provingBuffer := md.WPoStProvingPeriod * 2
pcp := sealing.NewBasicPreCommitPolicy(api, gsd, provingBuffer)

pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, verif, prover, &pcp, gsd, j, as)
pipeline, err := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, verif, prover, &pcp, gsd, j, as)
if err != nil {
return nil, xerrors.Errorf("creating sealing pipeline: %w", err)
}

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
Expand Down
37 changes: 23 additions & 14 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type CommitBatcherApi interface {
ChainHead(ctx context.Context) (*types.TipSet, error)

StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes.AllocationId, tsk types.TipSetKey) (*verifregtypes.Allocation, error)
Expand All @@ -54,17 +53,24 @@ type CommitBatcherApi interface {
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
}

type PledgeApi interface {
sectorWeight(ctx context.Context, sector SectorInfo, expiration abi.ChainEpoch) (abi.StoragePower, error)
pledgeForPower(ctx context.Context, addedPower abi.StoragePower) (abi.TokenAmount, error)
}

type AggregateInput struct {
Spt abi.RegisteredSealProof
Info proof.AggregateSealVerifyInfo
Proof []byte
Spt abi.RegisteredSealProof
Info proof.AggregateSealVerifyInfo
Proof []byte
Weight abi.StoragePower

ActivationManifest miner.SectorActivationManifest
DealIDPrecommit bool
}

type CommitBatcher struct {
api CommitBatcherApi
pledgeApi PledgeApi
maddr address.Address
mctx context.Context
addrSel AddressSelector
Expand All @@ -81,7 +87,7 @@ type CommitBatcher struct {
lk sync.Mutex
}

func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher {
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover, pa PledgeApi) (*CommitBatcher, error) {
b := &CommitBatcher{
api: api,
maddr: maddr,
Expand All @@ -90,6 +96,7 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
feeCfg: feeCfg,
getConfig: getConfig,
prover: prov,
pledgeApi: pa,

cutoffs: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
Expand All @@ -101,20 +108,20 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
stopped: make(chan struct{}),
}

go b.run()
cfg, err := b.getConfig()
if err != nil {
return nil, err
}

go b.run(cfg)

return b
return b, nil
}

func (b *CommitBatcher) run() {
func (b *CommitBatcher) run(cfg sealiface.Config) {
var forceRes chan []sealiface.CommitBatchRes
var lastMsg []sealiface.CommitBatchRes

cfg, err := b.getConfig()
if err != nil {
panic(err)
}

timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for {
if forceRes != nil {
Expand Down Expand Up @@ -606,7 +613,7 @@ func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tsk types.TipSe
return big.Zero(), xerrors.Errorf("precommit info not found on chain")
}

collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tsk)
collateral, err := b.pledgeApi.pledgeForPower(b.mctx, b.todo[sn].Weight) // b.maddr, pci.Info, tsk
if err != nil {
return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err)
}
Expand All @@ -616,6 +623,8 @@ func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tsk types.TipSe
collateral = big.Zero()
}

log.Infow("getSectorCollateral", "collateral", types.FIL(collateral), "sn", sn, "precommit", types.FIL(pci.PreCommitDeposit), "pledge", types.FIL(collateral), "weight", b.todo[sn].Weight)

return collateral, nil
}
func (b *CommitBatcher) aggregateProofType(nv network.Version) (abi.RegisteredAggregationProof, error) {
Expand Down
15 changes: 0 additions & 15 deletions storage/pipeline/mocks/mock_commit_batcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions storage/pipeline/pledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func (m *Sealing) sectorWeight(ctx context.Context, sector SectorInfo, expiratio
// get verified deal infos
var w, vw = big.Zero(), big.Zero()

now, err := m.Api.ChainHead(ctx)
if err != nil {
return abi.NewStoragePower(0), err
}
sectorDuration := big.NewInt(int64(expiration - now.Height()))
for _, piece := range sector.Pieces {
if !piece.HasDealInfo() {
// todo StateMinerInitialPledgeCollateral doesn't add cc/padding to non-verified weight, is that correct?
Expand All @@ -99,14 +104,16 @@ func (m *Sealing) sectorWeight(ctx context.Context, sector SectorInfo, expiratio

alloc, err := piece.GetAllocation(ctx, m.Api, ts.Key())
if err != nil || alloc == nil {
w = big.Add(w, abi.NewStoragePower(int64(piece.Piece().Size)))
if err == nil {
log.Errorw("failed to get allocation", "error", err)
}
w = big.Add(w, big.Mul(sectorDuration, abi.NewStoragePower(int64(piece.Piece().Size))))
continue
}

vw = big.Add(vw, abi.NewStoragePower(int64(piece.Piece().Size)))
vw = big.Add(vw, big.Mul(sectorDuration, abi.NewStoragePower(int64(piece.Piece().Size))))
}

// load market actor
duration := expiration - ts.Height()
sectorWeight := builtin.QAPowerForWeight(ssize, duration, w, vw)

Expand Down
18 changes: 9 additions & 9 deletions storage/pipeline/precommit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type PreCommitBatcher struct {
lk sync.Mutex
}

func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *PreCommitBatcher {
func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) (*PreCommitBatcher, error) {
b := &PreCommitBatcher{
api: api,
maddr: maddr,
Expand All @@ -86,20 +86,20 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
stopped: make(chan struct{}),
}

go b.run()
cfg, err := b.getConfig()
if err != nil {
return nil, xerrors.Errorf("failed to get sealer config: %w", err)
}

go b.run(cfg)

return b
return b, nil
}

func (b *PreCommitBatcher) run() {
func (b *PreCommitBatcher) run(cfg sealiface.Config) {
var forceRes chan []sealiface.PreCommitBatchRes
var lastRes []sealiface.PreCommitBatchRes

cfg, err := b.getConfig()
if err != nil {
panic(err)
}

timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
for {
if forceRes != nil {
Expand Down
22 changes: 15 additions & 7 deletions storage/pipeline/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
}

func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing {
func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) (*Sealing, error) {
s := &Sealing{
Api: sapi,
DealInfo: &CurrentDealInfoManager{sapi},
Expand All @@ -257,11 +257,8 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events

addrSel: addrSel,

terminator: NewTerminationBatcher(mctx, maddr, sapi, addrSel, fc, gc),
precommiter: NewPreCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc),
commiter: NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov),

getConfig: gc,
terminator: NewTerminationBatcher(mctx, maddr, sapi, addrSel, fc, gc),
getConfig: gc,

legacySc: storedcounter.New(ds, datastore.NewKey(StorageCounterDSPrefix)),

Expand All @@ -270,6 +267,17 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events
byState: map[SectorState]int64{},
},
}
pc, err := NewPreCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc)
if err != nil {
return nil, err
}
s.precommiter = pc

cc, err := NewCommitBatcher(mctx, maddr, sapi, addrSel, fc, gc, prov, s)
if err != nil {
return nil, err
}
s.commiter = cc

s.notifee = func(before, after SectorInfo) {
s.journal.RecordEvent(s.sealingEvtType, func() interface{} {
Expand All @@ -287,7 +295,7 @@ func New(mctx context.Context, sapi SealingAPI, fc config.MinerFeeConfig, events

s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})

return s
return s, nil
}

func (m *Sealing) Run(ctx context.Context) {
Expand Down
7 changes: 7 additions & 0 deletions storage/pipeline/states_replica_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
return ctx.Send(SectorSubmitReplicaUpdateFailed{})
}

log.Infow("submitting replica update",
"sector", sector.SectorNumber,
"weight", types.FIL(weightUpdate),
"totalPledge", types.FIL(collateral),
"initialPledge", types.FIL(onChainInfo.InitialPledge),
"toPledge", types.FIL(big.Sub(collateral, onChainInfo.InitialPledge)))

collateral = big.Sub(collateral, onChainInfo.InitialPledge)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
Expand Down
15 changes: 13 additions & 2 deletions storage/pipeline/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,16 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
return err
}

pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}

weight, err := m.sectorWeight(ctx.Context(), sector, pci.Info.Expiration)
if err != nil {
return xerrors.Errorf("getting sector weight: %w", err)
}

res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
Info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber,
Expand All @@ -788,8 +798,9 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
SealedCID: *sector.CommR,
UnsealedCID: *sector.CommD,
},
Proof: sector.Proof,
Spt: sector.SectorType,
Proof: sector.Proof,
Spt: sector.SectorType,
Weight: weight,

ActivationManifest: miner2.SectorActivationManifest{
SectorNumber: sector.SectorNumber,
Expand Down
Loading