From 7b96924f37481970ed8f7f6a75e6503d7e6867ac Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 18 Apr 2024 16:24:35 +0400 Subject: [PATCH 1/9] break trees task --- cmd/curio/tasks/tasks.go | 5 +- curiosrc/ffi/sdr_funcs.go | 184 ++++++++----- curiosrc/ffi/task_storage.go | 21 +- curiosrc/gc/storage_endpoint_gc.go | 1 + curiosrc/piece/task_park_piece.go | 3 +- curiosrc/seal/poller.go | 43 ++- curiosrc/seal/poller_precommit_msg.go | 2 +- curiosrc/seal/task_movestorage.go | 3 +- curiosrc/seal/task_sdr.go | 3 +- .../seal/{task_trees.go => task_treed.go} | 208 +++++--------- ...{task_trees_test.go => task_treed_test.go} | 0 curiosrc/seal/task_treerc.go | 260 ++++++++++++++++++ lib/harmony/harmonytask/harmonytask.go | 5 + storage/paths/interface.go | 2 +- storage/paths/local.go | 9 +- storage/paths/mocks/store.go | 8 +- storage/paths/remote.go | 4 +- storage/sealer/proofpaths/cachefiles.go | 8 + storage/sealer/worker_local.go | 8 +- 19 files changed, 531 insertions(+), 246 deletions(-) rename curiosrc/seal/{task_trees.go => task_treed.go} (67%) rename curiosrc/seal/{task_trees_test.go => task_treed_test.go} (100%) create mode 100644 curiosrc/seal/task_treerc.go diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 71923018d9e..fde13c33659 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -114,9 +114,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task activeTasks = append(activeTasks, sdrTask) } if cfg.Subsystems.EnableSealSDRTrees { - treesTask := seal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) + treeDTask := seal.NewTreeDTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) + treeRCTask := seal.NewTreeRCTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) finalizeTask := seal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db) - activeTasks = append(activeTasks, treesTask, finalizeTask) + activeTasks = append(activeTasks, treeDTask, treeRCTask, finalizeTask) } if 
cfg.Subsystems.EnableSendPrecommitMsg { precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index e9ce62831de..5246e440508 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -61,7 +61,7 @@ type storageProvider struct { } func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(), err error) { - var paths, storageIDs storiface.SectorPaths + var sectorPaths, storageIDs storiface.SectorPaths var releaseStorage func() var ok bool @@ -77,7 +77,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate) - paths = resv.Paths + sectorPaths = resv.Paths storageIDs = resv.PathIDs releaseStorage = resv.Release @@ -87,7 +87,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask // present locally. Note that we do not care about 'allocate' reqeuests, those files don't exist, and are just // proposed paths with a reservation of space. 
- _, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs})) + _, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: sectorPaths, IDs: storageIDs})) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err) } @@ -101,20 +101,20 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask // No related reservation, acquire storage as usual var err error - paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) + sectorPaths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err } - releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal) + releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MaxStorageUtilizationPercentage) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } } - log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, sectorPaths) - return paths, storageIDs, func() { + return sectorPaths, storageIDs, func() { releaseStorage() for _, fileType := range storiface.PathTypes { @@ -194,13 +194,13 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID return nil } -func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size 
abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (scid cid.Cid, ucid cid.Cid, err error) { +func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid) (scid cid.Cid, ucid cid.Cid, err error) { p1o, err := sb.makePhase1Out(unsealed, sector.ProofType) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) } - paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) + paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } @@ -208,65 +208,13 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect defer func() { if err != nil { - clerr := removeDRCTrees(paths.Cache) + clerr := removeDRCTrees(paths.Cache, "RC") if clerr != nil { log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", paths.Cache) } } }() - treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("building tree-d: %w", err) - } - - if treeDUnsealed != unsealed { - return cid.Undef, cid.Undef, xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid") - } - - { - // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place - ssize, err := sector.ProofType.SectorSize() - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) - } - - { - // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector - - // first try reflink + truncate, that should be way faster - err := 
reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed) - if err == nil { - err = os.Truncate(paths.Sealed, int64(ssize)) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err) - } - } else { - log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed) - - // fallback to slow copy, copy ssize bytes from treed to sealed - dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err) - } - src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName)) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err) - } - - _, err = io.CopyN(dst, src, int64(ssize)) - derr := dst.Close() - _ = src.Close() - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err) - } - if derr != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr) - } - } - } - } - sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err) @@ -283,22 +231,36 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect return sl, uns, nil } -func removeDRCTrees(cache string) error { +func removeDRCTrees(cache string, tree string) error { // list files in cache files, err := os.ReadDir(cache) if err != nil { return xerrors.Errorf("listing cache: %w", err) } - for _, file := range files { - if proofpaths.IsTreeFile(file.Name()) { - err := os.Remove(filepath.Join(cache, file.Name())) - if err != nil { - return xerrors.Errorf("removing tree file: %w", err) + switch tree { + case "D": + for _, file := range files { + if 
proofpaths.IsTreeDFile(file.Name()) { + err := os.Remove(filepath.Join(cache, file.Name())) + if err != nil { + return xerrors.Errorf("removing tree file: %w", err) + } } } - } + case "RC": + for _, file := range files { + if proofpaths.IsTreeRCFile(file.Name()) { + err := os.Remove(filepath.Join(cache, file.Name())) + if err != nil { + return xerrors.Errorf("removing tree file: %w", err) + } + } + } + default: + return xerrors.Errorf("incorrect input Tree type") + } return nil } @@ -625,3 +587,87 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec return true, storiface.PathStorage, nil } + +// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks +func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) error { + _, _, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) + if err != nil { + return xerrors.Errorf("acquiring sector paths: %w", err) + } + defer releaseSector() + + return nil +} + +func (sb *SealCalls) TreeD(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) error { + fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) + if err != nil { + return xerrors.Errorf("acquiring sector paths: %w", err) + } + defer releaseSector() + + defer func() { + if err != nil { + clerr := removeDRCTrees(fspaths.Cache, "D") + if clerr != nil { + log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) + } + } + }() + + treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(fspaths.Cache, proofpaths.TreeDName), size) + if err != nil { + return xerrors.Errorf("building tree-d: %w", err) + } + + if treeDUnsealed != 
unsealed { + return xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid") + } + + // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return xerrors.Errorf("getting sector size: %w", err) + } + + { + // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector + + // first try reflink + truncate, that should be way faster + err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed) + if err == nil { + err = os.Truncate(fspaths.Sealed, int64(ssize)) + if err != nil { + return xerrors.Errorf("truncating reflinked sealed file: %w", err) + } + } else { + log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed) + + // fallback to slow copy, copy ssize bytes from treed to sealed + dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return xerrors.Errorf("opening sealed sector file: %w", err) + } + src, err := os.Open(filepath.Join(fspaths.Cache, proofpaths.TreeDName)) + if err != nil { + return xerrors.Errorf("opening treed sector file: %w", err) + } + + _, err = io.CopyN(dst, src, int64(ssize)) + derr := dst.Close() + _ = src.Close() + if err != nil { + return xerrors.Errorf("copying treed -> sealed: %w", err) + } + if derr != nil { + return xerrors.Errorf("closing sealed file: %w", derr) + } + } + } + + if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache|storiface.FTSealed); err != nil { + return xerrors.Errorf("ensure one copy: %w", err) + } + + return nil +} diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index f01a472fa8c..b214499d176 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -43,6 +43,10 @@ type TaskStorage struct { 
pathType storiface.PathType taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error) + + // Maximum storage utilization percentage beyond which + // storage claim will fail for a task type + maxStoragePercentage int } type ReleaseStorageFunc func() // free storage reservation @@ -56,14 +60,15 @@ type StorageReservation struct { Alloc, Existing storiface.SectorFileType } -func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage { +func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, maxStoragePercentage int) *TaskStorage { return &TaskStorage{ - sc: sb, - alloc: alloc, - existing: existing, - ssize: ssize, - pathType: pathType, - taskToSectorRef: taskToSectorRef, + sc: sb, + alloc: alloc, + existing: existing, + ssize: ssize, + pathType: pathType, + taskToSectorRef: taskToSectorRef, + maxStoragePercentage: maxStoragePercentage, } } @@ -166,7 +171,7 @@ func (t *TaskStorage) Claim(taskID int) error { } // reserve the space - release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal) + release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.maxStoragePercentage) if err != nil { return err } diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go index 8c6e6a0bf30..83542b508cc 100644 --- a/curiosrc/gc/storage_endpoint_gc.go +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -196,6 +196,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool // Remove dead URLs from storage_path entries and handle path cleanup for _, du := range deadURLs { + du := du // Fetch the current URLs for the storage path var 
currentPath struct { URLs string diff --git a/curiosrc/piece/task_park_piece.go b/curiosrc/piece/task_park_piece.go index 68a94a295a0..e20565d4361 100644 --- a/curiosrc/piece/task_park_piece.go +++ b/curiosrc/piece/task_park_piece.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -185,7 +186,7 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 64 << 20, - Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing), + Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/poller.go b/curiosrc/seal/poller.go index 568280bdbce..073091f83df 100644 --- a/curiosrc/seal/poller.go +++ b/curiosrc/seal/poller.go @@ -21,7 +21,8 @@ var log = logging.Logger("lpseal") const ( pollerSDR = iota - pollerTrees + pollerTreeD + pollerTreeRC pollerPrecommitMsg pollerPoRep pollerCommitMsg @@ -154,7 +155,8 @@ func (s *SealPoller) poll(ctx context.Context) error { } s.pollStartSDR(ctx, task) - s.pollStartSDRTrees(ctx, task) + s.pollStartSDRTreeD(ctx, task) + s.pollStartSDRTreeRC(ctx, task) s.pollStartPrecommitMsg(ctx, task) s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) s.pollStartPoRep(ctx, task, ts) @@ -187,14 +189,31 @@ func (t pollTask) afterSDR() bool { return t.AfterSDR } -func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { - if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR && - task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && - s.pollers[pollerTrees].IsSet() && task.AfterSDR { +func (s *SealPoller) pollStartSDRTreeD(ctx 
context.Context, task pollTask) { + if !task.AfterTreeD && task.TaskTreeD == nil && s.pollers[pollerTreeD].IsSet() && task.afterSDR() { + s.pollers[pollerTreeD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (t pollTask) afterTreeD() bool { + return t.AfterTreeD && t.afterSDR() +} - s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 - WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) +func (s *SealPoller) pollStartSDRTreeRC(ctx context.Context, task pollTask) { + if !task.AfterTreeC && !task.AfterTreeR && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTreeRC].IsSet() && task.afterTreeD() { + s.pollers[pollerTreeRC].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_c = $1, task_id_tree_r = $1 + WHERE sp_id = $2 AND sector_number = $3 AND after_tree_d = TRUE AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -207,12 +226,12 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { } } -func (t pollTask) afterTrees() 
bool { - return t.AfterTreeD && t.AfterTreeC && t.AfterTreeR && t.afterSDR() +func (t pollTask) afterTreeRC() bool { + return t.AfterTreeC && t.AfterTreeR && t.afterTreeD() } func (t pollTask) afterPrecommitMsg() bool { - return t.AfterPrecommitMsg && t.afterTrees() + return t.AfterPrecommitMsg && t.afterTreeRC() } func (t pollTask) afterPrecommitMsgSuccess() bool { diff --git a/curiosrc/seal/poller_precommit_msg.go b/curiosrc/seal/poller_precommit_msg.go index 4372cbb9223..42986499f61 100644 --- a/curiosrc/seal/poller_precommit_msg.go +++ b/curiosrc/seal/poller_precommit_msg.go @@ -16,7 +16,7 @@ import ( ) func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTrees() && s.pollers[pollerPrecommitMsg].IsSet() { + if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTreeRC() && s.pollers[pollerPrecommitMsg].IsSet() { s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_precommit_msg IS NULL AND after_tree_r = TRUE AND after_tree_d = TRUE`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/curiosrc/seal/task_movestorage.go b/curiosrc/seal/task_movestorage.go index 6037a390dc7..53054953e68 100644 --- a/curiosrc/seal/task_movestorage.go +++ b/curiosrc/seal/task_movestorage.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -148,7 +149,7 @@ func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 128 << 20, - Storage: 
m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage), + Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage, paths.MaxStorageUtilizationPercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/task_sdr.go b/curiosrc/seal/task_sdr.go index 4c1164e0581..8253ba7248a 100644 --- a/curiosrc/seal/task_sdr.go +++ b/curiosrc/seal/task_sdr.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -204,7 +205,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 4, // todo multicore sdr Gpu: 0, Ram: 54 << 30, - Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing), + Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), }, MaxFailures: 2, Follows: nil, diff --git a/curiosrc/seal/task_trees.go b/curiosrc/seal/task_treed.go similarity index 67% rename from curiosrc/seal/task_trees.go rename to curiosrc/seal/task_treed.go index 7994c354aad..25962c2a69f 100644 --- a/curiosrc/seal/task_trees.go +++ b/curiosrc/seal/task_treed.go @@ -3,7 +3,6 @@ package seal import ( "context" "io" - "net/http" "net/url" "strconv" @@ -23,7 +22,7 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -type TreesTask struct { +type TreeDTask struct { sp *SealPoller db *harmonydb.DB sc *ffi.SealCalls @@ -31,8 +30,51 @@ type TreesTask struct { max int } -func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreesTask { - return &TreesTask{ +func (t 
*TreeDTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+	return &ids[0], nil
+}
+
+func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails {
+	ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
+	if isDevnet {
+		ssize = abi.SectorSize(2 << 20)
+	}
+
+	return harmonytask.TaskTypeDetails{
+		Max:  t.max,
+		Name: "SDRTreeD",
+		Cost: resources.Resources{
+			Cpu:     1,
+			Ram:     50 << 20, // todo
+			Gpu:     0,
+			Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, 97),
+		},
+		MaxFailures: 3,
+		Follows:     nil,
+	}
+}
+
+func (t *TreeDTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) {
+	var refs []ffi.SectorRef
+
+	err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_d = $1`, id)
+	if err != nil {
+		return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
+	}
+
+	if len(refs) != 1 {
+		return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
+	}
+
+	return refs[0], nil
+}
+
+func (t *TreeDTask) Adder(taskFunc harmonytask.AddTaskFunc) {
+	t.sp.pollers[pollerTreeD].Set(taskFunc)
+}
+
+func NewTreeDTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreeDTask {
+	return &TreeDTask{
 		sp: sp,
 		db: db,
 		sc: sc,
@@ -41,7 +83,7 @@ func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees
 	}
 }
 
-func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
 	ctx := context.Background()
 
 	var sectorParamsArr []struct {
@@ -63,6 +105,20 @@
 	}
 	sectorParams := sectorParamsArr[0]
 
+	sref := storiface.SectorRef{
+		ID: abi.SectorID{
+			Miner:  
abi.ActorID(sectorParams.SpID), + Number: abi.SectorNumber(sectorParams.SectorNumber), + }, + ProofType: sectorParams.RegSealProof, + } + + // Fetch the Sector to local storage + err = t.sc.PreFetch(ctx, sref, &taskID) + if err != nil { + return false, xerrors.Errorf("failed to prefetch sectors: %w", err) + } + var pieces []struct { PieceIndex int64 `db:"piece_index"` PieceCID string `db:"piece_cid"` @@ -178,149 +234,23 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done unpaddedData = false // nullreader includes fr32 zero bits } - sref := storiface.SectorRef{ - ID: abi.SectorID{ - Miner: abi.ActorID(sectorParams.SpID), - Number: abi.SectorNumber(sectorParams.SectorNumber), - }, - ProofType: sectorParams.RegSealProof, - } - - // D / R / C - sealed, unsealed, err := t.sc.TreeDRC(ctx, &taskID, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData) + // Generate Tree D + err = t.sc.TreeD(ctx, &taskID, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData) if err != nil { - return false, xerrors.Errorf("computing tree d, r and c: %w", err) + return false, xerrors.Errorf("failed to generate TreeD: %w", err) } - // todo synth porep - - // todo porep challenge check - n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $3, tree_d_cid = $4 - WHERE sp_id = $1 AND sector_number = $2`, - sectorParams.SpID, sectorParams.SectorNumber, sealed, unsealed) + SET after_tree_d = true, tree_d_cid = $3 WHERE sp_id = $1 AND sector_number = $2`, + sectorParams.SpID, sectorParams.SectorNumber, commd) if err != nil { - return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err) + return false, xerrors.Errorf("store sdr-treeD success: updating pipeline: %w", err) } if n != 1 { - return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n) + return false, xerrors.Errorf("store sdr-treeD success: updated %d rows", n) 
} return true, nil } -func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - id := ids[0] - return &id, nil -} - -func (t *TreesTask) TypeDetails() harmonytask.TaskTypeDetails { - ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size - if isDevnet { - ssize = abi.SectorSize(2 << 20) - } - - return harmonytask.TaskTypeDetails{ - Max: t.max, - Name: "SDRTrees", - Cost: resources.Resources{ - Cpu: 1, - Gpu: 1, - Ram: 8000 << 20, // todo - Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing), - }, - MaxFailures: 3, - Follows: nil, - } -} - -func (t *TreesTask) Adder(taskFunc harmonytask.AddTaskFunc) { - t.sp.pollers[pollerTrees].Set(taskFunc) -} - -func (t *TreesTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { - var refs []ffi.SectorRef - - err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id) - if err != nil { - return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) - } - - if len(refs) != 1 { - return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) - } - - return refs[0], nil -} - -type UrlPieceReader struct { - Url string - RawSize int64 // the exact number of bytes read, if we read more or less that's an error - - readSoFar int64 - closed bool - active io.ReadCloser // auto-closed on EOF -} - -func (u *UrlPieceReader) Read(p []byte) (n int, err error) { - // Check if we have already read the required amount of data - if u.readSoFar >= u.RawSize { - return 0, io.EOF - } - - // If 'active' is nil, initiate the HTTP request - if u.active == nil { - resp, err := http.Get(u.Url) - if err != nil { - return 0, err - } - - // Set 'active' to the response body - u.active = resp.Body - } - - // Calculate the maximum number of bytes we can read without exceeding 
RawSize - toRead := u.RawSize - u.readSoFar - if int64(len(p)) > toRead { - p = p[:toRead] - } - - n, err = u.active.Read(p) - - // Update the number of bytes read so far - u.readSoFar += int64(n) - - // If the number of bytes read exceeds RawSize, return an error - if u.readSoFar > u.RawSize { - return n, xerrors.New("read beyond the specified RawSize") - } - - // If EOF is reached, close the reader - if err == io.EOF { - cerr := u.active.Close() - u.closed = true - if cerr != nil { - log.Errorf("error closing http piece reader: %s", cerr) - } - - // if we're below the RawSize, return an unexpected EOF error - if u.readSoFar < u.RawSize { - log.Errorw("unexpected EOF", "readSoFar", u.readSoFar, "rawSize", u.RawSize, "url", u.Url) - return n, io.ErrUnexpectedEOF - } - } - - return n, err -} - -func (u *UrlPieceReader) Close() error { - if !u.closed { - u.closed = true - return u.active.Close() - } - - return nil -} - -var _ harmonytask.TaskInterface = &TreesTask{} +var _ harmonytask.TaskInterface = &TreeDTask{} diff --git a/curiosrc/seal/task_trees_test.go b/curiosrc/seal/task_treed_test.go similarity index 100% rename from curiosrc/seal/task_trees_test.go rename to curiosrc/seal/task_treed_test.go diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go new file mode 100644 index 00000000000..d519d37a9ef --- /dev/null +++ b/curiosrc/seal/task_treerc.go @@ -0,0 +1,260 @@ +package seal + +import ( + "context" + "io" + "net/http" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/curiosrc/ffi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type TreeRCTask struct { + sp *SealPoller + db *harmonydb.DB + sc 
*ffi.SealCalls
+
+	max int
+}
+
+func NewTreeRCTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreeRCTask {
+	return &TreeRCTask{
+		sp: sp,
+		db: db,
+		sc: sc,
+
+		max: maxTrees,
+	}
+}
+
+func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+	ctx := context.Background()
+
+	var sectorParamsArr []struct {
+		SpID         int64                   `db:"sp_id"`
+		SectorNumber int64                   `db:"sector_number"`
+		RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
+		CommD        string                  `db:"tree_d_cid"`
+	}
+
+	err = t.db.Select(ctx, &sectorParamsArr, `
+		SELECT sp_id, sector_number, reg_seal_proof, tree_d_cid
+		FROM sectors_sdr_pipeline
+		WHERE task_id_tree_c = $1 AND task_id_tree_r = $1`, taskID)
+	if err != nil {
+		return false, xerrors.Errorf("getting sector params: %w", err)
+	}
+
+	if len(sectorParamsArr) != 1 {
+		return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr))
+	}
+	sectorParams := sectorParamsArr[0]
+
+	commd, err := cid.Parse(sectorParams.CommD)
+	if err != nil {
+		return false, xerrors.Errorf("parsing unsealed CID: %w", err)
+	}
+
+	sref := storiface.SectorRef{
+		ID: abi.SectorID{
+			Miner:  abi.ActorID(sectorParams.SpID),
+			Number: abi.SectorNumber(sectorParams.SectorNumber),
+		},
+		ProofType: sectorParams.RegSealProof,
+	}
+
+	// R / C
+	sealed, _, err := t.sc.TreeRC(ctx, &taskID, sref, commd)
+	if err != nil {
+		return false, xerrors.Errorf("computing tree r and c: %w", err)
+	}
+
+	// todo synth porep
+
+	// todo porep challenge check
+
+	n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
+		SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3 WHERE sp_id = $1 AND sector_number = $2`,
+		sectorParams.SpID, sectorParams.SectorNumber, sealed)
+	if err != nil {
+		return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err)
+	}
+	if n != 1 {
+		return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n)
+	}
+
+	return true, nil
+}
+
+func (t *TreeRCTask) 
CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + var tasks []struct { + TaskID harmonytask.TaskID `db:"task_id_finalize"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + StorageID string `db:"storage_id"` + } + + if storiface.FTCache != 4 { + panic("storiface.FTCache != 4") + } + + ctx := context.Background() + + indIDs := make([]int64, len(ids)) + for i, id := range ids { + indIDs[i] = int64(id) + } + + err := t.db.Select(ctx, &tasks, ` + SELECT p.task_id_tree_c, p.sp_id, p.sector_number, l.storage_id FROM sectors_sdr_pipeline p + INNER JOIN sector_location l ON p.sp_id = l.miner_id AND p.sector_number = l.sector_num + WHERE task_id_tree_r = ANY ($1) AND l.sector_filetype = 4 +`, indIDs) + if err != nil { + return nil, xerrors.Errorf("getting tasks: %w", err) + } + + ls, err := t.sc.LocalStorage(ctx) + if err != nil { + return nil, xerrors.Errorf("getting local storage: %w", err) + } + + acceptables := map[harmonytask.TaskID]bool{} + + for _, t := range ids { + acceptables[t] = true + } + + for _, t := range tasks { + if _, ok := acceptables[t.TaskID]; !ok { + continue + } + + for _, l := range ls { + if string(l.ID) == t.StorageID { + return &t.TaskID, nil + } + } + } + + return nil, nil +} + +func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { + ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size + if isDevnet { + ssize = abi.SectorSize(2 << 20) + } + + return harmonytask.TaskTypeDetails{ + Max: t.max, + Name: "SDRTrees", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 1, + Ram: 8000 << 20, // todo + Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed, ssize, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), + }, + MaxFailures: 3, + Follows: nil, + } +} + +func (t *TreeRCTask) Adder(taskFunc harmonytask.AddTaskFunc) { + t.sp.pollers[pollerTreeRC].Set(taskFunc) +} + +func (t 
*TreeRCTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { + var refs []ffi.SectorRef + + err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id) + if err != nil { + return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) + } + + if len(refs) != 1 { + return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) + } + + return refs[0], nil +} + +type UrlPieceReader struct { + Url string + RawSize int64 // the exact number of bytes read, if we read more or less that's an error + + readSoFar int64 + closed bool + active io.ReadCloser // auto-closed on EOF +} + +func (u *UrlPieceReader) Read(p []byte) (n int, err error) { + // Check if we have already read the required amount of data + if u.readSoFar >= u.RawSize { + return 0, io.EOF + } + + // If 'active' is nil, initiate the HTTP request + if u.active == nil { + resp, err := http.Get(u.Url) + if err != nil { + return 0, err + } + + // Set 'active' to the response body + u.active = resp.Body + } + + // Calculate the maximum number of bytes we can read without exceeding RawSize + toRead := u.RawSize - u.readSoFar + if int64(len(p)) > toRead { + p = p[:toRead] + } + + n, err = u.active.Read(p) + + // Update the number of bytes read so far + u.readSoFar += int64(n) + + // If the number of bytes read exceeds RawSize, return an error + if u.readSoFar > u.RawSize { + return n, xerrors.New("read beyond the specified RawSize") + } + + // If EOF is reached, close the reader + if err == io.EOF { + cerr := u.active.Close() + u.closed = true + if cerr != nil { + log.Errorf("error closing http piece reader: %s", cerr) + } + + // if we're below the RawSize, return an unexpected EOF error + if u.readSoFar < u.RawSize { + log.Errorw("unexpected EOF", "readSoFar", u.readSoFar, "rawSize", u.RawSize, "url", u.Url) + return n, io.ErrUnexpectedEOF + } + } + + return n, err +} + +func (u 
*UrlPieceReader) Close() error { + if !u.closed { + u.closed = true + return u.active.Close() + } + + return nil +} + +var _ harmonytask.TaskInterface = &TreeRCTask{} diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 0c66891d0c7..b4b8c08fae1 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -385,3 +385,8 @@ func (e *TaskEngine) ResourcesAvailable() resources.Resources { } return tmp } + +// Resources returns the resources available in the TaskEngine's registry. +func (e *TaskEngine) Resources() resources.Resources { + return e.reg.Resources +} diff --git a/storage/paths/interface.go b/storage/paths/interface.go index d3dce8886d4..61412f4cbfc 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -47,7 +47,7 @@ type Store interface { FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) - Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) + Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (func(), error) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) GeneratePoRepVanillaProof(ctx context.Context, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) diff --git a/storage/paths/local.go b/storage/paths/local.go index 006854bbfe0..539cdc6f92d 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -36,6 +36,8 @@ type LocalStorage interface { const MetaFile = "sectorstore.json" +const MaxStorageUtilizationPercentage = 100 + type Local struct { localStorage LocalStorage index SectorIndex @@ -460,7 
+462,7 @@ func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (release func(), err error) { +func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (release func(), err error) { ssize, err := sid.ProofType.SectorSize() if err != nil { return nil, err @@ -500,6 +502,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif }() for _, fileType := range ft.AllSet() { + fileType := fileType id := storiface.ID(storiface.PathByType(storageIDs, fileType)) p, ok := st.paths[id] @@ -523,6 +526,10 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)) } + if (uint64(p.reserved+overhead)/p.maxStorage)*100 > uint64(maxPercentage) { + return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), total %d, exceeds the maximum reservation percentage", overhead, p.local, id, p.maxStorage)) + } + resID := sectorFile{sid.ID, fileType} log.Debugw("reserve add", "id", id, "sector", sid, "fileType", fileType, "overhead", overhead, "reserved-before", p.reserved, "reserved-after", p.reserved+overhead) diff --git a/storage/paths/mocks/store.go b/storage/paths/mocks/store.go index 1224e6b571f..3cfba4d54ae 100644 --- a/storage/paths/mocks/store.go +++ b/storage/paths/mocks/store.go @@ -154,16 +154,16 @@ func (mr *MockStoreMockRecorder) RemoveCopies(arg0, arg1, arg2 interface{}) *gom } // Reserve mocks base method. 
-func (m *MockStore) Reserve(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 storiface.SectorPaths, arg4 map[storiface.SectorFileType]int) (func(), error) { +func (m *MockStore) Reserve(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 storiface.SectorPaths, arg4 map[storiface.SectorFileType]int, arg5 int) (func(), error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Reserve", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "Reserve", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].(func()) ret1, _ := ret[1].(error) return ret0, ret1 } // Reserve indicates an expected call of Reserve. -func (mr *MockStoreMockRecorder) Reserve(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockStoreMockRecorder) Reserve(arg0, arg1, arg2, arg3, arg4, arg5 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reserve", reflect.TypeOf((*MockStore)(nil).Reserve), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reserve", reflect.TypeOf((*MockStore)(nil).Reserve), arg0, arg1, arg2, arg3, arg4, arg5) } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index ab27548632c..d9d9f626a8d 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -177,7 +177,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist // If any path types weren't found in local storage, try fetching them // First reserve storage - releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable) + releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable, MaxStorageUtilizationPercentage) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) } @@ -812,7 +812,7 @@ func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storif return nil, 
xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) } -func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { +func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (func(), error) { log.Warnf("reserve called on remote store, sectorID: %v", sid.ID) return func() { diff --git a/storage/sealer/proofpaths/cachefiles.go b/storage/sealer/proofpaths/cachefiles.go index 628ab158565..cbb6839ce95 100644 --- a/storage/sealer/proofpaths/cachefiles.go +++ b/storage/sealer/proofpaths/cachefiles.go @@ -53,3 +53,11 @@ func SDRLayers(spt abi.RegisteredSealProof) (int, error) { return 0, fmt.Errorf("unsupported proof type: %v", spt) } } + +func IsTreeRCFile(baseName string) bool { + return IsFileTreeRLast(baseName) || IsFileTreeC(baseName) +} + +func IsTreeDFile(baseName string) bool { + return IsFileTreeD(baseName) +} diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 417a15e62b1..a7e92e88939 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -150,19 +150,19 @@ type localWorkerPathProvider struct { } func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) { - paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing, l.op) + spaths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing, l.op) if err != nil { return storiface.SectorPaths{}, nil, err } - releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, 
storiface.FSOverheadSeal) + releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MaxStorageUtilizationPercentage) if err != nil { return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } - log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, spaths) - return paths, func() { + return spaths, func() { releaseStorage() for _, fileType := range storiface.PathTypes { From c3925ad828a110837d2f3044d1eac48f409e0d36 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 18 Apr 2024 23:19:38 +0400 Subject: [PATCH 2/9] fix TreeD reservation --- curiosrc/ffi/sdr_funcs.go | 4 ++-- curiosrc/ffi/task_storage.go | 23 +++++++++++------------ curiosrc/piece/task_park_piece.go | 2 +- curiosrc/seal/task_movestorage.go | 2 +- curiosrc/seal/task_sdr.go | 2 +- curiosrc/seal/task_treed.go | 13 ++++++++----- curiosrc/seal/task_treerc.go | 9 +++++---- storage/paths/interface.go | 2 +- storage/paths/local.go | 8 ++++---- storage/paths/remote.go | 4 ++-- storage/sealer/worker_local.go | 2 +- 11 files changed, 37 insertions(+), 34 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 5246e440508..444cf7bdc37 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -106,7 +106,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err } - releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MaxStorageUtilizationPercentage) + releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MinFreeStoragePercentage) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } @@ -590,7 
+590,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec // PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) error { - _, _, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) + _, _, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) if err != nil { return xerrors.Errorf("acquiring sector paths: %w", err) } diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index b214499d176..3ebec97c949 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -44,9 +44,8 @@ type TaskStorage struct { taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error) - // Maximum storage utilization percentage beyond which - // storage claim will fail for a task type - maxStoragePercentage int + // Minimum free storage percentage cutoff for reservation rejection + MinFreeStoragePercentage int } type ReleaseStorageFunc func() // free storage reservation @@ -60,15 +59,15 @@ type StorageReservation struct { Alloc, Existing storiface.SectorFileType } -func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, maxStoragePercentage int) *TaskStorage { +func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage int) *TaskStorage { return &TaskStorage{ - sc: sb, - alloc: alloc, - existing: existing, - ssize: ssize, - pathType: pathType, - taskToSectorRef: taskToSectorRef, - maxStoragePercentage: maxStoragePercentage, + sc: sb, + alloc: alloc, + 
existing: existing, + ssize: ssize, + pathType: pathType, + taskToSectorRef: taskToSectorRef, + MinFreeStoragePercentage: MinFreeStoragePercentage, } } @@ -171,7 +170,7 @@ func (t *TaskStorage) Claim(taskID int) error { } // reserve the space - release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.maxStoragePercentage) + release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage) if err != nil { return err } diff --git a/curiosrc/piece/task_park_piece.go b/curiosrc/piece/task_park_piece.go index e20565d4361..18ebcdef849 100644 --- a/curiosrc/piece/task_park_piece.go +++ b/curiosrc/piece/task_park_piece.go @@ -186,7 +186,7 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 64 << 20, - Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), + Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/task_movestorage.go b/curiosrc/seal/task_movestorage.go index 53054953e68..dab89958236 100644 --- a/curiosrc/seal/task_movestorage.go +++ b/curiosrc/seal/task_movestorage.go @@ -149,7 +149,7 @@ func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 128 << 20, - Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage, paths.MaxStorageUtilizationPercentage), + Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage, paths.MinFreeStoragePercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/task_sdr.go b/curiosrc/seal/task_sdr.go index 
8253ba7248a..0a3aebcd4a4 100644 --- a/curiosrc/seal/task_sdr.go +++ b/curiosrc/seal/task_sdr.go @@ -205,7 +205,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 4, // todo multicore sdr Gpu: 0, Ram: 54 << 30, - Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), + Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 2, Follows: nil, diff --git a/curiosrc/seal/task_treed.go b/curiosrc/seal/task_treed.go index 25962c2a69f..da46683a44f 100644 --- a/curiosrc/seal/task_treed.go +++ b/curiosrc/seal/task_treed.go @@ -31,7 +31,10 @@ type TreeDTask struct { } func (t *TreeDTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - return &ids[0], nil + if engine.Resources().Gpu > 0 { + return &ids[0], nil + } + return nil, nil } func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { @@ -45,9 +48,9 @@ func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { Name: "SDRTreeD", Cost: resources.Resources{ Cpu: 1, - Ram: 50 << 20, // todo + Ram: 64 << 20, // todo Gpu: 0, - Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, 97), + Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, 1), }, MaxFailures: 3, Follows: nil, @@ -57,7 +60,7 @@ func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { func (t *TreeDTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { var refs []ffi.SectorRef - err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id) + err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_d = 
$1`, id) if err != nil { return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) } @@ -95,7 +98,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done err = t.db.Select(ctx, §orParamsArr, ` SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline - WHERE task_id_tree_r = $1 AND task_id_tree_c = $1 AND task_id_tree_d = $1`, taskID) + WHERE task_id_tree_d = $1`, taskID) if err != nil { return false, xerrors.Errorf("getting sector params: %w", err) } diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go index d519d37a9ef..cf6ee2a4566 100644 --- a/curiosrc/seal/task_treerc.go +++ b/curiosrc/seal/task_treerc.go @@ -47,7 +47,7 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } err = t.db.Select(ctx, §orParamsArr, ` - SELECT sp_id, sector_number, reg_seal_proof + SELECT sp_id, sector_number, reg_seal_proof, tree_d_cid FROM sectors_sdr_pipeline WHERE task_id_tree_c = $1 AND task_id_tree_r = $1`, taskID) if err != nil { @@ -83,7 +83,8 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // todo porep challenge check n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3, WHERE sp_id = $1 AND sector_number = $2`, + SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3 + WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, sealed) if err != nil { return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err) @@ -97,7 +98,7 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done func (t *TreeRCTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { var tasks []struct { - TaskID harmonytask.TaskID `db:"task_id_finalize"` + TaskID harmonytask.TaskID `db:"task_id_tree_c"` SpID int64 `db:"sp_id"` SectorNumber int64 `db:"sector_number"` 
StorageID string `db:"storage_id"` @@ -162,7 +163,7 @@ func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 1, Ram: 8000 << 20, // todo - Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed, ssize, storiface.PathSealing, paths.MaxStorageUtilizationPercentage), + Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 3, Follows: nil, diff --git a/storage/paths/interface.go b/storage/paths/interface.go index 61412f4cbfc..4b0a2a82996 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -47,7 +47,7 @@ type Store interface { FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) - Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (func(), error) + Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (func(), error) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) GeneratePoRepVanillaProof(ctx context.Context, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) diff --git a/storage/paths/local.go b/storage/paths/local.go index 539cdc6f92d..b2738df322f 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -36,7 +36,7 @@ type LocalStorage interface { const MetaFile = "sectorstore.json" -const MaxStorageUtilizationPercentage = 100 +const MinFreeStoragePercentage = 0 type Local struct { localStorage LocalStorage @@ -462,7 +462,7 @@ func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) 
Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (release func(), err error) { +func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (release func(), err error) { ssize, err := sid.ProofType.SectorSize() if err != nil { return nil, err @@ -526,8 +526,8 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)) } - if (uint64(p.reserved+overhead)/p.maxStorage)*100 > uint64(maxPercentage) { - return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), total %d, exceeds the maximum reservation percentage", overhead, p.local, id, p.maxStorage)) + if float64((stat.Available-(p.reserved+overhead))/stat.Available) < float64(minFreePercentage/100) { + return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), minimum free threshold reached", overhead, p.local, id)) } resID := sectorFile{sid.ID, fileType} diff --git a/storage/paths/remote.go b/storage/paths/remote.go index d9d9f626a8d..1f27da3355b 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -177,7 +177,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist // If any path types weren't found in local storage, try fetching them // First reserve storage - releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable, MaxStorageUtilizationPercentage) + releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable, MinFreeStoragePercentage) if err != nil { 
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) } @@ -812,7 +812,7 @@ func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storif return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) } -func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, maxPercentage int) (func(), error) { +func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (func(), error) { log.Warnf("reserve called on remote store, sectorID: %v", sid.ID) return func() { diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index a7e92e88939..2a49447a271 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -155,7 +155,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor return storiface.SectorPaths{}, nil, err } - releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MaxStorageUtilizationPercentage) + releaseStorage, err := l.w.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MinFreeStoragePercentage) if err != nil { return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } From 8382a208d976243b580dee531feba6ac768f3a89 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 19 Apr 2024 18:05:12 +0400 Subject: [PATCH 3/9] fix nil pointer err --- storage/paths/local.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/paths/local.go b/storage/paths/local.go index b2738df322f..1e227b3e78f 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -462,7 +462,7 @@ 
func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (release func(), err error) { +func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (userRelease func(), err error) { ssize, err := sid.ProofType.SectorSize() if err != nil { return nil, err @@ -476,7 +476,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif var firstDonebuf []byte var releaseFuncs []func() - release = func() { + release := func() { for _, releaseFunc := range releaseFuncs { releaseFunc() } From feabcdb2e5c0fbcb5adab3274141c0a9f5a19bd7 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 19 Apr 2024 20:29:53 +0400 Subject: [PATCH 4/9] apply suggestions --- curiosrc/ffi/sdr_funcs.go | 113 +++++++++++++++++------------------ curiosrc/ffi/task_storage.go | 4 +- curiosrc/seal/task_treed.go | 78 ++++++++++++++++++++++-- curiosrc/seal/task_treerc.go | 73 +--------------------- storage/paths/interface.go | 2 +- storage/paths/local.go | 14 +++-- storage/paths/mocks/store.go | 2 +- storage/paths/remote.go | 2 +- 8 files changed, 143 insertions(+), 145 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 444cf7bdc37..10d3e8503db 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -200,7 +200,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) } - paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing) + fspaths, pathIDs, releaseSector, err := 
sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } @@ -208,14 +208,55 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto defer func() { if err != nil { - clerr := removeDRCTrees(paths.Cache, "RC") + clerr := removeDRCTrees(fspaths.Cache, "RC") if clerr != nil { - log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", paths.Cache) + log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) } } }() - sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) + // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place + ssize, err := sector.ProofType.SectorSize() + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) + } + + { + // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector + + // first try reflink + truncate, that should be way faster + err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed) + if err == nil { + err = os.Truncate(fspaths.Sealed, int64(ssize)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err) + } + } else { + log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed) + + // fallback to slow copy, copy ssize bytes from treed to sealed + dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err) + } + src, err := os.Open(filepath.Join(fspaths.Cache, 
proofpaths.TreeDName)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err) + } + + _, err = io.CopyN(dst, src, int64(ssize)) + derr := dst.Close() + _ = src.Close() + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err) + } + if derr != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr) + } + } + } + + sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err) } @@ -589,23 +630,18 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec } // PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks -func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) error { - _, _, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) +func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(), err error) { + fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTNone, storiface.FTSealed, storiface.PathSealing) if err != nil { - return xerrors.Errorf("acquiring sector paths: %w", err) + return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err) } - defer releaseSector() - - return nil + // Don't release the storage locks. 
They will be released in TreeD func() + return } -func (sb *SealCalls) TreeD(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) error { - fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) - if err != nil { - return xerrors.Errorf("acquiring sector paths: %w", err) - } - defer releaseSector() - +func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths, release func()) error { + defer release() + var err error defer func() { if err != nil { clerr := removeDRCTrees(fspaths.Cache, "D") @@ -624,48 +660,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, task *harmonytask.TaskID, sector return xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid") } - // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place - ssize, err := sector.ProofType.SectorSize() - if err != nil { - return xerrors.Errorf("getting sector size: %w", err) - } - - { - // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector - - // first try reflink + truncate, that should be way faster - err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed) - if err == nil { - err = os.Truncate(fspaths.Sealed, int64(ssize)) - if err != nil { - return xerrors.Errorf("truncating reflinked sealed file: %w", err) - } - } else { - log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed) - - // fallback to slow copy, copy ssize bytes from treed to sealed - dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) - 
if err != nil { - return xerrors.Errorf("opening sealed sector file: %w", err) - } - src, err := os.Open(filepath.Join(fspaths.Cache, proofpaths.TreeDName)) - if err != nil { - return xerrors.Errorf("opening treed sector file: %w", err) - } - - _, err = io.CopyN(dst, src, int64(ssize)) - derr := dst.Close() - _ = src.Close() - if err != nil { - return xerrors.Errorf("copying treed -> sealed: %w", err) - } - if derr != nil { - return xerrors.Errorf("closing sealed file: %w", derr) - } - } - } - - if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache|storiface.FTSealed); err != nil { + if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil { return xerrors.Errorf("ensure one copy: %w", err) } diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index 3ebec97c949..4bbb8e343e7 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -45,7 +45,7 @@ type TaskStorage struct { taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error) // Minimum free storage percentage cutoff for reservation rejection - MinFreeStoragePercentage int + MinFreeStoragePercentage float64 } type ReleaseStorageFunc func() // free storage reservation @@ -59,7 +59,7 @@ type StorageReservation struct { Alloc, Existing storiface.SectorFileType } -func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage int) *TaskStorage { +func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage float64) *TaskStorage { return &TaskStorage{ sc: sb, alloc: alloc, diff --git a/curiosrc/seal/task_treed.go b/curiosrc/seal/task_treed.go index da46683a44f..f2b66e021cf 100644 --- a/curiosrc/seal/task_treed.go +++ 
b/curiosrc/seal/task_treed.go @@ -3,6 +3,7 @@ package seal import ( "context" "io" + "net/http" "net/url" "strconv" @@ -45,12 +46,12 @@ func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: t.max, - Name: "SDRTreeD", + Name: "TreeD", Cost: resources.Resources{ Cpu: 1, Ram: 64 << 20, // todo Gpu: 0, - Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, 1), + Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache, ssize, storiface.PathSealing, 1.0), }, MaxFailures: 3, Follows: nil, @@ -117,7 +118,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Fetch the Sector to local storage - err = t.sc.PreFetch(ctx, sref, &taskID) + fsPaths, pathIds, release, err := t.sc.PreFetch(ctx, sref, &taskID) if err != nil { return false, xerrors.Errorf("failed to prefetch sectors: %w", err) } @@ -238,7 +239,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Generate Tree D - err = t.sc.TreeD(ctx, &taskID, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData) + err = t.sc.TreeD(ctx, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData, fsPaths, pathIds, release) if err != nil { return false, xerrors.Errorf("failed to generate TreeD: %w", err) } @@ -256,4 +257,73 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return true, nil } +type UrlPieceReader struct { + Url string + RawSize int64 // the exact number of bytes read, if we read more or less that's an error + + readSoFar int64 + closed bool + active io.ReadCloser // auto-closed on EOF +} + +func (u *UrlPieceReader) Read(p []byte) (n int, err error) { + // Check if we have already read the required amount of data + if u.readSoFar >= u.RawSize { + return 0, io.EOF + } + + // If 'active' is nil, initiate the HTTP request + if u.active == nil { + resp, err := http.Get(u.Url) + if 
err != nil { + return 0, err + } + + // Set 'active' to the response body + u.active = resp.Body + } + + // Calculate the maximum number of bytes we can read without exceeding RawSize + toRead := u.RawSize - u.readSoFar + if int64(len(p)) > toRead { + p = p[:toRead] + } + + n, err = u.active.Read(p) + + // Update the number of bytes read so far + u.readSoFar += int64(n) + + // If the number of bytes read exceeds RawSize, return an error + if u.readSoFar > u.RawSize { + return n, xerrors.New("read beyond the specified RawSize") + } + + // If EOF is reached, close the reader + if err == io.EOF { + cerr := u.active.Close() + u.closed = true + if cerr != nil { + log.Errorf("error closing http piece reader: %s", cerr) + } + + // if we're below the RawSize, return an unexpected EOF error + if u.readSoFar < u.RawSize { + log.Errorw("unexpected EOF", "readSoFar", u.readSoFar, "rawSize", u.RawSize, "url", u.Url) + return n, io.ErrUnexpectedEOF + } + } + + return n, err +} + +func (u *UrlPieceReader) Close() error { + if !u.closed { + u.closed = true + return u.active.Close() + } + + return nil +} + var _ harmonytask.TaskInterface = &TreeDTask{} diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go index cf6ee2a4566..0092466eece 100644 --- a/curiosrc/seal/task_treerc.go +++ b/curiosrc/seal/task_treerc.go @@ -2,8 +2,6 @@ package seal import ( "context" - "io" - "net/http" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -158,7 +156,7 @@ func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: t.max, - Name: "SDRTrees", + Name: "TreeRC", Cost: resources.Resources{ Cpu: 1, Gpu: 1, @@ -189,73 +187,4 @@ func (t *TreeRCTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) return refs[0], nil } -type UrlPieceReader struct { - Url string - RawSize int64 // the exact number of bytes read, if we read more or less that's an error - - readSoFar int64 - closed bool - active io.ReadCloser // auto-closed on 
EOF -} - -func (u *UrlPieceReader) Read(p []byte) (n int, err error) { - // Check if we have already read the required amount of data - if u.readSoFar >= u.RawSize { - return 0, io.EOF - } - - // If 'active' is nil, initiate the HTTP request - if u.active == nil { - resp, err := http.Get(u.Url) - if err != nil { - return 0, err - } - - // Set 'active' to the response body - u.active = resp.Body - } - - // Calculate the maximum number of bytes we can read without exceeding RawSize - toRead := u.RawSize - u.readSoFar - if int64(len(p)) > toRead { - p = p[:toRead] - } - - n, err = u.active.Read(p) - - // Update the number of bytes read so far - u.readSoFar += int64(n) - - // If the number of bytes read exceeds RawSize, return an error - if u.readSoFar > u.RawSize { - return n, xerrors.New("read beyond the specified RawSize") - } - - // If EOF is reached, close the reader - if err == io.EOF { - cerr := u.active.Close() - u.closed = true - if cerr != nil { - log.Errorf("error closing http piece reader: %s", cerr) - } - - // if we're below the RawSize, return an unexpected EOF error - if u.readSoFar < u.RawSize { - log.Errorw("unexpected EOF", "readSoFar", u.readSoFar, "rawSize", u.RawSize, "url", u.Url) - return n, io.ErrUnexpectedEOF - } - } - - return n, err -} - -func (u *UrlPieceReader) Close() error { - if !u.closed { - u.closed = true - return u.active.Close() - } - - return nil -} - var _ harmonytask.TaskInterface = &TreeRCTask{} diff --git a/storage/paths/interface.go b/storage/paths/interface.go index 4b0a2a82996..088e2340b73 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -47,7 +47,7 @@ type Store interface { FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) - Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (func(), error) + Reserve(ctx context.Context, sid storiface.SectorRef, 
ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage float64) (func(), error) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) GeneratePoRepVanillaProof(ctx context.Context, sr storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) diff --git a/storage/paths/local.go b/storage/paths/local.go index 1e227b3e78f..3c99866cc1d 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -36,7 +36,7 @@ type LocalStorage interface { const MetaFile = "sectorstore.json" -const MinFreeStoragePercentage = 0 +const MinFreeStoragePercentage = float64(0) type Local struct { localStorage LocalStorage @@ -462,7 +462,7 @@ func (st *Local) reportStorage(ctx context.Context) { } } -func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage int) (userRelease func(), err error) { +func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage float64) (userRelease func(), err error) { ssize, err := sid.ProofType.SectorSize() if err != nil { return nil, err @@ -522,12 +522,16 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif resvOnDisk = overhead } - if stat.Available < overhead-resvOnDisk { + overheadOnDisk := overhead - resvOnDisk + + if stat.Available < overheadOnDisk { return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)) } - if float64((stat.Available-(p.reserved+overhead))/stat.Available) < 
float64(minFreePercentage/100) { - return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), minmum free threshold reached", overhead, p.local, id)) + freePercentag := (float64(stat.Available-overheadOnDisk) / float64(stat.Available)) * 100.0 + + if freePercentag < minFreePercentage { + return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), free disk percentage %f will be lower than minimum %f", overhead, p.local, id, freePercentag, minFreePercentage)) } resID := sectorFile{sid.ID, fileType} diff --git a/storage/paths/mocks/store.go b/storage/paths/mocks/store.go index 3cfba4d54ae..1303cf0690b 100644 --- a/storage/paths/mocks/store.go +++ b/storage/paths/mocks/store.go @@ -154,7 +154,7 @@ func (mr *MockStoreMockRecorder) RemoveCopies(arg0, arg1, arg2 interface{}) *gom } // Reserve mocks base method. -func (m *MockStore) Reserve(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 storiface.SectorPaths, arg4 map[storiface.SectorFileType]int, arg5 int) (func(), error) { +func (m *MockStore) Reserve(arg0 context.Context, arg1 storiface.SectorRef, arg2 storiface.SectorFileType, arg3 storiface.SectorPaths, arg4 map[storiface.SectorFileType]int, arg5 float64) (func(), error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Reserve", arg0, arg1, arg2, arg3, arg4, arg5) ret0, _ := ret[0].(func()) diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 1f27da3355b..2cad97537fc 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -812,7 +812,7 @@ func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storif return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) } -func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab 
map[storiface.SectorFileType]int, minFreePercentage int) (func(), error) { +func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int, minFreePercentage float64) (func(), error) { log.Warnf("reserve called on remote store, sectorID: %v", sid.ID) return func() { From 6ff467bf1aa6c9e5174799d7dcbaec79a029b447 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Sun, 21 Apr 2024 19:42:30 +0400 Subject: [PATCH 5/9] fix allocate file types --- curiosrc/ffi/sdr_funcs.go | 2 +- curiosrc/seal/task_treerc.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 10d3e8503db..c46678a31a9 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -631,7 +631,7 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec // PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(), err error) { - fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTNone, storiface.FTSealed, storiface.PathSealing) + fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err) } diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go index 0092466eece..d7c5b317835 100644 --- a/curiosrc/seal/task_treerc.go +++ b/curiosrc/seal/task_treerc.go @@ -161,7 +161,7 @@ func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 1, Ram: 8000 << 20, // todo - Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, 
storiface.FTCache|storiface.FTSealed, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), + Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 3, Follows: nil, From f7fc8acc8beae858610ac9971cc8c5f363d5d772 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Sun, 21 Apr 2024 20:54:27 +0400 Subject: [PATCH 6/9] fix dbIndex inserts --- storage/paths/db_index.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 79239544533..77cc716170b 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -226,8 +226,8 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, // Insert storage id _, err = tx.Exec( - "INSERT INTO storage_path "+ - "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW(), $16, $17)", + "INSERT INTO storage_path (storage_id, urls, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types, capacity, available, fs_available, reserved, used, last_heartbeat, heartbeat_err, allow_miners, deny_miners)"+ + "Values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, NOW(), NULL, $16, $17)", si.ID, strings.Join(si.URLs, ","), si.Weight, @@ -406,7 +406,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac } } else { _, err = tx.Exec( - "INSERT INTO sector_location "+ + "INSERT INTO sector_location (miner_id, sector_num, sector_filetype, storage_id, is_primary)"+ "values($1, $2, $3, $4, $5)", s.Miner, s.Number, ft, storageID, primary) if err != nil { @@ -723,7 +723,7 @@ func (dbi *DBIndex) StorageBestAlloc(ctx context.Context, allocate storiface.Sec FROM storage_path WHERE available >= $1 and NOW()-($2 * INTERVAL '1 second') < last_heartbeat - and heartbeat_err = '' + and heartbeat_err is null and (($3 and can_seal = TRUE) or ($4 and 
can_store = TRUE)) order by (available::numeric * weight) desc`, spaceReq, From 1e1d305063dd521c01be03eb3ab817146bc0feb1 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 29 Apr 2024 20:05:38 +0530 Subject: [PATCH 7/9] set resource, move release func --- curiosrc/ffi/sdr_funcs.go | 1 - curiosrc/seal/task_treed.go | 7 ++++--- storage/paths/local.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index c46678a31a9..6c139aa4dc4 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -640,7 +640,6 @@ func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, t } func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths, release func()) error { - defer release() var err error defer func() { if err != nil { diff --git a/curiosrc/seal/task_treed.go b/curiosrc/seal/task_treed.go index f2b66e021cf..b336af92929 100644 --- a/curiosrc/seal/task_treed.go +++ b/curiosrc/seal/task_treed.go @@ -49,7 +49,7 @@ func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { Name: "TreeD", Cost: resources.Resources{ Cpu: 1, - Ram: 64 << 20, // todo + Ram: 1 << 30, Gpu: 0, Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache, ssize, storiface.PathSealing, 1.0), }, @@ -122,6 +122,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done if err != nil { return false, xerrors.Errorf("failed to prefetch sectors: %w", err) } + defer release() var pieces []struct { PieceIndex int64 `db:"piece_index"` @@ -248,10 +249,10 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done SET after_tree_d = true, tree_d_cid = $3 WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, commd) if err != nil { - return false, xerrors.Errorf("store sdr-treeD 
success: updating pipeline: %w", err) + return false, xerrors.Errorf("store TreeD success: updating pipeline: %w", err) } if n != 1 { - return false, xerrors.Errorf("store sdr-treeD success: updated %d rows", n) + return false, xerrors.Errorf("store TreeD success: updated %d rows", n) } return true, nil diff --git a/storage/paths/local.go b/storage/paths/local.go index 29253356b7a..9d662e88a45 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -533,7 +533,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif } } - return + return release, nil } // DoubleCallWrap wraps a function to make sure it's not called twice From ccc704cc1b36d939ae037dbbaaaa8e6632fa2d93 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Tue, 30 Apr 2024 12:18:59 +0530 Subject: [PATCH 8/9] refactor func(), update memory --- curiosrc/ffi/sdr_funcs.go | 41 +++++++++++++++--------------------- curiosrc/seal/task_treerc.go | 2 +- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 6c139aa4dc4..f532861ac4c 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -208,7 +208,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto defer func() { if err != nil { - clerr := removeDRCTrees(fspaths.Cache, "RC") + clerr := removeDRCTrees(fspaths.Cache, false) if clerr != nil { log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) } @@ -272,35 +272,27 @@ func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, secto return sl, uns, nil } -func removeDRCTrees(cache string, tree string) error { - // list files in cache +func removeDRCTrees(cache string, isDTree bool) error { files, err := os.ReadDir(cache) if err != nil { return xerrors.Errorf("listing cache: %w", err) } - switch tree { - case "D": - for _, file := range files { - if 
proofpaths.IsTreeDFile(file.Name()) { - err := os.Remove(filepath.Join(cache, file.Name())) - if err != nil { - return xerrors.Errorf("removing tree file: %w", err) - } - } - } - case "RC": - for _, file := range files { - if proofpaths.IsTreeRCFile(file.Name()) { - err := os.Remove(filepath.Join(cache, file.Name())) - if err != nil { - return xerrors.Errorf("removing tree file: %w", err) - } + var testFunc func(string) bool + + if isDTree { + testFunc = proofpaths.IsTreeDFile + } else { + testFunc = proofpaths.IsTreeRCFile + } + + for _, file := range files { + if testFunc(file.Name()) { + err := os.Remove(filepath.Join(cache, file.Name())) + if err != nil { + return xerrors.Errorf("removing tree file: %w", err) } } - default: - return xerrors.Errorf("incorrect input Tree type") - } return nil } @@ -640,10 +632,11 @@ func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, t } func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths, release func()) error { + defer release() var err error defer func() { if err != nil { - clerr := removeDRCTrees(fspaths.Cache, "D") + clerr := removeDRCTrees(fspaths.Cache, true) if clerr != nil { log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) } diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go index d7c5b317835..02cf0350e03 100644 --- a/curiosrc/seal/task_treerc.go +++ b/curiosrc/seal/task_treerc.go @@ -160,7 +160,7 @@ func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { Cost: resources.Resources{ Cpu: 1, Gpu: 1, - Ram: 8000 << 20, // todo + Ram: 8 << 30, Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 3, From d4d92cfb971af0726de83cd4f1ddbae5d09988ee Mon 
Sep 17 00:00:00 2001 From: LexLuthr Date: Tue, 30 Apr 2024 23:19:09 +0530 Subject: [PATCH 9/9] remove extra release --- curiosrc/ffi/sdr_funcs.go | 3 +-- curiosrc/seal/task_treed.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index f532861ac4c..eff49578d98 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -631,8 +631,7 @@ func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, t return } -func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths, release func()) error { - defer release() +func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths) error { var err error defer func() { if err != nil { diff --git a/curiosrc/seal/task_treed.go b/curiosrc/seal/task_treed.go index b336af92929..7b31e12fb52 100644 --- a/curiosrc/seal/task_treed.go +++ b/curiosrc/seal/task_treed.go @@ -240,7 +240,7 @@ func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Generate Tree D - err = t.sc.TreeD(ctx, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData, fsPaths, pathIds, release) + err = t.sc.TreeD(ctx, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData, fsPaths, pathIds) if err != nil { return false, xerrors.Errorf("failed to generate TreeD: %w", err) }