Skip to content

Commit

Permalink
feat(manager): less restrictive storage lock
Browse files Browse the repository at this point in the history
Use initial less restrictive storage lock when trying to read unsealed data before acquiring more
restrictive lock needed for unsealing
  • Loading branch information
hannahhoward committed Sep 15, 2020
1 parent f9ea393 commit 7dc0910
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,25 +203,26 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.Path
}
}

func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (foundUnsealed bool, readOk bool, selector WorkerSelector, returnErr error) {

// acquire a lock purely for reading unsealed sectors
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTNone); err != nil {
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
return
}

// passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
returnErr = xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
return
}

var readOk bool
var selector WorkerSelector
if len(best) == 0 { // new
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
foundUnsealed = len(best) > 0
if foundUnsealed { // append to existing
// There is unsealed sector, see if we can read from it

selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false)
Expand All @@ -231,20 +232,35 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return err
})
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
}
} else {
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing)
}
return
}

if readOk {
return nil
}
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
if err != nil {
return err
}
if readOk {
return nil
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
}

unsealFetch := func(ctx context.Context, worker Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
}

if len(best) > 0 {
if foundUnsealed {
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
Expand Down

0 comments on commit 7dc0910

Please sign in to comment.