-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: move lotus mount, dag store etc from markets to lotus
- Loading branch information
Showing
11 changed files
with
757 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package dagstore | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ipfs/go-cid" | ||
ds "github.com/ipfs/go-datastore" | ||
bstore "github.com/ipfs/go-ipfs-blockstore" | ||
logging "github.com/ipfs/go-log/v2" | ||
"golang.org/x/xerrors" | ||
|
||
"github.com/filecoin-project/dagstore" | ||
"github.com/filecoin-project/dagstore/mount" | ||
"github.com/filecoin-project/dagstore/shard" | ||
"github.com/filecoin-project/go-fil-markets/carstore" | ||
"github.com/filecoin-project/go-fil-markets/shared" | ||
) | ||
|
||
// log is the package-level logger for the dagstore wrapper.
var log = logging.Logger("dagstore-wrapper")

// gcInterval is how often the background goroutine triggers a DAG store GC.
var gcInterval = 5 * time.Minute
|
||
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
// The fields are passed straight through to dagstore.Config in NewDagStoreWrapper.
type MarketDAGStoreConfig struct {
	// TransientsDir is forwarded as dagstore.Config.TransientsDir.
	TransientsDir string
	// IndexDir is forwarded as dagstore.Config.IndexDir.
	IndexDir string
	// Datastore is forwarded as dagstore.Config.Datastore.
	Datastore ds.Datastore
}
|
||
// closableBlockstore pairs a Blockstore with an io.Closer. LoadShard uses it
// to couple a read-only blockstore over shard data with the shard accessor,
// so closing the blockstore releases the acquired shard.
type closableBlockstore struct {
	bstore.Blockstore
	io.Closer
}
|
||
// dagStoreWrapper wraps a dagstore.DAGStore together with the lotus mount API
// used to construct mounts, plus the lifecycle state (ctx/cancel/wg) for the
// background failure-handling goroutine started in NewDagStoreWrapper.
type dagStoreWrapper struct {
	// ctx/cancel bound the lifetime of the background goroutine; wg lets
	// Close wait for it to exit.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	dagStore *dagstore.DAGStore
	mountApi LotusMountAPI
}

// Compile-time check that dagStoreWrapper satisfies the markets interface.
var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)
|
||
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusMountAPI) (*dagStoreWrapper, error) { | ||
// construct the DAG Store. | ||
registry := mount.NewRegistry() | ||
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil { | ||
return nil, xerrors.Errorf("failed to create registry: %w", err) | ||
} | ||
|
||
failureCh := make(chan dagstore.ShardResult, 1) | ||
dcfg := dagstore.Config{ | ||
TransientsDir: cfg.TransientsDir, | ||
IndexDir: cfg.IndexDir, | ||
Datastore: cfg.Datastore, | ||
MountRegistry: registry, | ||
FailureCh: failureCh, | ||
} | ||
dagStore, err := dagstore.NewDAGStore(dcfg) | ||
if err != nil { | ||
return nil, xerrors.Errorf("failed to create DAG store: %w", err) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
dw := &dagStoreWrapper{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
|
||
dagStore: dagStore, | ||
mountApi: mountApi, | ||
} | ||
|
||
dw.wg.Add(1) | ||
// the dagstore will write Shard failures to the `failureCh` here. Run a go-routine to handle them. | ||
go dw.handleFailures(failureCh) | ||
|
||
return dw, nil | ||
} | ||
|
||
func (ds *dagStoreWrapper) handleFailures(failureCh chan dagstore.ShardResult) { | ||
defer ds.wg.Done() | ||
ticker := time.NewTicker(gcInterval) | ||
defer ticker.Stop() | ||
|
||
select { | ||
case <-ticker.C: | ||
_, _ = ds.dagStore.GC(ds.ctx) | ||
case f := <-failureCh: | ||
log.Errorw("shard failed", "shard-key", f.Key.String(), "error", f.Error) | ||
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, nil, dagstore.RecoverOpts{}); err != nil { | ||
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err) | ||
} | ||
case <-ds.ctx.Done(): | ||
return | ||
} | ||
} | ||
|
||
func (ds *dagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) { | ||
key := shard.KeyFromCID(pieceCid) | ||
resch := make(chan dagstore.ShardResult, 1) | ||
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) | ||
|
||
if err != nil { | ||
if xerrors.Unwrap(err) != dagstore.ErrShardUnknown { | ||
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err) | ||
} | ||
|
||
// if the DAGStore does not know about the Shard -> register it and then try to acquire it again. | ||
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid) | ||
if err := shared.RegisterShardSync(ctx, ds, pieceCid, "", false); err != nil { | ||
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err) | ||
} | ||
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid) | ||
|
||
resch = make(chan dagstore.ShardResult, 1) | ||
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil { | ||
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err) | ||
} | ||
} | ||
|
||
// TODO: Can I rely on AcquireShard to return an error if the context times out? | ||
var res dagstore.ShardResult | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case res = <-resch: | ||
if res.Error != nil { | ||
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error) | ||
} | ||
} | ||
|
||
bs, err := res.Accessor.Blockstore() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil | ||
} | ||
|
||
func (ds *dagStoreWrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error { | ||
// Create a lotus mount with the piece CID | ||
key := shard.KeyFromCID(pieceCid) | ||
mt, err := NewLotusMount(pieceCid, ds.mountApi) | ||
if err != nil { | ||
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err) | ||
} | ||
|
||
// Register the shard | ||
opts := dagstore.RegisterOpts{ | ||
ExistingTransient: carPath, | ||
LazyInitialization: !eagerInit, | ||
} | ||
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts) | ||
if err != nil { | ||
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ds *dagStoreWrapper) Close() error { | ||
if err := ds.dagStore.Close(); err != nil { | ||
return err | ||
} | ||
|
||
ds.cancel() | ||
ds.wg.Wait() | ||
|
||
return nil | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package dagstore | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"net/url" | ||
|
||
"github.com/ipfs/go-cid" | ||
"golang.org/x/xerrors" | ||
|
||
"github.com/filecoin-project/dagstore/mount" | ||
) | ||
|
||
// lotusScheme is the URL scheme under which lotus mounts are registered in
// the DAG store mount registry.
const lotusScheme = "lotus"

