Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIP 13 addenda: correctly handle commit batch timer #6282

Merged
merged 4 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 78 additions & 8 deletions extern/storage-sealing/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
)
Expand All @@ -27,6 +28,7 @@ const arp = abi.RegisteredAggregationProof_SnarkPackV1
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
}

type AggregateInput struct {
Expand All @@ -45,8 +47,9 @@ type CommitBatcher struct {
getConfig GetSealingConfigFunc
verif ffiwrapper.Verifier

todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan cid.Cid
deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan cid.Cid

notify, stop, stopped chan struct{}
force chan chan *cid.Cid
Expand All @@ -63,8 +66,9 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
getConfig: getConfig,
verif: verif,

todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan cid.Cid{},
deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan cid.Cid{},

notify: make(chan struct{}, 1),
force: make(chan chan *cid.Cid),
Expand Down Expand Up @@ -100,7 +104,7 @@ func (b *CommitBatcher) run() {
return
case <-b.notify:
sendAboveMax = true
case <-time.After(cfg.CommitBatchWait):
case <-time.After(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack)):
sendAboveMin = true
case fr := <-b.force: // user triggered
forceRes = fr
Expand All @@ -114,6 +118,69 @@ func (b *CommitBatcher) run() {
}
}

func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now()

b.lk.Lock()
defer b.lk.Unlock()

var deadline time.Time
for sn := range b.todo {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
}
}
for sn := range b.waiting {
sectorDeadline := b.deadlines[sn]
if deadline.IsZero() || (!sectorDeadline.IsZero() && sectorDeadline.Before(deadline)) {
deadline = sectorDeadline
}
}

if deadline.IsZero() {
return maxWait
}

deadline = deadline.Add(-slack)
if deadline.Before(now) {
return time.Nanosecond // can't return 0
}

wait := deadline.Sub(now)
if wait > maxWait {
wait = maxWait
}

return wait
}

func (b *CommitBatcher) getSectorDeadline(si SectorInfo) time.Time {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return time.Time{}
}

deadlineEpoch := si.TicketEpoch
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

startEpoch := p.DealInfo.DealSchedule.StartEpoch
if startEpoch < deadlineEpoch {
deadlineEpoch = startEpoch
}
}

if deadlineEpoch <= curEpoch {
return time.Now()
}

return time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second
}

func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
b.lk.Lock()
defer b.lk.Unlock()
Expand Down Expand Up @@ -182,6 +249,7 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
}
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
return nil
})
if err != nil {
Expand All @@ -192,12 +260,14 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
}

// register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s abi.SectorNumber, in AggregateInput) (mcid cid.Cid, err error) {
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) {
sn := s.SectorNumber
b.lk.Lock()
b.todo[s] = in
b.deadlines[sn] = b.getSectorDeadline(s)
b.todo[sn] = in

sent := make(chan cid.Cid, 1)
b.waiting[s] = append(b.waiting[s], sent)
b.waiting[sn] = append(b.waiting[sn], sent)

select {
case b.notify <- struct{}{}:
Expand Down
1 change: 1 addition & 0 deletions extern/storage-sealing/sealiface/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
MinCommitBatch int
MaxCommitBatch int
CommitBatchWait time.Duration
CommitBatchSlack time.Duration

TerminateBatchMax uint64
TerminateBatchMin uint64
Expand Down
2 changes: 1 addition & 1 deletion extern/storage-sealing/states_sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
}

mcid, err := m.commiter.AddCommit(ctx.Context(), sector.SectorNumber, AggregateInput{
mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber,
Randomness: sector.TicketValue,
Expand Down
7 changes: 4 additions & 3 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,10 @@ func DefaultStorageMiner() *StorageMiner {
MinCommitBatch: 1, // we must have at least one proof to aggregate
MaxCommitBatch: 204, // this is the maximum aggregation per FIP13
CommitBatchWait: time.Day, // this can be up to 6 days
TerminateBatchMin: 1, // same as above
TerminateBatchMax: 204, // same as above
TerminateBatchWait: time.Day, // this can be up to 6 days
CommitBatchSlack: 8 * time.Hour,
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
},

Storage: sectorstorage.SealerConfig{
Expand Down
10 changes: 10 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,11 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
AggregateCommits: cfg.AggregateCommits,
MinCommitBatch: cfg.MinCommitBatch,
MaxCommitBatch: cfg.MaxCommitBatch,
CommitBatchWait: cfg.CommitBatchWait,
CommitBatchSlack: cfg.CommitBatchSlack,
TerminateBatchMax: cfg.TerminateBatchMax,
TerminateBatchMin: cfg.TerminateBatchMin,
TerminateBatchWait: cfg.TerminateBatchWait,
}
})
return
Expand All @@ -845,6 +850,11 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: cfg.Sealing.CommitBatchWait,
CommitBatchSlack: cfg.Sealing.CommitBatchSlack,
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: cfg.Sealing.TerminateBatchWait,
}
})
return
Expand Down