Skip to content

Commit

Permalink
Add expected CID param to AddPiece
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 24, 2021
1 parent d61612e commit 1916a78
Show file tree
Hide file tree
Showing 24 changed files with 110 additions and 74 deletions.
2 changes: 1 addition & 1 deletion api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error) //perm:admin

// storiface.WorkerCalls
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, expectPieceCid *cid.Cid, r storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
Expand Down
8 changes: 4 additions & 4 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (
FullAPIVersion1 = newVer(2, 1, 0)

MinerAPIVersion0 = newVer(1, 2, 0)
WorkerAPIVersion0 = newVer(1, 1, 0)
WorkerAPIVersion0 = newVer(1, 2, 0)
)

//nolint:varcheck,deadcode
Expand Down
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion cmd/lotus-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, par

r := rand.New(rand.NewSource(100 + int64(i)))

pi, err := sb.AddPiece(context.TODO(), sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), r)
pi, err := sb.AddPiece(context.TODO(), sid, nil, abi.PaddedPieceSize(sectorSize).Unpadded(), nil, r)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-seed/seed/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
}

func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid storage.SectorRef, ssize abi.SectorSize, preimage []byte) (*genesis.PreSeal, error) {
pi, err := sb.AddPiece(context.TODO(), sid, nil, abi.PaddedPieceSize(ssize).Unpadded(), rand.Reader)
pi, err := sb.AddPiece(context.TODO(), sid, nil, abi.PaddedPieceSize(ssize).Unpadded(), nil, rand.Reader)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions documentation/en/api-v0-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Inputs:
},
null,
1024,
null,
{}
]
```
Expand Down
93 changes: 58 additions & 35 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
return nil
}

func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, expectPieceCid *cid.Cid, file storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
Expand Down Expand Up @@ -183,54 +183,77 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existi
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
}

if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
var piece abi.PieceInfo

if err := stagedFile.Close(); err != nil {
return abi.PieceInfo{}, err
{
if len(piecePromises) == 1 {
piece, err = piecePromises[0]()
if err != nil {
return abi.PieceInfo{}, err
}
} else {
var payloadRoundedBytes abi.PaddedPieceSize
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pinfo, err := promise()
if err != nil {
return abi.PieceInfo{}, err
}

pieceCids[i] = pinfo
payloadRoundedBytes += pinfo.Size
}

pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}

// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
return abi.PieceInfo{}, err
}

if payloadRoundedBytes < pieceSize.Padded() {
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
}

pieceCID = paddedCid
}

piece = abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}
}
}
stagedFile = nil

if len(piecePromises) == 1 {
return piecePromises[0]()
if expectPieceCid == nil {
log.Warnw("call to AddPiece with nil expected piece CID")
}

var payloadRoundedBytes abi.PaddedPieceSize
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pinfo, err := promise()
if err != nil {
return abi.PieceInfo{}, err
if expectPieceCid != nil && !piece.PieceCID.Equals(*expectPieceCid) {
if err := stagedFile.Close(); err != nil {
log.Errorw("addPiece: error closing stagedFile with bad pieceCID", "error", err)
}

pieceCids[i] = pinfo
payloadRoundedBytes += pinfo.Size
// We haven't marked the piece as allocated yet, and returning an error here
// will make storage-fsm overwrite the bad piece
return abi.PieceInfo{}, xerrors.Errorf("pieceCIDs didn't match: expected %s, got %s (size: %d)", expectPieceCid, piece.PieceCID, piece.Size)
}

pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}

// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
if err := stagedFile.Close(); err != nil {
return abi.PieceInfo{}, err
}
stagedFile = nil

if payloadRoundedBytes < pieceSize.Padded() {
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
}

pieceCID = paddedCid
}

return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}, nil
return piece, nil
}

func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
Expand Down
8 changes: 4 additions & 4 deletions extern/sector-storage/ffiwrapper/sealer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *seal) precommit(t *testing.T, sb *Sealer, id storage.SectorRef, done fu

var err error
r := data(id.ID.Number, dlen)
s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r)
s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, nil, r)
if err != nil {
t.Fatalf("%+v", err)
}
Expand Down Expand Up @@ -762,7 +762,7 @@ func TestAddPiece512M(t *testing.T) {
Number: 0,
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(r, int64(sz)))
}, nil, sz, nil, io.LimitReader(r, int64(sz)))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -805,7 +805,7 @@ func BenchmarkAddPiece512M(b *testing.B) {
Number: abi.SectorNumber(i),
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(&nullreader.Reader{}, int64(sz)))
}, nil, sz, nil, io.LimitReader(&nullreader.Reader{}, int64(sz)))
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestAddPiece512MPadded(t *testing.T) {
Number: 0,
},
ProofType: abi.RegisteredSealProof_StackedDrg512MiBV1_1,
}, nil, sz, io.LimitReader(r, int64(sz/4)))
}, nil, sz, nil, io.LimitReader(r, int64(sz/4)))
if err != nil {
t.Fatalf("add piece failed: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error
return nil
}

func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, exp *cid.Cid, r io.Reader) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -298,7 +298,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existi

var out abi.PieceInfo
err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r))
p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, exp, r))
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions extern/sector-storage/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func TestSimple(t *testing.T) {
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}

pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
pi, err := m.AddPiece(ctx, sid, nil, 1016, nil, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)

piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
piz, err := m.AddPiece(ctx, sid, nil, 1016, nil, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)

Expand Down Expand Up @@ -185,11 +185,11 @@ func TestRedoPC1(t *testing.T) {
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}

pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
pi, err := m.AddPiece(ctx, sid, nil, 1016, nil, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)

piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
piz, err := m.AddPiece(ctx, sid, nil, 1016, nil, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)

Expand Down Expand Up @@ -239,11 +239,11 @@ func TestRestartManager(t *testing.T) {
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}

pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
pi, err := m.AddPiece(ctx, sid, nil, 1016, nil, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)

piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
piz, err := m.AddPiece(ctx, sid, nil, 1016, nil, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)

Expand Down Expand Up @@ -347,7 +347,7 @@ func TestRestartWorker(t *testing.T) {
go func() {
defer close(apDone)

_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
_, err := m.AddPiece(ctx, sid, nil, 1016, nil, strings.NewReader(strings.Repeat("testthis", 127)))
require.Error(t, err)
}()

Expand Down
9 changes: 6 additions & 3 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.Sec
panic("SectorMgr: unsealing piece: implement me")
}

func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, exp *cid.Cid, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorID, size, sectorID.ProofType)

var b bytes.Buffer
Expand Down Expand Up @@ -108,8 +108,11 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef,
ss.pieces = append(ss.pieces, c)
ss.lk.Unlock()

return abi.PieceInfo{
if exp != nil && !c.Equals(*exp) {
return abi.PieceInfo{}, xerrors.Errorf("pieces didn't match")
}

return abi.PieceInfo{
Size: size.Padded(),
PieceCID: c,
}, nil
Expand Down Expand Up @@ -414,7 +417,7 @@ func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof
ProofType: spt,
}

pi, err := mgr.AddPiece(context.TODO(), id, nil, usize, bytes.NewReader(buf))
pi, err := mgr.AddPiece(context.TODO(), id, nil, usize, nil, bytes.NewReader(buf))
if err != nil {
return storage.SectorRef{}, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/partialfile/partialfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const veryLargeRle = 1 << 20
// in a sector are unsealed, and which are not (holes)

// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
// [unpadded (raw) data][rle+][4B LE length of the rle+ field]

type PartialFile struct {
maxPiece abi.PaddedPieceSize
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (p *pieceProviderTestHarness) addPiece(t *testing.T, pieceData []byte) abi.
}

size := abi.UnpaddedPieceSize(len(pieceData))
pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, bytes.NewReader(pieceData))
pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, nil, bytes.NewReader(pieceData))
require.NoError(t, err)

p.addedPieces = append(p.addedPieces, pieceInfo)
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *schedTestWorker) NewSector(ctx context.Context, sector storage.SectorRe
panic("implement me")
}

func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, exp *cid.Cid, pieceData storage.Data) (storiface.CallID, error) {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/storiface/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var _ fmt.Stringer = &CallID{}
var UndefCall CallID

type WorkerCalls interface {
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, expectPieceCid *cid.Cid, r storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/teststorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) erro
panic("implement me")
}

func (t *testExec) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
func (t *testExec) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, exp *cid.Cid, pieceData storage.Data) (abi.PieceInfo, error) {
resp := make(chan apres)
t.apch <- resp
ar := <-resp
Expand Down
5 changes: 3 additions & 2 deletions extern/sector-storage/testworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/google/uuid"
"github.com/ipfs/go-cid"

"github.com/filecoin-project/lotus/extern/sector-storage/mock"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
Expand Down Expand Up @@ -58,9 +59,9 @@ func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface.
return ci, nil
}

func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, exp *cid.Cid, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, exp, pieceData)
if err := t.ret.ReturnAddPiece(ctx, ci, p, toCallError(err)); err != nil {
log.Error(err)
}
Expand Down
Loading

0 comments on commit 1916a78

Please sign in to comment.