From 8aaddfef06ee67fb33f874dd716856161388e3a5 Mon Sep 17 00:00:00 2001 From: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Date: Wed, 24 Jul 2024 21:02:29 +0400 Subject: [PATCH] fix: lotus-miner: remove provecommit1 method (#12251) * remove provecommit1 * add changelog * update precommit and commit params * fix lint error * fix commit params --- storage/pipeline/commit_batch.go | 338 ++-------------------- storage/pipeline/states_replica_update.go | 74 +---- storage/pipeline/states_sealing.go | 165 +++-------- 3 files changed, 79 insertions(+), 498 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index ba09ef38f50..a1431fbcca5 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -7,11 +7,9 @@ import ( "sync" "time" - "github.com/ipfs/go-cid" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" actorstypes "github.com/filecoin-project/go-state-types/actors" "github.com/filecoin-project/go-state-types/big" @@ -215,7 +213,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return nil, nil } - var res, resV1 []sealiface.CommitBatchRes + var res []sealiface.CommitBatchRes ts, err := b.api.ChainHead(b.mctx) if err != nil { @@ -243,67 +241,22 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, } } - if nv >= MinDDONetworkVersion { - // After nv21, we have a new ProveCommitSectors2 method, which supports - // batching without aggregation, but it doesn't support onboarding - // sectors which were precommitted with DealIDs in the precommit message. - // We prefer it for all other sectors, so first we use the new processBatchV2 - - var sectors []abi.SectorNumber - for sn := range b.todo { - sectors = append(sectors, sn) - } - res, err = b.processBatchV2(cfg, sectors, nv, !individual) - if err != nil { - err = xerrors.Errorf("processBatchV2: %w", err) - } - - // Mark sectors as done - for _, r := range res { - if err != nil { - r.Error = err.Error() - } - - for _, sn := range r.Sectors { - for _, ch := range b.waiting[sn] { - ch <- r // buffered - } - - delete(b.waiting, sn) - delete(b.todo, sn) - delete(b.cutoffs, sn) - } - } - } - - if err != nil { - log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err) - } + // After nv21, we have a new ProveCommitSectors2 method, which supports + // batching without aggregation, but it doesn't support onboarding + // sectors which were precommitted with DealIDs in the precommit message. + // We prefer it for all other sectors, so first we use the new processBatchV2 - if err != nil && len(res) == 0 { - return nil, err - } - - if individual { - resV1, err = b.processIndividually(cfg) - } else { - var sectors []abi.SectorNumber - for sn := range b.todo { - sectors = append(sectors, sn) - } - resV1, err = b.processBatchV1(cfg, sectors, nv) + var sectors []abi.SectorNumber + for sn := range b.todo { + sectors = append(sectors, sn) } - + res, err = b.processBatchV2(cfg, sectors, nv, !individual) if err != nil { - log.Warnf("CommitBatcher maybeStartBatch individual:%v processBatch %v", individual, err) + err = xerrors.Errorf("processBatchV2: %w", err) } - if err != nil && len(resV1) == 0 { - return nil, err - } - - // Mark the rest as processed - for _, r := range resV1 { + // Mark sectors as done + for _, r := range res { if err != nil { r.Error = err.Error() } @@ -319,7 +272,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, } } - res = append(res, resV1...) + if err != nil { + log.Warnf("CommitBatcher maybeStartBatch processBatch-ddo %v", err) + } + + if err != nil && len(res) == 0 { + return nil, err + } return res, nil } @@ -357,7 +316,6 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto for _, sector := range sectors { if b.todo[sector].DealIDPrecommit { - // can't process sectors precommitted with deal IDs with ProveCommitSectors2 continue } @@ -499,263 +457,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto return []sealiface.CommitBatchRes{res}, nil } -// processBatchV1 processes a batch of sectors before nv22. It always sends out an aggregate message. -func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.SectorNumber, nv network.Version) ([]sealiface.CommitBatchRes, error) { - ts, err := b.api.ChainHead(b.mctx) - if err != nil { - return nil, err - } - - total := len(sectors) - - res := sealiface.CommitBatchRes{ - FailedSectors: map[abi.SectorNumber]string{}, - } - - params := miner.ProveCommitAggregateParams{ - SectorNumbers: bitfield.New(), - } - - proofs := make([][]byte, 0, total) - infos := make([]proof.AggregateSealVerifyInfo, 0, total) - collateral := big.Zero() - - for _, sector := range sectors { - res.Sectors = append(res.Sectors, sector) - - sc, err := b.getSectorCollateral(sector, ts.Key()) - if err != nil { - res.FailedSectors[sector] = err.Error() - continue - } - - collateral = big.Add(collateral, sc) - - params.SectorNumbers.Set(uint64(sector)) - infos = append(infos, b.todo[sector].Info) - } - - if len(infos) == 0 { - return nil, nil - } - - sort.Slice(infos, func(i, j int) bool { - return infos[i].Number < infos[j].Number - }) - - for _, info := range infos { - proofs = append(proofs, b.todo[info.Number].Proof) - } - - mid, err := address.IDFromAddress(b.maddr) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) - } - - arp, err := b.aggregateProofType(nv) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err) - } - - params.AggregateProof, err = b.prover.AggregateSealProofs(proof.AggregateSealVerifyProofAndInfos{ - Miner: abi.ActorID(mid), - SealProof: b.todo[infos[0].Number].Spt, - AggregateProof: arp, - Infos: infos, - }, proofs) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) - } - - enc := new(bytes.Buffer) - if err := params.MarshalCBOR(enc); err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) - } - - mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) - } - - maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos)) - - aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee) - if err != nil { - res.Error = err.Error() - log.Errorf("getting aggregate commit network fee: %s", err) - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err) - } - - aggFee := big.Div(big.Mul(aggFeeRaw, aggFeeNum), aggFeeDen) - - needFunds := big.Add(collateral, aggFee) - needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, err - } - - goodFunds := big.Add(maxFee, needFunds) - - from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds) - if err != nil { - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) - } - - _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) - - if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { - log.Errorf("simulating CommitBatch %s", err) - res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch %w", err) - } - - // If we're out of gas, split the batch in half and evaluate again - if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { - log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors)) - mid := len(sectors) / 2 - ret0, _ := b.processBatchV1(cfg, sectors[:mid], nv) - ret1, _ := b.processBatchV1(cfg, sectors[mid:], nv) - - return append(ret0, ret1...), nil - } - - mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) - if err != nil { - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) - } - - res.Msg = &mcid - - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) - - return []sealiface.CommitBatchRes{res}, nil -} - -func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { - - mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK) - if err != nil { - return nil, xerrors.Errorf("couldn't get miner info: %w", err) - } - - avail := types.TotalFilecoinInt - - if cfg.CollateralFromMinerBalance && !cfg.DisableCollateralFallback { - avail, err = b.api.StateMinerAvailableBalance(b.mctx, b.maddr, types.EmptyTSK) - if err != nil { - return nil, xerrors.Errorf("getting available miner balance: %w", err) - } - - avail = big.Sub(avail, cfg.AvailableBalanceBuffer) - if avail.LessThan(big.Zero()) { - avail = big.Zero() - } - } - - ts, err := b.api.ChainHead(b.mctx) - if err != nil { - return nil, err - } - - var res []sealiface.CommitBatchRes - - sectorsProcessed := 0 - - for sn, info := range b.todo { - r := sealiface.CommitBatchRes{ - Sectors: []abi.SectorNumber{sn}, - FailedSectors: map[abi.SectorNumber]string{}, - } - - if cfg.MaxSectorProveCommitsSubmittedPerEpoch > 0 && - uint64(sectorsProcessed) >= cfg.MaxSectorProveCommitsSubmittedPerEpoch { - - tmp := ts - for tmp.Height() <= ts.Height() { - tmp, err = b.api.ChainHead(b.mctx) - if err != nil { - log.Errorf("getting chain head: %+v", err) - return nil, err - } - time.Sleep(3 * time.Second) - } - - sectorsProcessed = 0 - ts = tmp - } - - mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key()) - if err != nil { - log.Errorf("process single error: %+v", err) // todo: return to user - r.FailedSectors[sn] = err.Error() - } else { - r.Msg = &mcid - } - - res = append(res, r) - - sectorsProcessed++ - } - - return res, nil -} - -func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) { - return b.processSingleV1(cfg, mi, avail, sn, info, tsk) -} - -func (b *CommitBatcher) processSingleV1(cfg sealiface.Config, mi api.MinerInfo, avail *abi.TokenAmount, sn abi.SectorNumber, info AggregateInput, tsk types.TipSetKey) (cid.Cid, error) { - enc := new(bytes.Buffer) - params := &miner.ProveCommitSectorParams{ - SectorNumber: sn, - Proof: info.Proof, - } - - if err := params.MarshalCBOR(enc); err != nil { - return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err) - } - - collateral, err := b.getSectorCollateral(sn, tsk) - if err != nil { - return cid.Undef, err - } - - if cfg.CollateralFromMinerBalance { - c := big.Sub(collateral, *avail) - *avail = big.Sub(*avail, collateral) - collateral = c - - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() - } - if (*avail).LessThan(big.Zero()) { - *avail = big.Zero() - } - } - - goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee)) - - from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, collateral) - if err != nil { - return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err) - } - - mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes()) - if err != nil { - return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err) - } - - return mcid, nil -} - -// register commit, wait for batch message, return message CID +// AddCommit registers commit, wait for batch message, return message CID func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) { sn := s.SectorNumber diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 85e3fabee79..0c743f9db0a 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" @@ -169,70 +168,28 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - // figure out message type - - nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) - if err != nil { - log.Errorf("failed to get network version: %+v", err) - return ctx.Send(SectorSubmitReplicaUpdateFailed{}) - } - - pams, deals, err := m.processPieces(ctx.Context(), sector, nv >= network.Version22) + pams, err := m.processPieces(ctx.Context(), sector) if err != nil { log.Errorf("failed to process pieces: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - if len(pams) > 0 { - // PRU3 - - params := &miner.ProveReplicaUpdates3Params{ - SectorUpdates: []miner.SectorUpdateManifest{ - { - Sector: sector.SectorNumber, - Deadline: sl.Deadline, - Partition: sl.Partition, - NewSealedCID: *sector.UpdateSealed, - Pieces: pams, - }, - }, - SectorProofs: [][]byte{sector.ReplicaUpdateProof}, - UpdateProofsType: updateProof, - //AggregateProof - //AggregateProofType - RequireActivationSuccess: cfg.RequireActivationSuccessUpdate, - RequireNotificationSuccess: cfg.RequireNotificationSuccessUpdate, - } - - enc := new(bytes.Buffer) - if err := params.MarshalCBOR(enc); err != nil { - log.Errorf("failed to serialize update replica params: %w", err) - return ctx.Send(SectorSubmitReplicaUpdateFailed{}) - } - - mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveReplicaUpdates3, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) - if err != nil { - log.Errorf("handleSubmitReplicaUpdate: error sending message: %+v", err) - return ctx.Send(SectorSubmitReplicaUpdateFailed{}) - } - - return ctx.Send(SectorReplicaUpdateSubmitted{Message: mcid}) - } - - // PRU2 - params := &miner.ProveReplicaUpdatesParams2{ - Updates: []miner.ReplicaUpdate2{ + params := &miner.ProveReplicaUpdates3Params{ + SectorUpdates: []miner.SectorUpdateManifest{ { - SectorID: sector.SectorNumber, - Deadline: sl.Deadline, - Partition: sl.Partition, - NewSealedSectorCID: *sector.UpdateSealed, - NewUnsealedSectorCID: *sector.UpdateUnsealed, - UpdateProofType: updateProof, - ReplicaProof: sector.ReplicaUpdateProof, - Deals: deals, + Sector: sector.SectorNumber, + Deadline: sl.Deadline, + Partition: sl.Partition, + NewSealedCID: *sector.UpdateSealed, + Pieces: pams, }, }, + SectorProofs: [][]byte{sector.ReplicaUpdateProof}, + UpdateProofsType: updateProof, + //AggregateProof + //AggregateProofType + RequireActivationSuccess: cfg.RequireActivationSuccessUpdate, + RequireNotificationSuccess: cfg.RequireNotificationSuccessUpdate, } enc := new(bytes.Buffer) @@ -241,14 +198,13 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveReplicaUpdates2, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) + mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveReplicaUpdates3, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: error sending message: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } return ctx.Send(SectorReplicaUpdateSubmitted{Message: mcid}) - } func (m *Sealing) handleWaitMutable(ctx statemachine.Context, sector SectorInfo) error { diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index 795cabdb34d..5d6928fba88 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -430,42 +430,6 @@ func (m *Sealing) preCommitInfo(ctx statemachine.Context, sector SectorInfo) (*m if sector.hasData() { // only CC sectors don't have UnsealedCID params.UnsealedCid = sector.CommD - - // true when the sector has non-builtin-marked data - sectorIsDDO := false - - for _, piece := range sector.Pieces { - err := piece.handleDealInfo(handleDealInfoParams{ - FillerHandler: func(info UniversalPieceInfo) error { - return nil // ignore - }, - BuiltinMarketHandler: func(info UniversalPieceInfo) error { - if sectorIsDDO { - return nil // will be passed later in the Commit message - } - params.DealIDs = append(params.DealIDs, info.Impl().DealID) - return nil - }, - DDOHandler: func(info UniversalPieceInfo) error { - if nv < MinDDONetworkVersion { - return xerrors.Errorf("DDO sectors are not supported on network version %d", nv) - } - - log.Infow("DDO piece in sector", "sector", sector.SectorNumber, "piece", info.String()) - - sectorIsDDO = true - - // DDO sectors don't carry DealIDs, we will pass those - // deals in the Commit message later - params.DealIDs = nil - return nil - }, - }) - - if err != nil { - return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("handleDealInfo: %w", err) - } - } } collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, ts.Key()) @@ -749,101 +713,61 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo // processPieces returns either: // - a list of piece activation manifests // - a list of deal IDs, if all non-filler pieces are deal-id pieces -func (m *Sealing) processPieces(ctx context.Context, sector SectorInfo, forceDDO bool) ([]miner.PieceActivationManifest, []abi.DealID, error) { +func (m *Sealing) processPieces(ctx context.Context, sector SectorInfo) ([]miner.PieceActivationManifest, error) { pams := make([]miner.PieceActivationManifest, 0, len(sector.Pieces)) - dealIDs := make([]abi.DealID, 0, len(sector.Pieces)) - hasDDO := forceDDO - - if !forceDDO { - // if not forcing DDO, check if we have any DDO pieces - for _, piece := range sector.Pieces { - piece := piece - - // first figure out if this is a ddo sector - err := piece.handleDealInfo(handleDealInfoParams{ - FillerHandler: func(info UniversalPieceInfo) error { - // Fillers are implicit (todo review: Are they??) - return nil - }, - BuiltinMarketHandler: func(info UniversalPieceInfo) error { - return nil - }, - DDOHandler: func(info UniversalPieceInfo) error { - hasDDO = true - return nil - }, - }) - if err != nil { - return nil, nil, xerrors.Errorf("handleDealInfo: %w", err) - } - } - } + for _, piece := range sector.Pieces { piece := piece + if piece.HasDealInfo() { + info := piece.DealInfo() + // If we have a dealID then covert to PAM + if info.Impl().DealID > 0 { + alloc, err := m.Api.StateGetAllocationIdForPendingDeal(ctx, info.Impl().DealID, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting allocation for deal %d: %w", info.Impl().DealID, err) + } + clid, err := m.Api.StateLookupID(ctx, info.Impl().DealProposal.Client, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err) + } - err := piece.handleDealInfo(handleDealInfoParams{ - FillerHandler: func(info UniversalPieceInfo) error { - // Fillers are implicit (todo review: Are they??) - return nil - }, - BuiltinMarketHandler: func(info UniversalPieceInfo) error { - if hasDDO { - alloc, err := m.Api.StateGetAllocationIdForPendingDeal(ctx, info.Impl().DealID, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting allocation for deal %d: %w", info.Impl().DealID, err) - } - clid, err := m.Api.StateLookupID(ctx, info.Impl().DealProposal.Client, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err) - } - - clientId, err := address.IDFromAddress(clid) - if err != nil { - return xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err) - } - - var vac *miner2.VerifiedAllocationKey - if alloc != verifreg.NoAllocationID { - vac = &miner2.VerifiedAllocationKey{ - Client: abi.ActorID(clientId), - ID: verifreg13.AllocationId(alloc), - } - } + clientId, err := address.IDFromAddress(clid) + if err != nil { + return nil, xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err) + } - payload, err := cborutil.Dump(info.Impl().DealID) - if err != nil { - return xerrors.Errorf("serializing deal id: %w", err) + var vac *miner2.VerifiedAllocationKey + if alloc != verifreg.NoAllocationID { + vac = &miner2.VerifiedAllocationKey{ + Client: abi.ActorID(clientId), + ID: verifreg13.AllocationId(alloc), } + } - pams = append(pams, miner.PieceActivationManifest{ - CID: piece.Piece().PieceCID, - Size: piece.Piece().Size, - VerifiedAllocationKey: vac, - Notify: []miner2.DataActivationNotification{ - { - Address: market.Address, - Payload: payload, - }, - }, - }) - - return nil + payload, err := cborutil.Dump(info.Impl().DealID) + if err != nil { + return nil, xerrors.Errorf("serializing deal id: %w", err) } - dealIDs = append(dealIDs, info.Impl().DealID) - return nil - }, - DDOHandler: func(info UniversalPieceInfo) error { - pams = append(pams, *piece.Impl().PieceActivationManifest) - return nil - }, - }) - if err != nil { - return nil, nil, xerrors.Errorf("handleDealInfo: %w", err) + pams = append(pams, miner.PieceActivationManifest{ + CID: piece.Piece().PieceCID, + Size: piece.Piece().Size, + VerifiedAllocationKey: vac, + Notify: []miner2.DataActivationNotification{ + { + Address: market.Address, + Payload: payload, + }, + }, + }) + } else { + // Add PAM directly + pams = append(pams, *info.Impl().PieceActivationManifest) + } } } - return pams, dealIDs, nil + return pams, nil } func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector SectorInfo) error { @@ -851,7 +775,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) } - pams, dealIDs, err := m.processPieces(ctx.Context(), sector, false) + pams, err := m.processPieces(ctx.Context(), sector) if err != nil { return err } @@ -871,7 +795,6 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S SectorNumber: sector.SectorNumber, Pieces: pams, }, - DealIDPrecommit: len(dealIDs) > 0, }) if err != nil || res.Error != "" {