Skip to content

Commit

Permalink
refactor: move lotus mount, dag store etc from markets to lotus
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jul 19, 2021
1 parent 905592a commit b6a7a8c
Show file tree
Hide file tree
Showing 11 changed files with 756 additions and 13 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.1.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
Expand All @@ -34,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
Expand Down
174 changes: 174 additions & 0 deletions markets/dagstore/dagstorewrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package dagstore

import (
"context"
"io"
"sync"
"time"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/shared"
)

var log = logging.Logger("dagstore-wrapper")
var gcInterval = 5 * time.Minute

// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
}

type closableBlockstore struct {
bstore.Blockstore
io.Closer
}

type dagStoreWrapper struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

dagStore *dagstore.DAGStore
mountApi LotusMountAPI
}

var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)

func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}

failureCh := make(chan dagstore.ShardResult, 1)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore,
MountRegistry: registry,
FailureCh: failureCh,
}
dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
dw := &dagStoreWrapper{
ctx: ctx,
cancel: cancel,

dagStore: dagStore,
mountApi: mountApi,
}

dw.wg.Add(1)
// the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them.
go dw.handleFailures(failureCh)

return dw, nil
}

func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) {
defer ds.wg.Done()
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()

select {
case <-ticker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case f := <-failureCh:
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
}
case <-ds.ctx.Done():
return
}
}

func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})

if err != nil {
if xerrors.Unwrap(err) != dagstore.ErrShardUnknown {
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}

// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
if err := shared.RegisterShardSync(ctx, ds, pieceCid, "", false); err != nil {
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)

resch = make(chan dagstore.ShardResult, 1)
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}

// TODO: Can I rely on AcquireShard to return an error if the context times out?
var res dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
}
}

bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}

return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
}

func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}

// Register the shard
opts := dagstore.RegisterOpts{
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}

return nil
}

func (ds *dagStoreWrapper) Close() error {
if err := ds.dagStore.Close(); err != nil {
return err
}

ds.cancel()
ds.wg.Wait()

return nil
}
67 changes: 67 additions & 0 deletions markets/dagstore/mocks/mock_lotus_mount_api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 114 additions & 0 deletions markets/dagstore/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package dagstore

import (
"context"
"fmt"
"io"
"net/url"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore/mount"
)

const lotusScheme = "lotus"
const mountURLTemplate = "%s://%s"

var _ mount.Mount = (*LotusMount)(nil)

// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
api LotusMountAPI
pieceCid cid.Cid
}

// This method is called when registering a mount with the DAG store registry.
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusMountAPI) *LotusMount {
return &LotusMount{api: api}
}

func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) {
return &LotusMount{
pieceCid: pieceCid,
api: api,
}, nil
}

func (l *LotusMount) Serialize() *url.URL {
u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String())
url, err := url.Parse(u)
if err != nil {
// Should never happen
panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err))
}

return url
}

func (l *LotusMount) Deserialize(u *url.URL) error {
if u.Scheme != lotusScheme {
return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme)
}

pieceCid, err := cid.Decode(u.Host)
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)
}

l.pieceCid = pieceCid
return nil
}

func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err)
}
return &readCloser{r}, nil
}

func (l *LotusMount) Info() mount.Info {
return mount.Info{
Kind: mount.KindRemote,
AccessSequential: true,
AccessSeek: false,
AccessRandom: false,
}
}

func (l *LotusMount) Close() error {
return nil
}

func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
size, err := l.api.GetUnpaddedCARSize(l.pieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err)
}

// TODO Mark false when storage deal expires.
return mount.Stat{
Exists: true,
Size: int64(size),
}, nil
}

type readCloser struct {
io.ReadCloser
}

var _ mount.Reader = (*readCloser)(nil)

func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
return 0, xerrors.Errorf("ReadAt called but not implemented")
}

func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
return 0, xerrors.Errorf("Seek called but not implemented")
}
Loading

0 comments on commit b6a7a8c

Please sign in to comment.