diff --git a/api/api_storage.go b/api/api_storage.go index 9a5b6ccd1ae..a26080617cc 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -398,6 +398,7 @@ type DagstoreShardResult struct { type DagstoreInitializeAllParams struct { MaxConcurrency int + IncludeSealed bool } // DagstoreInitializeAllEvent represents an initialization event. diff --git a/cmd/lotus-miner/dagstore.go b/cmd/lotus-miner/dagstore.go index 6ebaacc49df..f3a1b9fea06 100644 --- a/cmd/lotus-miner/dagstore.go +++ b/cmd/lotus-miner/dagstore.go @@ -120,16 +120,21 @@ var dagstoreRecoverShardCmd = &cli.Command{ var dagstoreInitializeAllCmd = &cli.Command{ Name: "initialize-all", - Usage: "Initialize all uninitialized shards, streaming results as they're produced", + Usage: "Initialize all uninitialized shards, streaming results as they're produced; only shards for unsealed pieces are initialized by default", Flags: []cli.Flag{ &cli.UintFlag{ Name: "concurrency", Usage: "maximum shards to initialize concurrently at a time; use 0 for unlimited", Required: true, }, + &cli.BoolFlag{ + Name: "include-sealed", + Usage: "initialize sealed pieces as well", + }, }, Action: func(cctx *cli.Context) error { concurrency := cctx.Uint("concurrency") + sealed := cctx.Bool("sealed") marketsApi, closer, err := lcli.GetMarketsAPI(cctx) if err != nil { @@ -141,6 +146,7 @@ var dagstoreInitializeAllCmd = &cli.Command{ params := api.DagstoreInitializeAllParams{ MaxConcurrency: int(concurrency), + IncludeSealed: sealed, } ch, err := marketsApi.DagstoreInitializeAll(ctx, params) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 81db0198913..88ad554c935 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -65,14 +65,15 @@ type StorageMinerAPI struct { RemoteStore *stores.Remote // Markets - PieceStore dtypes.ProviderPieceStore `optional:"true"` - StorageProvider storagemarket.StorageProvider `optional:"true"` - RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"` - DataTransfer dtypes.ProviderDataTransfer `optional:"true"` - DealPublisher *storageadapter.DealPublisher `optional:"true"` - SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` - Host host.Host `optional:"true"` - DAGStore *dagstore.DAGStore `optional:"true"` + PieceStore dtypes.ProviderPieceStore `optional:"true"` + StorageProvider storagemarket.StorageProvider `optional:"true"` + RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"` + RetrievalProviderNode retrievalmarket.RetrievalProviderNode `optional:"true"` + DataTransfer dtypes.ProviderDataTransfer `optional:"true"` + DealPublisher *storageadapter.DealPublisher `optional:"true"` + SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` + Host host.Host `optional:"true"` + DAGStore *dagstore.DAGStore `optional:"true"` // Miner / storage Miner *storage.Miner `optional:"true"` @@ -629,6 +630,10 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api return nil, fmt.Errorf("dagstore not available on this node") } + if sm.RetrievalProviderNode == nil { + return nil, fmt.Errorf("retrieval provider node not available on this node") + } + // prepare the thottler tokens. var throttle chan struct{} if c := params.MaxConcurrency; c > 0 { @@ -638,16 +643,54 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api } } + // are we initializing only unsealed pieces? + onlyUnsealed := !params.IncludeSealed + info := sm.DAGStore.AllShardsInfo() - var uninit []string + var toInitialize []string for k, i := range info { if i.ShardState != dagstore.ShardStateNew { continue } - uninit = append(uninit, k.String()) + + // if we're initializing only unsealed pieces, check if there's an + // unsealed deal for this piece available. + if onlyUnsealed { + pieceCid, err := cid.Decode(k.String()) + if err != nil { + log.Warnw("DagstoreInitializeAll: failed to decode shard key as piece CID; skipping", "shard_key", k.String(), "error", err) + continue + } + + pi, err := sm.PieceStore.GetPieceInfo(pieceCid) + if err != nil { + log.Warnw("DagstoreInitializeAll: failed to get piece info; skipping", "piece_cid", pieceCid, "error", err) + continue + } + + var isUnsealed bool + for _, d := range pi.Deals { + isUnsealed, err = sm.RetrievalProviderNode.IsUnsealed(ctx, d.SectorID, d.Offset.Unpadded(), d.Length.Unpadded()) + if err != nil { + log.Warnw("DagstoreInitializeAll: failed to get unsealed status; skipping deal", "deal_id", d.DealID, "error", err) + continue + } + if isUnsealed { + break + } + } + + if !isUnsealed { + log.Infow("DagstoreInitializeAll: skipping piece because it's sealed", "piece_cid", pieceCid, "error", err) + continue + } + } + + // yes, we're initializing this shard. + toInitialize = append(toInitialize, k.String()) } - total := len(uninit) + total := len(toInitialize) if total == 0 { out := make(chan api.DagstoreInitializeAllEvent) close(out) @@ -675,7 +718,7 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api }() go func() { - for i, k := range uninit { + for i, k := range toInitialize { select { case <-throttle: // acquired a throttle token, proceed.