Skip to content

Commit

Permalink
Merge pull request #7693 from filecoin-project/feat/randacc-dagstore-…
Browse files Browse the repository at this point in the history
…mount

Make small retrieval 200x faster
  • Loading branch information
magik6k authored Nov 29, 2021
2 parents 160bb0b + 4bcde2f commit 26c3752
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 105 deletions.
6 changes: 3 additions & 3 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ func generateFakePoSt(sectorInfo []proof5.SectorInfo, rpt func(abi.RegisteredSea
}
}

func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if offset != 0 {
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if uint64(offset) != 0 {
panic("implme")
}

return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])), false, nil
return ioutil.NopCloser(bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][startOffset:size])), false, nil
}

func (mgr *SectorMgr) StageFakeData(mid abi.ActorID, spt abi.RegisteredSealProof) (storage.SectorRef, []abi.PieceInfo, error) {
Expand Down
43 changes: 28 additions & 15 deletions extern/sector-storage/piece_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ type Unsealer interface {

type PieceProvider interface {
// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
// pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
// default in most cases, but this might matter with future PoRep)
// startOffset is added to the pieceOffset to get the starting reader offset.
// The number of bytes that can be read is pieceSize-startOffset
ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error)
IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

Expand Down Expand Up @@ -67,7 +71,7 @@ func (p *pieceProvider) IsUnsealed(ctx context.Context, sector storage.SectorRef
// It will NOT try to schedule an Unseal of a sealed sector file for the read.
//
// Returns a nil reader if the piece does NOT exist in any unsealed file or there is no unsealed file for the given sector on any of the workers.
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage.SectorRef, pieceOffset, startOffset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.ReadCloser, context.CancelFunc, error) {
// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
if err := p.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
Expand All @@ -78,7 +82,7 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// Reader returns a reader for an unsealed piece at the given offset in the given sector.
// The returned reader will be nil if none of the workers has an unsealed sector file containing
// the unsealed piece.
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(offset.Padded()), size.Padded())
r, err := p.storage.Reader(ctx, sector, abi.PaddedPieceSize(pieceOffset.Padded()+startOffset.Padded()), size.Padded()-abi.PaddedPieceSize(startOffset.Padded()))
if err != nil {
log.Debugf("did not get storage reader;sector=%+v, err:%s", sector.ID, err)
cancel()
Expand All @@ -97,20 +101,22 @@ func (p *pieceProvider) tryReadUnsealedPiece(ctx context.Context, sector storage
// If we do NOT have an existing unsealed file containing the given piece thus causing us to schedule an Unseal,
// the returned boolean parameter will be set to true.
// If we have an existing unsealed file containing the given piece, the returned boolean will be set to false.
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := offset.Valid(); err != nil {
return nil, false, xerrors.Errorf("offset is not valid: %w", err)
func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, startOffset uint64, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (io.ReadCloser, bool, error) {
if err := pieceOffset.Valid(); err != nil {
return nil, false, xerrors.Errorf("pieceOffset is not valid: %w", err)
}
if err := size.Validate(); err != nil {
return nil, false, xerrors.Errorf("size is not a valid piece size: %w", err)
}

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, offset, size)
startOffsetAligned := storiface.UnpaddedByteIndex(startOffset / 127 * 127) // floor to multiple of 127

r, unlock, err := p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)

if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil
}
if err != nil {
Expand All @@ -129,14 +135,14 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
if unsealed == cid.Undef {
commd = nil
}
if err := p.uns.SectorsUnsealPiece(ctx, sector, offset, size, ticket, commd); err != nil {
if err := p.uns.SectorsUnsealPiece(ctx, sector, pieceOffset, size, ticket, commd); err != nil {
log.Errorf("failed to SectorsUnsealPiece: %s", err)
return nil, false, xerrors.Errorf("unsealing piece: %w", err)
}

log.Debugf("unsealed a sector file to read the piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("unsealed a sector file to read the piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, offset, size)
r, unlock, err = p.tryReadUnsealedPiece(ctx, sector, pieceOffset, startOffsetAligned, size)
if err != nil {
log.Errorf("failed to tryReadUnsealedPiece after SectorsUnsealPiece: %s", err)
return nil, true, xerrors.Errorf("read after unsealing: %w", err)
Expand All @@ -145,9 +151,9 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
log.Errorf("got no reader after unsealing piece")
return nil, true, xerrors.Errorf("got no reader after unsealing piece")
}
log.Debugf("got a reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("got a reader to read unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
} else {
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("unsealed piece already exists, no need to unseal, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
}

upr, err := fr32.NewUnpadReader(r, size.Padded())
Expand All @@ -156,10 +162,17 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
return nil, uns, xerrors.Errorf("creating unpadded reader: %w", err)
}

log.Debugf("returning reader to read unsealed piece, sector=%+v, offset=%d, size=%d", sector, offset, size)
log.Debugf("returning reader to read unsealed piece, sector=%+v, pieceOffset=%d, startOffset=%d, size=%d", sector, pieceOffset, startOffset, size)

bir := bufio.NewReaderSize(upr, 127)
if startOffset > uint64(startOffsetAligned) {
if _, err := bir.Discard(int(startOffset - uint64(startOffsetAligned))); err != nil {
return nil, false, xerrors.Errorf("discarding bytes for startOffset: %w", err)
}
}

return &funcCloser{
Reader: bufio.NewReaderSize(upr, 127),
Reader: bir,
close: func() error {
err = r.Close()
unlock()
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/piece_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (p *pieceProviderTestHarness) isUnsealed(t *testing.T, offset storiface.Unp

func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
expectedHadToUnseal bool, expectedBytes []byte) {
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, 0, size, p.ticket, p.commD)
require.NoError(t, err)
require.NotNil(t, rd)
require.Equal(t, expectedHadToUnseal, isUnsealed)
Expand Down
33 changes: 21 additions & 12 deletions markets/dagstore/miner_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

Expand All @@ -14,23 +15,31 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
)

//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_lotus_accessor.go -package=mock_dagstore . MinerAPI

type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error
}

type SectorAccessor interface {
retrievalmarket.SectorAccessor

UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error)
}

type minerAPI struct {
pieceStore piecestore.PieceStore
sa retrievalmarket.SectorAccessor
sa SectorAccessor
throttle throttle.Throttler
readyMgr *shared.ReadyManager
}

var _ MinerAPI = (*minerAPI)(nil)

func NewMinerAPI(store piecestore.PieceStore, sa retrievalmarket.SectorAccessor, concurrency int) MinerAPI {
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI {
return &minerAPI{
pieceStore: store,
sa: sa,
Expand Down Expand Up @@ -91,10 +100,10 @@ func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, erro
return false, nil
}

func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid, offset uint64) (io.ReadCloser, abi.UnpaddedPieceSize, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, err
return nil, 0, err
}

// Throttle this path to avoid flooding the storage subsystem.
Expand All @@ -105,11 +114,11 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
})

if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
return nil, 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}

if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
return nil, 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}

// prefer an unsealed sector containing the piece if one exists
Expand All @@ -127,7 +136,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
return nil
}
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
return err
})

