Skip to content

Commit

Permalink
lotus-miner dagstore initialize-all --include-sealed.
Browse files Browse the repository at this point in the history
Fixes #7010.
  • Loading branch information
raulk committed Aug 9, 2021
1 parent 55e0fc5 commit 892e146
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 13 deletions.
1 change: 1 addition & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ type DagstoreShardResult struct {

type DagstoreInitializeAllParams struct {
MaxConcurrency int
IncludeSealed bool
}

// DagstoreInitializeAllEvent represents an initialization event.
Expand Down
8 changes: 7 additions & 1 deletion cmd/lotus-miner/dagstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,21 @@ var dagstoreRecoverShardCmd = &cli.Command{

var dagstoreInitializeAllCmd = &cli.Command{
Name: "initialize-all",
Usage: "Initialize all uninitialized shards, streaming results as they're produced",
Usage: "Initialize all uninitialized shards, streaming results as they're produced; only shards for unsealed pieces are initialized by default",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "concurrency",
Usage: "maximum shards to initialize concurrently at a time; use 0 for unlimited",
Required: true,
},
&cli.BoolFlag{
Name: "include-sealed",
Usage: "initialize sealed pieces as well",
},
},
Action: func(cctx *cli.Context) error {
concurrency := cctx.Uint("concurrency")
sealed := cctx.Bool("sealed")

marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
Expand All @@ -141,6 +146,7 @@ var dagstoreInitializeAllCmd = &cli.Command{

params := api.DagstoreInitializeAllParams{
MaxConcurrency: int(concurrency),
IncludeSealed: sealed,
}

ch, err := marketsApi.DagstoreInitializeAll(ctx, params)
Expand Down
67 changes: 55 additions & 12 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ type StorageMinerAPI struct {
RemoteStore *stores.Remote

// Markets
PieceStore dtypes.ProviderPieceStore `optional:"true"`
StorageProvider storagemarket.StorageProvider `optional:"true"`
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
Host host.Host `optional:"true"`
DAGStore *dagstore.DAGStore `optional:"true"`
PieceStore dtypes.ProviderPieceStore `optional:"true"`
StorageProvider storagemarket.StorageProvider `optional:"true"`
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
RetrievalProviderNode retrievalmarket.RetrievalProviderNode `optional:"true"`
DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
Host host.Host `optional:"true"`
DAGStore *dagstore.DAGStore `optional:"true"`

// Miner / storage
Miner *storage.Miner `optional:"true"`
Expand Down Expand Up @@ -629,6 +630,10 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
return nil, fmt.Errorf("dagstore not available on this node")
}

if sm.RetrievalProviderNode == nil {
return nil, fmt.Errorf("retrieval provider node not available on this node")
}

// prepare the thottler tokens.
var throttle chan struct{}
if c := params.MaxConcurrency; c > 0 {
Expand All @@ -638,16 +643,54 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
}
}

// are we initializing only unsealed pieces?
onlyUnsealed := !params.IncludeSealed

info := sm.DAGStore.AllShardsInfo()
var uninit []string
var toInitialize []string
for k, i := range info {
if i.ShardState != dagstore.ShardStateNew {
continue
}
uninit = append(uninit, k.String())

// if we're initializing only unsealed pieces, check if there's an
// unsealed deal for this piece available.
if onlyUnsealed {
pieceCid, err := cid.Decode(k.String())
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to decode shard key as piece CID; skipping", "shard_key", k.String(), "error", err)
continue
}

pi, err := sm.PieceStore.GetPieceInfo(pieceCid)
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to get piece info; skipping", "piece_cid", pieceCid, "error", err)
continue
}

var isUnsealed bool
for _, d := range pi.Deals {
isUnsealed, err = sm.RetrievalProviderNode.IsUnsealed(ctx, d.SectorID, d.Offset.Unpadded(), d.Length.Unpadded())
if err != nil {
log.Warnw("DagstoreInitializeAll: failed to get unsealed status; skipping deal", "deal_id", d.DealID, "error", err)
continue
}
if isUnsealed {
break
}
}

if !isUnsealed {
log.Infow("DagstoreInitializeAll: skipping piece because it's sealed", "piece_cid", pieceCid, "error", err)
continue
}
}

// yes, we're initializing this shard.
toInitialize = append(toInitialize, k.String())
}

total := len(uninit)
total := len(toInitialize)
if total == 0 {
out := make(chan api.DagstoreInitializeAllEvent)
close(out)
Expand Down Expand Up @@ -675,7 +718,7 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
}()

go func() {
for i, k := range uninit {
for i, k := range toInitialize {
select {
case <-throttle:
// acquired a throttle token, proceed.
Expand Down

0 comments on commit 892e146

Please sign in to comment.