Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for sector batchers #6432

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions extern/storage-sealing/commit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (

const arp = abi.RegisteredAggregationProof_SnarkPackV1

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi

type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
Expand All @@ -44,9 +46,9 @@ type CommitBatcherApi interface {
}

type AggregateInput struct {
spt abi.RegisteredSealProof
info proof5.AggregateSealVerifyInfo
proof []byte
Spt abi.RegisteredSealProof
Info proof5.AggregateSealVerifyInfo
Proof []byte
}

type CommitBatcher struct {
Expand Down Expand Up @@ -256,15 +258,15 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

res.Sectors = append(res.Sectors, id)
params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.info)
infos = append(infos, p.Info)
}

sort.Slice(infos, func(i, j int) bool {
return infos[i].Number < infos[j].Number
})

for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].proof)
proofs = append(proofs, b.todo[info.Number].Proof)
}

mid, err := address.IDFromAddress(b.maddr)
Expand All @@ -274,7 +276,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa

params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].spt,
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
Expand Down Expand Up @@ -362,7 +364,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.proof,
Proof: info.Proof,
}

if err := params.MarshalCBOR(enc); err != nil {
Expand Down Expand Up @@ -447,7 +449,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
for _, s := range b.todo {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: s.info.Number,
Number: s.Info.Number,
})
}

Expand Down
299 changes: 299 additions & 0 deletions extern/storage-sealing/commit_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package sealing_test

import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/network"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)

func TestCommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)

ctx := context.Background()

as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}

maxBatch := miner5.MaxAggregatedSectors
minBatch := miner5.MinAggregatedSectors

cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,

BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,

AggregateCommits: true,
MinCommitBatch: minBatch,
MaxCommitBatch: maxBatch,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,

TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}

type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise

actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}

if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}

addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var pcres sealiface.CommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()

si := sealing.SectorInfo{
SectorNumber: sn,
}

s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil)

go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
Info: proof5.AggregateSealVerifyInfo{
Number: sn,
},
})
}()

return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}

addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}

waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)

return nil
}
}

expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)

ti := len(expect)
batch := false
if ti >= minBatch {
batch = true
ti = 1
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(len(expect))
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
}

s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
if batch {
var params miner5.ProveCommitAggregateParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for _, number := range expect {
set, err := params.SectorNumbers.IsSet(uint64(number))
require.NoError(t, err)
require.True(t, set)
}
} else {
var params miner5.ProveCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
}
return true
})).Times(ti)
return nil
}
}

flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)

batch := len(expect) >= minBatch

r, err := pcb.Flush(ctx)
require.NoError(t, err)
if batch {
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
} else {
require.Len(t, r, len(expect))
for _, res := range r {
require.Len(t, res.Sectors, 1)
require.Empty(t, res.Error)
}
sort.Slice(r, func(i, j int) bool {
return r[i].Sectors[0] < r[j].Sectors[0]
})
for i, res := range r {
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
}
}

return nil
}
}

getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}

tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addAte": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}

for name, tc := range tcs {
tc := tc

t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()

// create them mocks
pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)

pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})

var promises []promise

for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}

for _, p := range promises {
p(t)
}

err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}

type fakeProver struct{}

func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
return []byte("Trust me, I'm a proof"), nil
}

var _ ffiwrapper.Prover = &fakeProver{}
Loading