// mountURLTemplate renders a mount URL as "<scheme>://<piece-cid>"; see
// Serialize/Deserialize.
const mountURLTemplate = "%s://%s"
|
||
// Compile-time check that LotusMount satisfies the mount.Mount interface.
var _ mount.Mount = (*LotusMount)(nil)

// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
	// api fetches unsealed piece data (Fetch) and CAR sizes (Stat) from lotus.
	api LotusMountAPI
	// pieceCid identifies the piece this mount serves; set by NewLotusMount
	// or restored by Deserialize.
	pieceCid cid.Cid
}
|
||
// NewLotusMountTemplate creates the LotusMount instance used when registering
// the mount type with the DAG store registry.
//
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusMountAPI) *LotusMount {
	return &LotusMount{api: api}
}
|
||
func NewLotusMount(pieceCid cid.Cid, api LotusMountAPI) (*LotusMount, error) { | ||
return &LotusMount{ | ||
pieceCid: pieceCid, | ||
api: api, | ||
}, nil | ||
} | ||
|
||
func (l *LotusMount) Serialize() *url.URL { | ||
u := fmt.Sprintf(mountURLTemplate, lotusScheme, l.pieceCid.String()) | ||
url, err := url.Parse(u) | ||
if err != nil { | ||
// Should never happen | ||
panic(xerrors.Errorf("failed to parse mount URL '%s': %w", u, err)) | ||
} | ||
|
||
return url | ||
} | ||
|
||
func (l *LotusMount) Deserialize(u *url.URL) error { | ||
if u.Scheme != lotusScheme { | ||
return xerrors.Errorf("scheme '%s' for URL '%s' does not match required scheme '%s'", u.Scheme, u, lotusScheme) | ||
} | ||
|
||
pieceCid, err := cid.Decode(u.Host) | ||
if err != nil { | ||
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err) | ||
} | ||
|
||
l.pieceCid = pieceCid | ||
return nil | ||
} | ||
|
||
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) { | ||
r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid) | ||
if err != nil { | ||
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err) | ||
} | ||
return &readCloser{r}, nil | ||
} | ||
|
||
func (l *LotusMount) Info() mount.Info { | ||
return mount.Info{ | ||
Kind: mount.KindRemote, | ||
AccessSequential: true, | ||
AccessSeek: false, | ||
AccessRandom: false, | ||
} | ||
} | ||
|
||
// Close is a no-op: the mount itself holds no resources.
func (l *LotusMount) Close() error {
	return nil
}
|
||
func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) { | ||
size, err := l.api.GetUnpaddedCARSize(l.pieceCid) | ||
if err != nil { | ||
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err) | ||
} | ||
|
||
// TODO Mark false when storage deal expires. | ||
return mount.Stat{ | ||
Exists: true, | ||
Size: int64(size), | ||
}, nil | ||
} | ||
|
||
// readCloser adapts an io.ReadCloser (the unsealed piece stream) to the
// mount.Reader interface; the ReadAt/Seek methods it cannot support are
// stubbed out below.
type readCloser struct {
	io.ReadCloser
}

// Compile-time check that readCloser satisfies mount.Reader.
var _ mount.Reader = (*readCloser)(nil)
|
||
func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) { | ||
return 0, xerrors.Errorf("ReadAt called but not implemented") | ||
} | ||
|
||
func (r *readCloser) Seek(offset int64, whence int) (int64, error) { | ||
return 0, xerrors.Errorf("Seek called but not implemented") | ||
} |
Oops, something went wrong.