Skip to content

Commit

Permalink
retrieve by any CID (not just root CID) and reference provider integr…
Browse files Browse the repository at this point in the history
…ation (#629)

* feat: retrieve by any CID (not just root CID)

* feat: fix tests

* fix: TestHandleQueryStream

* fix: integration tests

* Publish indexing records to the Network Indexer using the reference provider (#647)

* reference prov integration

* Apply suggestions from code review

Co-authored-by: dirkmc <[email protected]>

* move provider callback to start

* fix: lint

Co-authored-by: dirkmc <[email protected]>

* fix conflict

* update dagstore and disable flaky

* refactor: use index-provider instead of indexer-reference-provider

* update to latest data-transfer and index-provider

* announce all deals to the indexer

* update go mods

* update deps

* lint: fix imports

* update dagstore to tagged ver

* update provider

* update deps

Co-authored-by: Aarsh Shah <[email protected]>
  • Loading branch information
dirkmc and aarshkshah1992 authored Jan 12, 2022
1 parent eb214b8 commit f77485c
Show file tree
Hide file tree
Showing 22 changed files with 811 additions and 227 deletions.
34 changes: 19 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,62 @@ module github.com/filecoin-project/go-fil-markets
go 1.13

require (
github.com/filecoin-project/dagstore v0.4.3-0.20211211192320-72b849e131d2
github.com/filecoin-project/dagstore v0.5.1
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-data-transfer v1.12.0
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff
github.com/filecoin-project/go-data-transfer v1.12.1
github.com/filecoin-project/go-ds-versioning v0.1.1
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/index-provider v0.2.0
github.com/filecoin-project/specs-actors v0.9.13
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb
github.com/filecoin-project/storetheindex v0.2.1
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-filestore v1.1.0
github.com/ipfs/go-graphsync v0.11.0
github.com/ipfs/go-graphsync v0.11.5
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.1.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipfs/go-log/v2 v2.4.0
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-unixfs v0.2.6
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car v0.3.3-0.20211210032800-e6f244225a16
github.com/ipld/go-car/v2 v2.1.1-0.20211211000942-be2525f6bf2d
github.com/ipld/go-ipld-prime v0.14.3-0.20211207234443-319145880958
github.com/ipld/go-car/v2 v2.1.1
github.com/ipld/go-ipld-prime v0.14.3
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jpillora/backoff v1.0.0
github.com/libp2p/go-libp2p v0.16.0
github.com/libp2p/go-libp2p-core v0.11.0
github.com/multiformats/go-multiaddr v0.4.0
github.com/libp2p/go-libp2p v0.17.0
github.com/libp2p/go-libp2p-core v0.13.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61
github.com/multiformats/go-multicodec v0.4.0
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-varint v0.0.6
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
github.com/stretchr/testify v1.7.0
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
github.com/whyrusleeping/cbor-gen v0.0.0-20211110122933-f57984553008
golang.org/x/exp v0.0.0-20210715201039-d37aa40e8013
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
)

Expand Down
386 changes: 347 additions & 39 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func TestClient_FindProviders(t *testing.T) {
// retrieval deal for the same payload CID with the same peer as an existing
// active deal
func TestClient_DuplicateRetrieve(t *testing.T) {
t.Skip("flaky test")
bgCtx := context.Background()
ctx, cancel := context.WithCancel(bgCtx)
defer cancel()
Expand Down
27 changes: 17 additions & 10 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
Expand All @@ -45,7 +46,7 @@ func TestClientCanMakeQueryToProvider(t *testing.T) {
bgCtx := context.Background()
payChAddr := address.TestAddress

client, expectedCIDs, missingPiece, expectedQR, retrievalPeer, _ := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
client, expectedCIDs, missingPiece, expectedQR, retrievalPeer, _, pieceStore := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)

t.Run("when piece is found, returns piece and price data", func(t *testing.T) {
expectedQR.Status = retrievalmarket.QueryResponseAvailable
Expand All @@ -69,10 +70,12 @@ func TestClientCanMakeQueryToProvider(t *testing.T) {
})

t.Run("when there is some other error, returns error", func(t *testing.T) {
unknownPiece := tut.GenerateCids(1)[0]
pieceStore.ReturnErrorFromGetPieceInfo(xerrors.Errorf("someerr"))
expectedQR.Status = retrievalmarket.QueryResponseError
expectedQR.Message = "failed to fetch piece to retrieve from: get cid info: GetCIDInfo failed"
actualQR, err := client.Query(bgCtx, retrievalPeer, unknownPiece, retrievalmarket.QueryParams{})
expectedQR.PieceCIDFound = retrievalmarket.QueryItemUnavailable
expectedQR.Size = 0
expectedQR.Message = "failed to fetch piece to retrieve from: could not locate piece: someerr"
actualQR, err := client.Query(bgCtx, retrievalPeer, expectedCIDs[0], retrievalmarket.QueryParams{})
assert.NoError(t, err)
actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval
actualQR.MinPricePerByte = expectedQR.MinPricePerByte
Expand All @@ -89,19 +92,22 @@ func TestProvider_Stop(t *testing.T) {
}
bgCtx := context.Background()
payChAddr := address.TestAddress
client, expectedCIDs, _, _, retrievalPeer, provider := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
client, expectedCIDs, _, _, retrievalPeer, provider, _ := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
require.NoError(t, provider.Stop())
_, err := client.Query(bgCtx, retrievalPeer, expectedCIDs[0], retrievalmarket.QueryParams{})

assert.EqualError(t, err, "exhausted 5 attempts but failed to open stream, err: protocol not supported")
}

func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChAddr address.Address) (retrievalmarket.RetrievalClient,
func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChAddr address.Address) (
retrievalmarket.RetrievalClient,
[]cid.Cid,
cid.Cid,
retrievalmarket.QueryResponse,
retrievalmarket.RetrievalPeer,
retrievalmarket.RetrievalProvider) {
retrievalmarket.RetrievalProvider,
*tut.TestPieceStore,
) {
testData := tut.NewLibp2pTestData(ctx, t)
nw1 := rmnet.NewFromLibp2pHost(testData.Host1, rmnet.RetryParameters(100*time.Millisecond, 1*time.Second, 5, 5))
cids := tut.GenerateCids(2)
Expand Down Expand Up @@ -130,6 +136,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
expectedPieceCIDs := tut.GenerateCids(3)
missingCID := tut.GenerateCids(1)[0]
expectedQR := tut.MakeTestQueryResponse()
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)

pieceStore.ExpectMissingCID(missingCID)
for i, c := range expectedCIDs {
Expand All @@ -140,6 +147,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
},
},
})
dagstoreWrapper.AddBlockToPieceIndex(c, expectedPieceCIDs[i])
}
for i, piece := range expectedPieceCIDs {
pieceStore.ExpectPiece(piece, piecestore.PieceInfo{
Expand Down Expand Up @@ -170,8 +178,6 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
return ask, nil
}

// Set up a DAG store
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)
provider, err := retrievalimpl.NewProvider(
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs,
priceFunc)
Expand All @@ -186,7 +192,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA

expectedQR.Size = uint64(abi.PaddedPieceSize(expectedQR.Size).Unpadded())

return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider
return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider, pieceStore
}

func TestClientCanMakeDealWithProvider(t *testing.T) {
Expand Down Expand Up @@ -711,6 +717,7 @@ func setupProvider(

// Create a DAG store wrapper
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)
dagstoreWrapper.AddBlockToPieceIndex(payloadCID, pieceInfo.PieceCID)

// Register the piece with the DAG store wrapper
err = stores.RegisterShardSync(ctx, dagstoreWrapper, pieceInfo.PieceCID, carFilePath, true)
Expand Down
94 changes: 83 additions & 11 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
answer.Size = uint64(pieceInfo.Deals[0].Length.Unpadded()) // TODO: verify on intermediate
answer.PieceCIDFound = retrievalmarket.QueryItemAvailable

storageDeals, err := storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo, p.pieceStore)
storageDeals, err := p.storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo)
if err != nil {
log.Errorf("Retrieval query: storageDealsForPiece: %s", err)
answer.Status = retrievalmarket.QueryResponseError
Expand Down Expand Up @@ -388,48 +388,61 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
sendResp(answer)
}

