Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: lotus-miner: remove provecommit1 method #12251

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- https://github.com/filecoin-project/lotus/pull/12221: Fix a nil reference panic in the ETH Trace API
- https://github.com/filecoin-project/lotus/pull/12112: Moved consts from build/ to build/buildconstants/ for ligher curio deps.
- https://github.com/filecoin-project/lotus/pull/12237: Upgrade to go-f3 `v0.0.4`.
- https://github.com/filecoin-project/lotus/pull/12251: Dropping support from ProveCommitSector1 method from lotus-miner
- https://github.com/filecoin-project/lotus/pull/12276: chore: deps: Update GST, Filecoin-FFI and Actors to final versions NV23
- https://github.com/filecoin-project/lotus/pull/12278: chore: Set Mainnet upgrade epoch for NV23.
- https://github.com/filecoin-project/lotus/pull/12269 Fix `logIndex` ordering in `EthGetTransactionReceipt` by using the EventIndex to fetch logs
Expand Down
338 changes: 20 additions & 318 deletions storage/pipeline/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"sync"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -215,7 +213,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return nil, nil
}

var res, resV1 []sealiface.CommitBatchRes
var res []sealiface.CommitBatchRes

ts, err := b.api.ChainHead(b.mctx)
if err != nil {
Expand Down Expand Up @@ -243,67 +241,22 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
}
}

if nv >= MinDDONetworkVersion {
// After nv21, we have a new ProveCommitSectors2 method, which supports
// batching without aggregation, but it doesn't support onboarding
// sectors which were precommitted with DealIDs in the precommit message.
// We prefer it for all other sectors, so first we use the new processBatchV2

var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
res, err = b.processBatchV2(cfg, sectors, nv, !individual)
if err != nil {
err = xerrors.Errorf("processBatchV2: %w", err)
}

// Mark sectors as done
for _, r := range res {
if err != nil {
r.Error = err.Error()
}

for _, sn := range r.Sectors {
for _, ch := range b.waiting[sn] {
ch <- r // buffered
}

delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.cutoffs, sn)
}
}
}

if err != nil {
log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err)
}
// After nv21, we have a new ProveCommitSectors2 method, which supports
// batching without aggregation, but it doesn't support onboarding
// sectors which were precommitted with DealIDs in the precommit message.
// We prefer it for all other sectors, so first we use the new processBatchV2

if err != nil && len(res) == 0 {
return nil, err
}

if individual {
rjan90 marked this conversation as resolved.
Show resolved Hide resolved
resV1, err = b.processIndividually(cfg)
} else {
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
resV1, err = b.processBatchV1(cfg, sectors, nv)
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}

res, err = b.processBatchV2(cfg, sectors, nv, !individual)
if err != nil {
log.Warnf("CommitBatcher maybeStartBatch individual:%v processBatch %v", individual, err)
err = xerrors.Errorf("processBatchV2: %w", err)
}

if err != nil && len(resV1) == 0 {
return nil, err
}

// Mark the rest as processed
for _, r := range resV1 {
// Mark sectors as done
for _, r := range res {
if err != nil {
r.Error = err.Error()
}
Expand All @@ -319,7 +272,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
}
}

res = append(res, resV1...)
if err != nil {
log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err)
}

if err != nil && len(res) == 0 {
return nil, err
}

return res, nil
}
Expand Down Expand Up @@ -357,7 +316,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto

for _, sector := range sectors {
if b.todo[sector].DealIDPrecommit {
// can't process sectors precommitted with deal IDs with ProveCommitSectors2
continue
}

Expand Down Expand Up @@ -499,263 +457,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto
return []sealiface.CommitBatchRes{res}, nil
}

// processBatchV1 processes a batch of sectors before nv22. It always sends out an aggregate message.
func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.SectorNumber, nv network.Version) ([]sealiface.CommitBatchRes, error) {
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}

total := len(sectors)

res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
}

params := miner.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
}

proofs := make([][]byte, 0, total)
infos := make([]proof.AggregateSealVerifyInfo, 0, total)
collateral := big.Zero()

for _, sector := range sectors {
res.Sectors = append(res.Sectors, sector)

sc, err := b.getSectorCollateral(sector, ts.Key())
if err != nil {
res.FailedSectors[sector] = err.Error()
continue
}

collateral = big.Add(collateral, sc)

params.SectorNumbers.Set(uint64(sector))
infos = append(infos, b.todo[sector].Info)
}

if len(infos) == 0 {
return nil, nil
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].Proof)
}

mid, err := address.IDFromAddress(b.maddr)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
}

arp, err := b.aggregateProofType(nv)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err)
}

params.AggregateProof, err = b.prover.AggregateSealProofs(proof.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
}

enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
}

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}

maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))

aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee)
if err != nil {
res.Error = err.Error()
log.Errorf("getting aggregate commit network fee: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err)
}

aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen)

needFunds := big.Add(collateral, aggFee)
needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, err
}

goodFunds := big.Add(maxFee, needFunds)

from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}

_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())

if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
log.Errorf("simulating CommitBatch %s", err)
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch %w", err)
}

// If we're out of gas, split the batch in half and evaluate again
if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) {
log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors))
mid := len(sectors) / 2
ret0, _ := b.processBatchV1(cfg, sectors[:mid], nv)
ret1, _ := b.processBatchV1(cfg, sectors[mid:], nv)

return append(ret0, ret1...), nil
}

mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}

res.Msg = &mcid

log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))

return []sealiface.CommitBatchRes{res}, nil
}

func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {

mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}

avail := types.TotalFilecoinInt

if cfg.CollateralFromMinerBalance && !cfg.DisableCollateralFallback {
avail, err = b.api.StateMinerAvailableBalance(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting available miner balance: %w", err)
}

avail = big.Sub(avail, cfg.AvailableBalanceBuffer)
if avail.LessThan(big.Zero()) {
avail = big.Zero()
}
}

ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}

var res []sealiface.CommitBatchRes

sectorsProcessed := 0

for sn, info := range b.todo {
r := sealiface.CommitBatchRes{
Sectors: []abi.SectorNumber{sn},
FailedSectors: map[abi.SectorNumber]string{},
}

if cfg.MaxSectorProveCommitsSubmittedPerEpoch > 0 &&
uint64(sectorsProcessed) >= cfg.MaxSectorProveCommitsSubmittedPerEpoch {

tmp := ts
for tmp.Height() <= ts.Height() {
tmp, err = b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %+v", err)
return nil, err
}
time.Sleep(3 * time.Second)
}

sectorsProcessed = 0
ts = tmp
}

mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key())
if err != nil {
log.Errorf("process single error: %+v", err) // todo: return to user
r.FailedSectors[sn] = err.Error()
} else {
r.Msg = &mcid
}

res = append(res, r)

sectorsProcessed++
}

return res, nil
}

func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) {
return b.processSingleV1(cfg, mi, avail, sn, info, tsk)
}

func (b *CommitBatcher) processSingleV1(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) {
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
}

collateral, err := b.getSectorCollateral(sn, tsk)
if err != nil {
return cid.Undef, err
}

if cfg.CollateralFromMinerBalance {
c := big.Sub(collateral, *avail)
*avail = big.Sub(*avail, collateral)
collateral = c

if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}
if (*avail).LessThan(big.Zero()) {
*avail = big.Zero()
}
}

goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee))

from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}

mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}

return mcid, nil
}

// AddCommit registers a commit, waits for batch message, returns message CID
// AddCommit registers commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
sn := s.SectorNumber

Expand Down
Loading