Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move lotus mount, dag store etc from markets to lotus #6793

Merged
merged 7 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.1.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
Expand All @@ -34,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
Expand Down
Binary file added markets/dagstore/fixtures/sample-rw-bs-v2.car
Binary file not shown.
91 changes: 91 additions & 0 deletions markets/dagstore/lotusaccessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package dagstore

import (
"context"
"io"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)

type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
}

type lotusAccessor struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
}

var _ LotusAccessor = (*lotusAccessor)(nil)

func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
return &lotusAccessor{
pieceStore: store,
rm: rm,
}
}

func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}

if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
}

// prefer an unsealed sector containing the piece if one exists
for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err)
continue
}
if isUnsealed {
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err == nil {
return reader, nil
}
}
}

lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}

// Successfully fetched the deal data so return a reader over the data
return reader, nil
}

return nil, lastErr
}

func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}

if len(pieceInfo.Deals) == 0 {
return 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}

len := pieceInfo.Deals[0].Length

return uint64(len), nil
}
166 changes: 166 additions & 0 deletions markets/dagstore/lotusaccessor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package dagstore

import (
"bytes"
"context"
"io"
"testing"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"

"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
)

const unsealedSectorID = abi.SectorNumber(1)
const sealedSectorID = abi.SectorNumber(2)

func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
ctx := context.Background()

cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)

unsealedSectorData := "unsealed"
sealedSectorData := "sealed"
mockData := map[abi.SectorNumber]string{
unsealedSectorID: unsealedSectorData,
sealedSectorID: sealedSectorData,
}

testCases := []struct {
name string
deals []abi.SectorNumber
fetchedData string
expectErr bool
}{{
// Expect error if there is no deal info for piece CID
name: "no deals",
expectErr: true,
}, {
// Expect the API to always fetch the unsealed deal (because it's
// cheaper than fetching the sealed deal)
name: "prefer unsealed deal",
deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID},
fetchedData: unsealedSectorData,
}, {
// Expect the API to unseal the data if there are no unsealed deals
name: "unseal if necessary",
deals: []abi.SectorNumber{sealedSectorID},
fetchedData: sealedSectorData,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{
sectors: mockData,
}
api := NewLotusMountAPI(ps, rpn)

// Add deals to piece store
for _, sectorID := range tc.deals {
dealInfo := piecestore.DealInfo{
SectorID: sectorID,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
}

// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
}

// Check that the returned reader is for the correct piece
require.NoError(t, err)
bz, err := io.ReadAll(r)
require.NoError(t, err)

require.Equal(t, tc.fetchedData, string(bz))
})
}
}

func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)

ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewLotusMountAPI(ps, rpn)

// Add a deal with data Length 10
dealInfo := piecestore.DealInfo{
Length: 10,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)

// Check that the data length is correct
len, err := api.GetUnpaddedCARSize(cid1)
require.NoError(t, err)
require.EqualValues(t, 10, len)
}

func getPieceStore(t *testing.T) piecestore.PieceStore {
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)

err = ps.Start(context.Background())
require.NoError(t, err)

ready := make(chan error)
ps.OnReady(func(err error) {
ready <- err
})
err = <-ready
require.NoError(t, err)

return ps
}

type mockRPN struct {
sectors map[abi.SectorNumber]string
}

func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
data, ok := m.sectors[sectorID]
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
}

func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
return sectorID == unsealedSectorID, nil
}

func (m *mockRPN) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
panic("implement me")
}

func (m *mockRPN) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
panic("implement me")
}

func (m *mockRPN) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
panic("implement me")
}

func (m *mockRPN) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
panic("implement me")
}

var _ retrievalmarket.RetrievalProviderNode = (*mockRPN)(nil)
67 changes: 67 additions & 0 deletions markets/dagstore/mocks/mock_lotus_mount_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading