Skip to content

Commit

Permalink
Merge pull request #7761 from filecoin-project/jen/rcgain
Browse files Browse the repository at this point in the history
build: release: v1.13.2-rc4
  • Loading branch information
jennijuju authored Dec 11, 2021
2 parents 09a5c2e + 2343778 commit e05fdf8
Show file tree
Hide file tree
Showing 25 changed files with 296 additions and 149 deletions.
20 changes: 11 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
# Lotus changelog

# v1.13.2-rc3 / 2021-12-09
# v1.13.2-rc4 / 2021-12-10

This is the second RC for lotus v1.13.2, with a dependency upgrade of ipld-legacy which will fix ChainGetNode. This
is a highly recommended release with sealing pipeline fixes, worker management and scheduler enhancement, retrieval improvements and so on. More detailed changelog will be updated later.
This is the 4th RC for lotus v1.13.2, with another retrieval enhancement that fills the gap that's brought by the release. This is a highly recommended release with sealing pipeline fixes, worker management and scheduler enhancement, retrieval improvements and so on. More detailed changelog will be updated later.

- github.com/filecoin-project/lotus:
- CARv2 v2.1.0
- dagstore pieceReader: Fix wrong ErrUnexpectedEOF return in ReadAt
- dagstore pieceReader: Always read full in ReadAt
- Add metrics to dagstore piecereader
- disable building of appimage on release
- stores: Reduce log spam during retrievals
- fix lint
- Fix mock ReadPiece
- fr32: Reduce MTTresh from 32M to 512k per core
- piecereader: Avoid allocating 1024MB slices per read
- piecereader: Avoid redundant roundtrips when seeking
- piecereader: Move closer to storage
- build: release: v1.13.2-rc3 ([filecoin-project/lotus#7752](https://github.com/filecoin-project/lotus/pull/7752))
- build: release: v1.13.2-rc2 ([filecoin-project/lotus#7745](https://github.com/filecoin-project/lotus/pull/7745))
- v1.13.2-rc1 ([filecoin-project/lotus#7718](https://github.com/filecoin-project/lotus/pull/7718))
- Address Scheduler enhancements (#7703) review ([filecoin-project/lotus#7714](https://github.com/filecoin-project/lotus/pull/7714))
- Scheduler enhancements ([filecoin-project/lotus#7703](https://github.com/filecoin-project/lotus/pull/7703))
Expand Down Expand Up @@ -62,7 +65,6 @@ is a highly recommended release with sealing pipeline fixes, worker management a
- Intoduce update proofs enums
- Update golangci-lint for comatibility with Go 1.17
- Update execution image to maybe update debian version



# v1.13.1 / 2021-11-26
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func BuildTypeString() string {
}

// BuildVersion is the local build version
const BuildVersion = "1.13.2-rc3"
const BuildVersion = "1.13.2-rc4"

func UserVersion() string {
if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {
Expand Down
2 changes: 1 addition & 1 deletion documentation/en/cli-lotus-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ USAGE:
lotus-miner [global options] command [command options] [arguments...]
VERSION:
1.13.2-rc3
1.13.2-rc4
COMMANDS:
init Initialize a lotus miner repo
Expand Down
2 changes: 1 addition & 1 deletion documentation/en/cli-lotus-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ USAGE:
lotus-worker [global options] command [command options] [arguments...]
VERSION:
1.13.2-rc3
1.13.2-rc4
COMMANDS:
run Start lotus worker
Expand Down
2 changes: 1 addition & 1 deletion documentation/en/cli-lotus.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ USAGE:
lotus [global options] command [command options] [arguments...]
VERSION:
1.13.2-rc3
1.13.2-rc4
COMMANDS:
daemon Start a lotus daemon process
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/fr32/fr32.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)

var MTTresh = uint64(32 << 20)
var MTTresh = uint64(512 << 10)

func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh
Expand Down
12 changes: 10 additions & 2 deletions extern/sector-storage/fr32/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@ type unpadReader struct {
work []byte
}

func BufSize(sz abi.PaddedPieceSize) int {
return int(MTTresh * mtChunkCount(sz))
}

func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
buf := make([]byte, BufSize(sz))

return NewUnpadReaderBuf(src, sz, buf)
}

func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}

buf := make([]byte, MTTresh*mtChunkCount(sz))

return &unpadReader{
src: src,

Expand Down
15 changes: 13 additions & 2 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

"github.com/filecoin-project/dagstore/mount"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}

return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])

return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: ioutil.NopCloser(br),
Seeker: br,
ReaderAt: br,
}, false, nil
}

func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
Expand Down
110 changes: 68 additions & 42 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

Expand All @@ -27,7 +28,7 @@ type PieceProvider interface {
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

Expand Down Expand Up @@ -71,47 +72,99 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}

// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if rg == nil {
cancel()
return nil, nil, err
return nil, nil
}
if r == nil {

buf := make([]byte, fr32.BufSize(size.Padded()))

pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, err := rg(startOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}

upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}

return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: funcCloser(func() error {
return r.Close()
}),
}, nil
},
len: size,
onClose: cancel,
pieceCid: pc,
}).init()
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel()
return nil, err
}

return r, cancel, nil
return pr, err
}

type funcCloser func() error

func (f funcCloser) Close() error {
return f()
}

var _ io.Closer = funcCloser(nil)

// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)

Expand Down Expand Up @@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,

log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
Expand All @@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}

upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}

log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

return &funcCloser{
Reader: bir,
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
return r, uns, nil
}

type funcCloser struct {
io.Reader
close func() error
}

func (fc *funcCloser) Close() error { return fc.close() }
2 changes: 1 addition & 1 deletion extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dagstore
package sectorstorage

import (
"bufio"
Expand All @@ -19,11 +19,14 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k)

type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)

type pieceReader struct {
ctx context.Context
api MinerAPI
pieceCid cid.Cid
len abi.UnpaddedPieceSize
ctx context.Context
getReader pieceGetter
pieceCid cid.Cid
len abi.UnpaddedPieceSize
onClose context.CancelFunc

closed bool
seqAt int64 // next byte to be read by io.Reader
Expand All @@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))

p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
if err != nil {
return nil, err
}
if p.r == nil {
return nil, nil
}

p.br = bufio.NewReaderSize(p.r, ReadBuf)

return p, nil
Expand All @@ -60,12 +67,19 @@ func (p *pieceReader) Close() error {
}

if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}
p.r = nil
}

p.onClose()

p.closed = true

return nil
}

Expand Down Expand Up @@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
}

p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)
Expand Down
1 change: 0 additions & 1 deletion extern/sector-storage/stores/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)

id, err := storiface.ParseSectorID(vars["id"])
Expand Down
Loading

0 comments on commit e05fdf8

Please sign in to comment.