func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
cidInfo, err := p.pieceStore.GetCIDInfo(payloadCID)
// Given the CID of a block, find a piece that contains that block.
// If the client has specified which piece they want, return that piece.
// Otherwise prefer pieces that are already unsealed.
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
if err != nil {
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("get cid info: %w", err)
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
}

// For each piece that contains the target block
var lastErr error
var sealedPieceInfo *piecestore.PieceInfo

for _, pieceBlockLocation := range cidInfo.PieceBlockLocations {
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID)
for _, pieceWithTargetBlock := range piecesWithTargetBlock {
// Get the deals for the piece
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock)
if err != nil {
lastErr = err
continue
}

// if client wants to retrieve the payload from a specific piece, just return that piece.
if pieceCID.Defined() && pieceInfo.PieceCID.Equals(pieceCID) {
if clientPieceCID.Defined() && pieceInfo.PieceCID.Equals(clientPieceCID) {
return pieceInfo, p.pieceInUnsealedSector(ctx, pieceInfo), nil
}

// if client dosen't have a preference for a particular piece, prefer a piece
// if client doesn't have a preference for a particular piece, prefer a piece
// for which an unsealed sector exists.
if pieceCID.Equals(cid.Undef) {
if clientPieceCID.Equals(cid.Undef) {
if p.pieceInUnsealedSector(ctx, pieceInfo) {
// The piece is in an unsealed sector, so just return it
return pieceInfo, true, nil
}

if sealedPieceInfo == nil {
// The piece is not in an unsealed sector, so save it but keep
// checking other pieces to see if there is one that is in an
// unsealed sector
sealedPieceInfo = &pieceInfo
}
}

}

// Found a piece containing the target block, piece is in a sealed sector
if sealedPieceInfo != nil {
return *sealedPieceInfo, false, nil
}

// Couldn't find a piece containing the target block
if lastErr == nil {
lastErr = xerrors.Errorf("unknown pieceCID %s", pieceCID.String())
lastErr = xerrors.Errorf("unknown pieceCID %s", clientPieceCID.String())
}

// Error finding a piece containing the target block
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr)
}

