Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move lotus mount, dag store etc from markets to lotus #6793

Merged
merged 7 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.1.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
Expand All @@ -34,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
Expand Down
174 changes: 174 additions & 0 deletions markets/dagstore/dagstorewrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package dagstore
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"io"
"sync"
"time"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/shared"
)

var log = logging.Logger("dagstore-wrapper")
var gcInterval = 5 * time.Minute
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably want this to be configurable via config.yml. I can submit a PR to add a DAG Store configuration. Then that can take the place of MarketsDAGStoreConfig!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good 👍
Suggest we do this as a separate PR


// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
}

type closableBlockstore struct {
bstore.Blockstore
io.Closer
}

type dagStoreWrapper struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}

var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)

func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}

failureCh := make(chan dagstore.ShardResult, 1)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore,
MountRegistry: registry,
FailureCh: failureCh,
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be constructed through DI likely. We should take a context.Config in the constructor, and then wire it in the node package to the DI context. I can help with that if you need advice on how to do it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Start and Stop hooks that call a new Start(ctx) method and the existing Close() method respectively

dw := &dagStoreWrapper{
ctx: ctx,
cancel: cancel,

dagStore: dagStore,
mountApi: mountApi,
}

dw.wg.Add(1)
// the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them.
go dw.handleFailures(failureCh)

return dw, nil
}

func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
defer ds.wg.Done()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()

select {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
case <-ticker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case f := <-failureCh:
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
}
case <-ds.ctx.Done():
return
}
}

func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})

if err != nil {
if xerrors.Unwrap(err) != dagstore.ErrShardUnknown {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}

// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
if err := shared.RegisterShardSync(ctx, ds, pieceCid, "", false); err != nil {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)

resch = make(chan dagstore.ShardResult, 1)
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}

// TODO: Can I rely on AcquireShard to return an error if the context times out?
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
var res dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
}
}

bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}

return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
}

func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}

// Register the shard
opts := dagstore.RegisterOpts{
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}

return nil
}

func (ds *dagStoreWrapper) Close() error {
if err := ds.dagStore.Close(); err != nil {
return err
}

ds.cancel()
ds.wg.Wait()

return nil
}
67 changes: 67 additions & 0 deletions markets/dagstore/mocks/mock_lotus_mount_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 114 additions & 0 deletions markets/dagstore/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package dagstore

import (
"context"
"fmt"
"io"
"net/url"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/mount"
)

const lotusScheme = "lotus"
const mountURLTemplate = "%s://%s"
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

var _ mount.Mount = (*LotusMount)(nil)

// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
api LotusMountAPI
pieceCid cid.Cid
}

// This method is called when registering a mount with the DAG store registry.
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusMountAPI) *LotusMount {
return &LotusMount{api: api}
}

func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) {
return &LotusMount{
pieceCid: pieceCid,
api: api,
}, nil
}

func (l *LotusMount) Serialize() *url.URL {
u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String())
url, err := url.Parse(u)
if err != nil {
// Should never happen
panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err))
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

return url
}

func (l *LotusMount) Deserialize(u *url.URL) error {
if u.Scheme != lotusScheme {
return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme)
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

pieceCid, err := cid.Decode(u.Host)
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)
}

l.pieceCid = pieceCid
return nil
}

func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err)
}
return &readCloser{r}, nil
}

func (l *LotusMount) Info() mount.Info {
return mount.Info{
Kind: mount.KindRemote,
AccessSequential: true,
AccessSeek: false,
AccessRandom: false,
}
}

func (l *LotusMount) Close() error {
return nil
}

func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
size, err := l.api.GetUnpaddedCARSize(l.pieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err)
}

// TODO Mark false when storage deal expires.
return mount.Stat{
Exists: true,
Size: int64(size),
}, nil
}

type readCloser struct {
io.ReadCloser
}

var _ mount.Reader = (*readCloser)(nil)

func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
return 0, xerrors.Errorf("ReadAt called but not implemented")
}

func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
return 0, xerrors.Errorf("Seek called but not implemented")
}
Loading