From 9f1fde56a14cb7499b9d8f94207f4697bda14a7e Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 19 Jul 2021 15:51:46 +0200 Subject: [PATCH] refactor: move lotus mount, dag store etc from markets to lotus --- go.mod | 4 +- go.sum | 4 +- markets/dagstore/dagstorewrapper.go | 174 ++++++++++++++++++ .../dagstore/mocks/mock_lotus_mount_api.go | 67 +++++++ markets/dagstore/mount.go | 114 ++++++++++++ markets/dagstore/mount_api.go | 83 +++++++++ markets/dagstore/mount_api_test.go | 166 +++++++++++++++++ markets/dagstore/mount_test.go | 103 +++++++++++ markets/dagstore/readonlyblockstore.go | 33 ++++ node/builder_miner.go | 5 +- node/modules/storageminer.go | 17 +- 11 files changed, 757 insertions(+), 13 deletions(-) create mode 100644 markets/dagstore/dagstorewrapper.go create mode 100644 markets/dagstore/mocks/mock_lotus_mount_api.go create mode 100644 markets/dagstore/mount.go create mode 100644 markets/dagstore/mount_api.go create mode 100644 markets/dagstore/mount_api_test.go create mode 100644 markets/dagstore/mount_test.go create mode 100644 markets/dagstore/readonlyblockstore.go diff --git a/go.mod b/go.mod index f2d70621159..d8f8ae00b58 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 + github.com/filecoin-project/dagstore v0.1.0 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 @@ -34,7 +35,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.7.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e + github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 @@ -78,6 +79,7 @@ require ( github.com/ipfs/go-fs-lock v0.0.6 github.com/ipfs/go-graphsync v0.6.4 github.com/ipfs/go-ipfs-blockstore v1.0.3 + github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 diff --git a/go.sum b/go.sum index 6a30a306824..e0f0ada7b5e 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/dagstore/dagstorewrapper.go b/markets/dagstore/dagstorewrapper.go new file mode 100644 index 00000000000..ee67618b77d --- /dev/null +++ b/markets/dagstore/dagstorewrapper.go @@ -0,0 +1,174 @@ +package dagstore + +import ( + "context" + "io" + "sync" + "time" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + bstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/shard" + "github.com/filecoin-project/go-fil-markets/carstore" + "github.com/filecoin-project/go-fil-markets/shared" +) + +var log = logging.Logger("dagstore-wrapper") +var gcInterval = 5 * time.Minute + +// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. +type MarketDAGStoreConfig struct { + TransientsDir string + IndexDir string + Datastore ds.Datastore +} + +type closableBlockstore struct { + bstore.Blockstore + io.Closer +} + +type dagStoreWrapper struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + dagStore *dagstore.DAGStore + mountApi LotusMountAPI +} + +var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil) + +func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) { + // construct the DAG Store. + registry := mount.NewRegistry() + if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { + return nil, xerrors.Errorf("failed to create registry: %w", err) + } + + failureCh := make(chan dagstore.ShardResult, 1) + dcfg := dagstore.Config{ + TransientsDir: cfg.TransientsDir, + IndexDir: cfg.IndexDir, + Datastore: cfg.Datastore, + MountRegistry: registry, + FailureCh: failureCh, + } + dagStore, err := dagstore.NewDAGStore(dcfg) + if err != nil { + return nil, xerrors.Errorf("failed to create DAG store: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + dw := &dagStoreWrapper{ + ctx: ctx, + cancel: cancel, + + dagStore: dagStore, + mountApi: mountApi, + } + + dw.wg.Add(1) + // the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them. + go dw.handleFailures(failureCh) + + return dw, nil +} + +func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) { + defer ds.wg.Done() + ticker := time.NewTicker(gcInterval) + defer ticker.Stop() + + select { + case <-ticker.C: + _, _ = ds.dagStore.GC(ds.ctx) + case f := <-failureCh: + log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error) + if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil { + log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err) + } + case <-ds.ctx.Done(): + return + } +} + +func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { + key := shard.KeyFromCID(pieceCid) + resch := make(chan dagstore.ShardResult, 1) + err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) + + if err != nil { + if xerrors.Unwrap(err) != dagstore.ErrShardUnknown { + return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err) + } + + // if the DAGStore does not know about the Shard -> register it and then try to acquire it again. + log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid) + if err := shared.RegisterShardSync(ctx, ds, pieceCid, "", false); err != nil { + return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) + } + log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) + + resch = make(chan dagstore.ShardResult, 1) + if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { + return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err) + } + } + + // TODO: Can I rely on AcquireShard to return an error if the context times out? + var res dagstore.ShardResult + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res = <-resch: + if res.Error != nil { + return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error) + } + } + + bs, err := res.Accessor.Blockstore() + if err != nil { + return nil, err + } + + return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil +} + +func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { + // Create a lotus mount with the piece CID + key := shard.KeyFromCID(pieceCid) + mt, err := NewLotusMount(pieceCid, ds.mountApi) + if err != nil { + return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err) + } + + // Register the shard + opts := dagstore.RegisterOpts{ + ExistingTransient: carPath, + LazyInitialization: !eagerInit, + } + err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts) + if err != nil { + return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) + } + + return nil +} + +func (ds *dagStoreWrapper) Close() error { + if err := ds.dagStore.Close(); err != nil { + return err + } + + ds.cancel() + ds.wg.Wait() + + return nil +} diff --git a/markets/dagstore/mocks/mock_lotus_mount_api.go b/markets/dagstore/mocks/mock_lotus_mount_api.go new file mode 100644 index 00000000000..5485e59aee3 --- /dev/null +++ b/markets/dagstore/mocks/mock_lotus_mount_api.go @@ -0,0 +1,67 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: mount_api.go + +// Package mock_dagstore is a generated GoMock package. +package mock_dagstore + +import ( + context "context" + io "io" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" +) + +// MockLotusMountAPI is a mock of LotusMountAPI interface. +type MockLotusMountAPI struct { + ctrl *gomock.Controller + recorder *MockLotusMountAPIMockRecorder +} + +// MockLotusMountAPIMockRecorder is the mock recorder for MockLotusMountAPI. +type MockLotusMountAPIMockRecorder struct { + mock *MockLotusMountAPI +} + +// NewMockLotusMountAPI creates a new mock instance. +func NewMockLotusMountAPI(ctrl *gomock.Controller) *MockLotusMountAPI { + mock := &MockLotusMountAPI{ctrl: ctrl} + mock.recorder = &MockLotusMountAPIMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLotusMountAPI) EXPECT() *MockLotusMountAPIMockRecorder { + return m.recorder +} + +// FetchUnsealedPiece mocks base method. +func (m *MockLotusMountAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece. +func (mr *MockLotusMountAPIMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusMountAPI)(nil).FetchUnsealedPiece), ctx, pieceCid) +} + +// GetUnpaddedCARSize mocks base method. +func (m *MockLotusMountAPI) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUnpaddedCARSize", pieceCid) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize. +func (mr *MockLotusMountAPIMockRecorder) GetUnpaddedCARSize(pieceCid interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusMountAPI)(nil).GetUnpaddedCARSize), pieceCid) +} diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go new file mode 100644 index 00000000000..fe5c302fb72 --- /dev/null +++ b/markets/dagstore/mount.go @@ -0,0 +1,114 @@ +package dagstore + +import ( + "context" + "fmt" + "io" + "net/url" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore/mount" +) + +const lotusScheme = "lotus" +const mountURLTemplate = "%s://%s" + +var _ mount.Mount = (*LotusMount)(nil) + +// LotusMount is the Lotus implementation of a Sharded DAG Store Mount. +// A Filecoin Piece is treated as a Shard by this implementation. +type LotusMount struct { + api LotusMountAPI + pieceCid cid.Cid +} + +// This method is called when registering a mount with the DAG store registry. +// The DAG store registry receives an instance of the mount (a "template"). +// When the registry needs to deserialize a mount it clones the template then +// calls Deserialize on the cloned instance, which will have a reference to the +// lotus mount API supplied here. +func NewLotusMountTemplate(api LotusMountAPI) *LotusMount { + return &LotusMount{api: api} +} + +func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) { + return &LotusMount{ + pieceCid: pieceCid, + api: api, + }, nil +} + +func (l *LotusMount) Serialize() *url.URL { + u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String()) + url, err := url.Parse(u) + if err != nil { + // Should never happen + panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err)) + } + + return url +} + +func (l *LotusMount) Deserialize(u *url.URL) error { + if u.Scheme != lotusScheme { + return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme) + } + + pieceCid, err := cid.Decode(u.Host) + if err != nil { + return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err) + } + + l.pieceCid = pieceCid + return nil +} + +func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { + r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid) + if err != nil { + return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err) + } + return &readCloser{r}, nil +} + +func (l *LotusMount) Info() mount.Info { + return mount.Info{ + Kind: mount.KindRemote, + AccessSequential: true, + AccessSeek: false, + AccessRandom: false, + } +} + +func (l *LotusMount) Close() error { + return nil +} + +func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) { + size, err := l.api.GetUnpaddedCARSize(l.pieceCid) + if err != nil { + return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err) + } + + // TODO Mark false when storage deal expires. + return mount.Stat{ + Exists: true, + Size: int64(size), + }, nil +} + +type readCloser struct { + io.ReadCloser +} + +var _ mount.Reader = (*readCloser)(nil) + +func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) { + return 0, xerrors.Errorf("ReadAt called but not implemented") +} + +func (r *readCloser) Seek(offset int64, whence int) (int64, error) { + return 0, xerrors.Errorf("Seek called but not implemented") +} diff --git a/markets/dagstore/mount_api.go b/markets/dagstore/mount_api.go new file mode 100644 index 00000000000..9821d28e2a3 --- /dev/null +++ b/markets/dagstore/mount_api.go @@ -0,0 +1,83 @@ +package dagstore + +import ( + "context" + "io" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" +) + +type LotusMountAPI interface { + FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) + GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) +} + +type lotusMountApiImpl struct { + pieceStore piecestore.PieceStore + rm retrievalmarket.RetrievalProviderNode +} + +var _ LotusMountAPI = (*lotusMountApiImpl)(nil) + +func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusMountApiImpl { + return &lotusMountApiImpl{ + pieceStore: store, + rm: rm, + } +} + +func (m *lotusMountApiImpl) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) { + pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) + if err != nil { + return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) + } + + if len(pieceInfo.Deals) <= 0 { + return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid) + } + + // prefer an unsealed sector containing the piece if one exists + for _, deal := range pieceInfo.Deals { + isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err != nil { + continue + } + if isUnsealed { + // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around. + reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err == nil { + return reader, nil + } + } + } + + lastErr := xerrors.New("no sectors found to unseal from") + // if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal. + for _, deal := range pieceInfo.Deals { + reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err == nil { + return reader, nil + } + lastErr = err + } + return nil, lastErr +} + +func (m *lotusMountApiImpl) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) { + pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) + if err != nil { + return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) + } + + if len(pieceInfo.Deals) == 0 { + return 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid) + } + + len := pieceInfo.Deals[0].Length + + return uint64(len), nil +} diff --git a/markets/dagstore/mount_api_test.go b/markets/dagstore/mount_api_test.go new file mode 100644 index 00000000000..26515578698 --- /dev/null +++ b/markets/dagstore/mount_api_test.go @@ -0,0 +1,166 @@ +package dagstore + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-actors/actors/builtin/paych" + + "github.com/filecoin-project/go-fil-markets/piecestore" + piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/shared" +) + +const unsealedSectorID = abi.SectorNumber(1) +const sealedSectorID = abi.SectorNumber(2) + +func TestLotusMountApiFetchUnsealedPiece(t *testing.T) { + ctx := context.Background() + + cid1, err := cid.Parse("bafkqaaa") + require.NoError(t, err) + + unsealedSectorData := "unsealed" + sealedSectorData := "sealed" + mockData := map[abi.SectorNumber]string{ + unsealedSectorID: unsealedSectorData, + sealedSectorID: sealedSectorData, + } + + testCases := []struct { + name string + deals []abi.SectorNumber + fetchedData string + expectErr bool + }{{ + // Expect error if there is no deal info for piece CID + name: "no deals", + expectErr: true, + }, { + // Expect the API to always fetch the unsealed deal (because it's + // cheaper than fetching the sealed deal) + name: "prefer unsealed deal", + deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID}, + fetchedData: unsealedSectorData, + }, { + // Expect the API to unseal the data if there are no unsealed deals + name: "unseal if necessary", + deals: []abi.SectorNumber{sealedSectorID}, + fetchedData: sealedSectorData, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ps := getPieceStore(t) + rpn := &mockRPN{ + sectors: mockData, + } + api := NewLotusMountAPI(ps, rpn) + + // Add deals to piece store + for _, sectorID := range tc.deals { + dealInfo := piecestore.DealInfo{ + SectorID: sectorID, + } + err = ps.AddDealForPiece(cid1, dealInfo) + require.NoError(t, err) + } + + // Fetch the piece + r, err := api.FetchUnsealedPiece(ctx, cid1) + if tc.expectErr { + require.Error(t, err) + return + } + + // Check that the returned reader is for the correct piece + require.NoError(t, err) + bz, err := io.ReadAll(r) + require.NoError(t, err) + + require.Equal(t, tc.fetchedData, string(bz)) + }) + } +} + +func TestLotusMountApiGetUnpaddedCARSize(t *testing.T) { + cid1, err := cid.Parse("bafkqaaa") + require.NoError(t, err) + + ps := getPieceStore(t) + rpn := &mockRPN{} + api := NewLotusMountAPI(ps, rpn) + + // Add a deal with data Length 10 + dealInfo := piecestore.DealInfo{ + Length: 10, + } + err = ps.AddDealForPiece(cid1, dealInfo) + require.NoError(t, err) + + // Check that the data length is correct + len, err := api.GetUnpaddedCARSize(cid1) + require.NoError(t, err) + require.EqualValues(t, 10, len) +} + +func getPieceStore(t *testing.T) piecestore.PieceStore { + ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + require.NoError(t, err) + + err = ps.Start(context.Background()) + require.NoError(t, err) + + ready := make(chan error) + ps.OnReady(func(err error) { + ready <- err + }) + err = <-ready + require.NoError(t, err) + + return ps +} + +type mockRPN struct { + sectors map[abi.SectorNumber]string +} + +func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + data, ok := m.sectors[sectorID] + if !ok { + panic("sector not found") + } + return io.NopCloser(bytes.NewBuffer([]byte(data))), nil +} + +func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) { + return sectorID == unsealedSectorID, nil +} + +func (m *mockRPN) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) { + panic("implement me") +} + +func (m *mockRPN) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) { + panic("implement me") +} + +func (m *mockRPN) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) { + panic("implement me") +} + +func (m *mockRPN) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) { + panic("implement me") +} + +var _ retrievalmarket.RetrievalProviderNode = (*mockRPN)(nil) diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go new file mode 100644 index 00000000000..66e4e7d5271 --- /dev/null +++ b/markets/dagstore/mount_test.go @@ -0,0 +1,103 @@ +package dagstore + +import ( + "context" + "fmt" + "io/ioutil" + "net/url" + "strings" + "testing" + + "github.com/golang/mock/gomock" + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/dagstore/mount" + + mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks" +) + +func TestLotusMount(t *testing.T) { + ctx := context.Background() + bgen := blocksutil.NewBlockGenerator() + cid := bgen.Next().Cid() + + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + // create a mock lotus api that returns the reader we want + mockLotusMountAPI := mock_dagstore.NewMockLotusMountAPI(mockCtrl) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) + mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1) + mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1) + + mnt, err := NewLotusMount(cid, mockLotusMountAPI) + require.NoError(t, err) + info := mnt.Info() + require.Equal(t, info.Kind, mount.KindRemote) + + // fetch and assert success + rd, err := mnt.Fetch(context.Background()) + require.NoError(t, err) + + bz, err := ioutil.ReadAll(rd) + require.NoError(t, err) + require.NoError(t, rd.Close()) + require.Equal(t, []byte("testing"), bz) + + stat, err := mnt.Stat(ctx) + require.NoError(t, err) + require.EqualValues(t, 100, stat.Size) + + // serialize url then deserialize from mount template -> should get back + // the same mount + url := mnt.Serialize() + mnt2 := NewLotusMountTemplate(mockLotusMountAPI) + err = mnt2.Deserialize(url) + require.NoError(t, err) + + // fetching on this mount should get us back the same data. + rd, err = mnt2.Fetch(context.Background()) + require.NoError(t, err) + bz, err = ioutil.ReadAll(rd) + require.NoError(t, err) + require.NoError(t, rd.Close()) + require.Equal(t, []byte("testing"), bz) +} + +func TestLotusMountDeserialize(t *testing.T) { + api := &lotusMountApiImpl{} + + bgen := blocksutil.NewBlockGenerator() + cid := bgen.Next().Cid() + + // success + us := fmt.Sprintf(mountURLTemplate, lotusScheme, cid.String()) + u, err := url.Parse(us) + require.NoError(t, err) + + mnt := NewLotusMountTemplate(api) + err = mnt.Deserialize(u) + require.NoError(t, err) + + require.Equal(t, cid, mnt.pieceCid) + require.Equal(t, api, mnt.api) + + // fails if scheme is not Lotus + us = fmt.Sprintf(mountURLTemplate, "http", cid.String()) + u, err = url.Parse(us) + require.NoError(t, err) + + err = mnt.Deserialize(u) + require.Error(t, err) + require.Contains(t, err.Error(), "does not match") + + // fails if cid is not valid + us = fmt.Sprintf(mountURLTemplate, lotusScheme, "rand") + u, err = url.Parse(us) + require.NoError(t, err) + err = mnt.Deserialize(u) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse PieceCid") +} diff --git a/markets/dagstore/readonlyblockstore.go b/markets/dagstore/readonlyblockstore.go new file mode 100644 index 00000000000..b8f19313a7a --- /dev/null +++ b/markets/dagstore/readonlyblockstore.go @@ -0,0 +1,33 @@ +package dagstore + +import ( + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + bstore "github.com/ipfs/go-ipfs-blockstore" + "golang.org/x/xerrors" + + "github.com/filecoin-project/dagstore" +) + +// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out +type ReadOnlyBlockstore struct { + dagstore.ReadBlockstore +} + +func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore { + return ReadOnlyBlockstore{ReadBlockstore: rbs} +} + +func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error { + return xerrors.Errorf("DeleteBlock called but not implemented") +} + +func (r ReadOnlyBlockstore) Put(block blocks.Block) error { + return xerrors.Errorf("Put called but not implemented") +} + +func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error { + return xerrors.Errorf("PutMany called but not implemented") +} + +var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil) diff --git a/node/builder_miner.go b/node/builder_miner.go index 48efcd54679..a7df6db5aed 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -4,10 +4,11 @@ import ( "errors" "time" + "github.com/filecoin-project/lotus/markets/dagstore" + "go.uber.org/fx" "golang.org/x/xerrors" - mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -147,7 +148,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store - Override(new(mktdagstore.DagStoreWrapper), modules.DagStoreWrapper), + Override(new(dagstore.DagStoreWrapper), modules.DagStoreWrapper), // Markets (retrieval) Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode), diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index bb7025e458f..85601d1e3e2 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -32,7 +32,6 @@ import ( dtimpl "github.com/filecoin-project/go-data-transfer/impl" dtnet "github.com/filecoin-project/go-data-transfer/network" dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" - mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore" piecefilestore "github.com/filecoin-project/go-fil-markets/filestore" piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl" "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -66,6 +65,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/markets" + "github.com/filecoin-project/lotus/markets/dagstore" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/markets/pricing" lotusminer "github.com/filecoin-project/lotus/miner" @@ -580,16 +580,16 @@ func DagStoreWrapper( r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode, -) (mktdagstore.DagStoreWrapper, error) { +) (shared.DagStoreWrapper, error) { dagStoreDir := filepath.Join(r.Path(), "dagstore") dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider")) - cfg := mktdagstore.MarketDAGStoreConfig{ + cfg := dagstore.MarketDAGStoreConfig{ TransientsDir: filepath.Join(dagStoreDir, "transients"), IndexDir: filepath.Join(dagStoreDir, "index"), Datastore: dagStoreDS, } - mountApi := mktdagstore.NewLotusMountAPI(pieceStore, rpn) - return mktdagstore.NewDagStoreWrapper(cfg, mountApi) + mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn) + return dagstore.NewDagStoreWrapper(cfg, mountApi) } func StorageProvider(minerAddress dtypes.MinerAddress, @@ -600,7 +600,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, df dtypes.StorageDealFilter, - dagStore mktdagstore.DagStoreWrapper, + dagStore shared.DagStoreWrapper, ) (storagemarket.StorageProvider, error) { net := smnet.NewFromLibp2pHost(h) store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) @@ -609,9 +609,10 @@ func StorageProvider(minerAddress dtypes.MinerAddress, } opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) + shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), ds, dagStore, pieceStore, spn) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dagStore, pieceStore, - dataTransfer, spn, address.Address(minerAddress), storedAsk, opt) + dataTransfer, spn, address.Address(minerAddress), storedAsk, shardMigrator, opt) } func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, @@ -675,7 +676,7 @@ func RetrievalProvider( dt dtypes.ProviderDataTransfer, pricingFnc dtypes.RetrievalPricingFunc, userFilter dtypes.RetrievalDealFilter, - dagStore mktdagstore.DagStoreWrapper, + dagStore shared.DagStoreWrapper, ) (retrievalmarket.RetrievalProvider, error) { opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) return retrievalimpl.NewProvider(