Skip to content

Commit

Permalink
Merge pull request #7746 from filecoin-project/feat/faster-retrieval
Browse files Browse the repository at this point in the history
Make retrieval even faster
  • Loading branch information
magik6k authored Dec 10, 2021
2 parents 509aea5 + e8ef39e commit 36f1a8f
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 136 deletions.
2 changes: 1 addition & 1 deletion extern/sector-storage/fr32/fr32.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
)

var MTTresh = uint64(32 << 20)
var MTTresh = uint64(512 << 10)

func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
threads := (uint64(usz)) / MTTresh
Expand Down
12 changes: 10 additions & 2 deletions extern/sector-storage/fr32/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@ type unpadReader struct {
work []byte
}

// BufSize reports how many bytes of scratch space an unpad reader needs for a
// piece of padded size sz. The result is MTTresh multiplied by the value
// mtChunkCount returns for sz, converted to int.
func BufSize(sz abi.PaddedPieceSize) int {
	chunks := mtChunkCount(sz)
	return int(MTTresh * chunks)
}

// NewUnpadReader builds an unpadding reader over src for a piece of padded
// size sz. It allocates a fresh work buffer of BufSize(sz) bytes and hands
// everything to NewUnpadReaderBuf, whose result (reader or error) is returned
// as-is. Callers that want to reuse a buffer across readers can call
// NewUnpadReaderBuf directly.
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
	return NewUnpadReaderBuf(src, sz, make([]byte, BufSize(sz)))
}

func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Reader, error) {
if err := sz.Validate(); err != nil {
return nil, xerrors.Errorf("bad piece size: %w", err)
}

buf := make([]byte, MTTresh*mtChunkCount(sz))

return &unpadReader{
src: src,

Expand Down
15 changes: 13 additions & 2 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

"github.com/filecoin-project/dagstore/mount"
ffiwrapper2 "github.com/filecoin-project/go-commp-utils/ffiwrapper"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -384,12 +385,22 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}

return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])

return struct {
io.ReadCloser
io.Seeker
io.ReaderAt
}{
ReadCloser: ioutil.NopCloser(br),
Seeker: br,
ReaderAt: br,
}, false, nil
}

func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
Expand Down
110 changes: 68 additions & 42 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

Expand All @@ -27,7 +28,7 @@ type PieceProvider interface {
// default in most cases, but this might matter with future PoRep)
// The returned reader starts at pieceOffset and spans pieceSize bytes; to begin
// reading further into the piece, seek on the returned reader.
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

Expand Down Expand Up @@ -71,47 +72,99 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, pc cid.Cid, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (mount.Reader, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
cancel()
return nil, nil, xerrors.Errorf("acquiring read sector lock: %w", err)
return nil, xerrors.Errorf("acquiring read sector lock: %w", err)
}

// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// Reader returns a reader getter for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
rg, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()), size.Padded())
if err != nil {
cancel()
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
return nil, err
}
if rg == nil {
cancel()
return nil, nil, err
return nil, nil
}
if r == nil {

buf := make([]byte, fr32.BufSize(size.Padded()))

pr, err := (&pieceReader{
ctx: ctx,
getReader: func(ctx context.Context, startOffset uint64) (io.ReadCloser, error) {
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, err := rg(startOffsetAligned.Padded())
if err != nil {
return nil, xerrors.Errorf("getting reader at +%d: %w", startOffsetAligned, err)
}

upr, err := fr32.NewUnpadReaderBuf(r, size.Padded(), buf)
if err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("creating unpadded reader: %w", err)
}

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
r.Close() // nolint
return nil, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}

return struct {
io.Reader
io.Closer
}{
Reader: bir,
Closer: funcCloser(func() error {
return r.Close()
}),
}, nil
},
len: size,
onClose: cancel,
pieceCid: pc,
}).init()
if err != nil || pr == nil { // pr == nil to make sure we don't return typed nil
cancel()
return nil, err
}

return r, cancel, nil
return pr, err
}

// funcCloser adapts a plain func() error so it can be used wherever an
// io.Closer is expected.
type funcCloser func() error

// Compile-time proof that funcCloser satisfies io.Closer.
var _ io.Closer = funcCloser(nil)

// Close calls the wrapped function and returns its result unchanged.
func (fn funcCloser) Close() error {
	return fn()
}

// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
// If an Unsealed sector file exists with the Piece Unsealed in it, we'll use that for the read.
// Otherwise, we will Unseal a Sealed sector file for the given sector and read the Unsealed piece from it.
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)

Expand Down Expand Up @@ -142,7 +195,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,

log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
r, err = p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
Expand All @@ -156,34 +209,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}

upr, err := fr32.NewUnpadReader(r, size.Padded())
if err != nil {
unlock()
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}

log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

return &funcCloser{
Reader: bir,
close: func() error {
err = r.Close()
unlock()
return err
},
}, uns, nil
return r, uns, nil
}

// funcCloser pairs an io.Reader with a caller-supplied close callback, so the
// combined value offers both Read (via the embedded reader) and Close.
type funcCloser struct {
	io.Reader
	close func() error
}

// Close runs the stored close callback and returns whatever it returns.
func (c *funcCloser) Close() error {
	closeFn := c.close
	return closeFn()
}
2 changes: 1 addition & 1 deletion extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dagstore
package sectorstorage

import (
"bufio"
Expand All @@ -19,11 +19,14 @@ import (
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
var ReadBuf = 128 * (127 * 8) // unpadded(128k)

// pieceGetter opens a fresh reader over a piece, positioned offset bytes into
// it. pieceReader calls it with offset 0 during init and again from ReadAt
// with the requested offset when it needs a new backing reader. The caller
// owns the returned io.ReadCloser and is responsible for closing it.
type pieceGetter func(ctx context.Context, offset uint64) (io.ReadCloser, error)

type pieceReader struct {
ctx context.Context
api MinerAPI
pieceCid cid.Cid
len abi.UnpaddedPieceSize
ctx context.Context
getReader pieceGetter
pieceCid cid.Cid
len abi.UnpaddedPieceSize
onClose context.CancelFunc

closed bool
seqAt int64 // next byte to be read by io.Reader
Expand All @@ -37,10 +40,14 @@ func (p *pieceReader) init() (_ *pieceReader, err error) {
stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))

p.rAt = 0
p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
if err != nil {
return nil, err
}
if p.r == nil {
return nil, nil
}

p.br = bufio.NewReaderSize(p.r, ReadBuf)

return p, nil
Expand All @@ -60,12 +67,19 @@ func (p *pieceReader) Close() error {
}

if p.r != nil {
if err := p.r.Close(); err != nil {
return err
}
if err := p.r.Close(); err != nil {
return err
}
p.r = nil
}

p.onClose()

p.closed = true

return nil
}

Expand Down Expand Up @@ -127,7 +141,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
}

p.rAt = off
p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
p.r, err = p.getReader(p.ctx, uint64(p.rAt))
p.br = bufio.NewReaderSize(p.r, ReadBuf)
if err != nil {
return 0, xerrors.Errorf("getting backing reader: %w", err)
Expand Down
1 change: 0 additions & 1 deletion extern/sector-storage/stores/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (handler *FetchHandler) remoteStatFs(w http.ResponseWriter, r *http.Request
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)

id, err := storiface.ParseSectorID(vars["id"])
Expand Down
Loading

0 comments on commit 36f1a8f

Please sign in to comment.