diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index fe9bf5d4569..73a5eb51e44 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -203,25 +203,26 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.Path } } -func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { +func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) { + + // acquire a lock purely for reading unsealed sectors ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil { - return xerrors.Errorf("acquiring sector lock: %w", err) + if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTNone); err != nil { + returnErr = xerrors.Errorf("acquiring read sector lock: %w", err) + return } // passing 0 spt because we only need it when allowFetch is true best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) if err != nil { - return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) + returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) + return } - var readOk bool - var selector WorkerSelector - if len(best) == 0 { // new - selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) - } else { // append to existing + foundUnsealed = len(best) > 0 + if foundUnsealed { // append to existing // There is unsealed sector, see if we can read from it selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) @@ -231,12 +232,27 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return err }) if err != nil { - return xerrors.Errorf("reading piece from sealed sector: %w", err) + returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err) } + } else { + selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) + } + return +} - if readOk { - return nil - } +func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { + foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size) + if err != nil { + return err + } + if readOk { + return nil + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil { + return xerrors.Errorf("acquiring unseal sector lock: %w", err) } unsealFetch := func(ctx context.Context, worker Worker) error { @@ -244,7 +260,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("copy sealed/cache sector data: %w", err) } - if len(best) > 0 { + if foundUnsealed { if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil { return xerrors.Errorf("copy unsealed sector data: %w", err) }