Expand All @@ -448,6 +461,65 @@ func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecesto
return false
}

func (p *Provider) storageDealsForPiece(clientSpecificPiece bool, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo) ([]abi.DealID, error) {
var storageDeals []abi.DealID
var err error
if clientSpecificPiece {
// If the user wants to retrieve the payload from a specific piece,
// we only need to inspect storage deals made for that piece to quote a price.
for _, d := range pieceInfo.Deals {
storageDeals = append(storageDeals, d.DealID)
}
} else {
// If the user does NOT want to retrieve from a specific piece, we'll have to inspect all storage deals
// made for that piece to quote a price.
storageDeals, err = p.getAllDealsContainingPayload(payloadCID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch deals for payload: %w", err)
}
}

if len(storageDeals) == 0 {
return nil, xerrors.New("no storage deals found")
}

return storageDeals, nil
}

func (p *Provider) getAllDealsContainingPayload(payloadCID cid.Cid) ([]abi.DealID, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
if err != nil {
return nil, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
}

// For each piece that contains the target block
var lastErr error
var dealsIds []abi.DealID
for _, pieceWithTargetBlock := range piecesWithTargetBlock {
// Get the deals for the piece
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock)
if err != nil {
lastErr = err
continue
}

for _, d := range pieceInfo.Deals {
dealsIds = append(dealsIds, d.DealID)
}
}

if lastErr == nil && len(dealsIds) == 0 {
return nil, xerrors.New("no deals found")
}

if lastErr != nil && len(dealsIds) == 0 {
return nil, xerrors.Errorf("failed to fetch deals containing payload %s: %w", payloadCID, lastErr)
}

return dealsIds, nil
}

// GetDynamicAsk quotes a dynamic price for the retrieval deal by calling the user configured
// dynamic pricing function. It passes the static price parameters set in the Ask Store to the pricing function.
func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) {
Expand Down
Loading

0 comments on commit f77485c

Please sign in to comment.