Expand All @@ -138,7 +147,7 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io

if reader != nil {
// we were able to obtain a reader for an already unsealed piece
return reader, nil
return reader, deal.Length.Unpadded(), nil
}
}

Expand All @@ -149,18 +158,18 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io
// block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.sa.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), offset, deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}

// Successfully fetched the deal data so return a reader over the data
return reader, nil
return reader, deal.Length.Unpadded(), nil
}

return nil, lastErr
return nil, 0, lastErr
}

func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
Expand Down
10 changes: 7 additions & 3 deletions markets/dagstore/miner_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
}

// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if tc.expectErr {
require.Error(t, err)
return
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestThrottle(t *testing.T) {
errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
errgrp.Go(func() error {
r, err := api.FetchUnsealedPiece(ctx, cid1)
r, _, err := api.FetchUnsealedPiece(ctx, cid1, 0)
if err == nil {
_ = r.Close()
}
Expand Down Expand Up @@ -203,6 +203,10 @@ type mockRPN struct {
}

func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
return m.UnsealSectorAt(ctx, sectorID, offset, 0, length)
}

func (m *mockRPN) UnsealSectorAt(ctx context.Context, sectorID abi.SectorNumber, pieceOffset abi.UnpaddedPieceSize, startOffset uint64, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
atomic.AddInt32(&m.calls, 1)
m.lk.RLock()
defer m.lk.RUnlock()
Expand All @@ -211,7 +215,7 @@ func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, o
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
return io.NopCloser(bytes.NewBuffer([]byte(data[startOffset:]))), nil
}

func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
Expand Down
Loading

0 comments on commit 26c3752

Please sign in to comment.