diff --git a/api/version.go b/api/version.go
index 6dfcc3f7f6a..ff1115e1d89 100644
--- a/api/version.go
+++ b/api/version.go
@@ -58,7 +58,7 @@ var (
 	FullAPIVersion1 = newVer(2, 1, 0)
 
 	MinerAPIVersion0  = newVer(1, 2, 0)
-	WorkerAPIVersion0 = newVer(1, 3, 0)
+	WorkerAPIVersion0 = newVer(1, 4, 0)
 )
 
 //nolint:varcheck,deadcode
diff --git a/extern/sector-storage/resources.go b/extern/sector-storage/resources.go
index d3c7bd7a5f0..c05bca62b95 100644
--- a/extern/sector-storage/resources.go
+++ b/extern/sector-storage/resources.go
@@ -1,9 +1,12 @@
 package sectorstorage
 
 import (
+	"strconv"
+
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
+	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
 )
 
 type Resources struct {
@@ -44,6 +47,51 @@ func (r Resources) Threads(wcpus uint64) uint64 {
 	return uint64(r.MaxParallelism)
 }
 
+func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info storiface.WorkerInfo) {
+	// update needed resources with worker options
+	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_MEMORY"]; ok {
+		i, err := strconv.ParseUint(o, 10, 64)
+		if err != nil {
+			log.Errorf("unable to parse %s_MAX_MEMORY value %s: %+v", taskShortName, o, err)
+		} else {
+			r.MaxMemory = i
+		}
+	}
+	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MIN_MEMORY"]; ok {
+		i, err := strconv.ParseUint(o, 10, 64)
+		if err != nil {
+			log.Errorf("unable to parse %s_MIN_MEMORY value %s: %+v", taskShortName, o, err)
+		} else {
+			r.MinMemory = i
+		}
+	}
+	if o, ok := info.Resources.ResourceOpts[taskShortName+"_BASE_MIN_MEMORY"]; ok {
+		i, err := strconv.ParseUint(o, 10, 64)
+		if err != nil {
+			log.Errorf("unable to parse %s_BASE_MIN_MEMORY value %s: %+v", taskShortName, o, err)
+		} else {
+			r.BaseMinMemory = i
+		}
+	}
+	if o, ok := info.Resources.ResourceOpts[taskShortName+"_MAX_PARALLELISM"]; ok {
+		i, err := strconv.Atoi(o)
+		if err != nil {
+			log.Errorf("unable to parse %s_MAX_PARALLELISM value %s: %+v", taskShortName, o, err)
+		} else {
+			r.MaxParallelism = i
+		}
+	}
+	if o, ok := info.Resources.ResourceOpts[taskShortName+"_GPU_UTILIZATION"]; ok {
+		i, err := strconv.ParseFloat(o, 64)
+		if err != nil {
+			log.Errorf("unable to parse %s_GPU_UTILIZATION value %s: %+v", taskShortName, o, err)
+		} else {
+			r.GPUUtilization = i
+		}
+	}
+	log.Debugf("resources required for %s on %s(%s): %+v", taskShortName, wid, info.Hostname, r)
+}
+
 var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
 	sealtasks.TTAddPiece: {
 		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index e717e58e28a..bb6ba627bd3 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -297,6 +297,7 @@ func (sw *schedWorker) workerCompactWindows() {
 
 			for ti, todo := range window.todo {
 				needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
+				needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
 				if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
 					continue
 				}
@@ -358,6 +359,7 @@ assignLoop:
 			worker.lk.Lock()
 			for t, todo := range firstWindow.todo {
 				needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
+				needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
 				if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
 					tidx = t
 					break
@@ -457,6 +459,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 	w, sh := sw.worker, sw.sched
 
 	needRes := ResourceTable[req.taskType][req.sector.ProofType]
+	needRes.customizeForWorker(req.taskType.Short(), sw.wid, w.info)
 
 	w.lk.Lock()
 	w.preparing.add(w.info.Resources, needRes)
diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go
index 50d8b215911..f28f106b108 100644
--- a/extern/sector-storage/storiface/worker.go
+++ b/extern/sector-storage/storiface/worker.go
@@ -32,8 +32,9 @@ type WorkerResources struct {
 	MemSwap     uint64
 	MemSwapUsed uint64
 
-	CPUs uint64 // Logical cores
-	GPUs []string
+	CPUs         uint64 // Logical cores
+	GPUs         []string
+	ResourceOpts map[string]string
 }
 
 type WorkerStats struct {
diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go
index 1a1b3627f31..de69cea8037 100644
--- a/extern/sector-storage/worker_local.go
+++ b/extern/sector-storage/worker_local.go
@@ -3,10 +3,12 @@ package sectorstorage
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"io"
 	"os"
 	"reflect"
 	"runtime"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -544,16 +546,41 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 		return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
 	}
 
+	resourceOpts := make(map[string]string)
+	for tt := range l.acceptTasks {
+		ttShort := tt.Short()
+		for _, resOpt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} {
+			n := ttShort + resOpt
+			if val, ok := os.LookupEnv(n); ok {
+				resourceOpts[n] = val
+			}
+		}
+	}
+	if _, ok := resourceOpts["PC1_MAX_PARALLELISM"]; !ok {
+		if os.Getenv("FIL_PROOFS_USE_MULTICORE_SDR") == "1" {
+			pc1MulticoreSDRProducers := 3
+			if pc1MulticoreSDRProducersEnv := os.Getenv("FIL_PROOFS_MULTICORE_SDR_PRODUCERS"); pc1MulticoreSDRProducersEnv != "" {
+				pc1MulticoreSDRProducers, err = strconv.Atoi(pc1MulticoreSDRProducersEnv)
+				if err != nil {
+					log.Errorf("FIL_PROOFS_MULTICORE_SDR_PRODUCERS is not an integer: %+v", err)
+					pc1MulticoreSDRProducers = 3
+				}
+			}
+			resourceOpts["PC1_MAX_PARALLELISM"] = fmt.Sprintf("%d", 1+pc1MulticoreSDRProducers)
+		}
+	}
+
 	return storiface.WorkerInfo{
 		Hostname:        hostname,
 		IgnoreResources: l.ignoreResources,
 		Resources: storiface.WorkerResources{
-			MemPhysical: memPhysical,
-			MemUsed:     memUsed,
-			MemSwap:     memSwap,
-			MemSwapUsed: memSwapUsed,
-			CPUs:        uint64(runtime.NumCPU()),
-			GPUs:        gpus,
+			MemPhysical:  memPhysical,
+			MemUsed:      memUsed,
+			MemSwap:      memSwap,
+			MemSwapUsed:  memSwapUsed,
+			CPUs:         uint64(runtime.NumCPU()),
+			GPUs:         gpus,
+			ResourceOpts: resourceOpts,
 		},
 	}, nil
 }
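Reviewer note: the override path is split between the worker and the scheduler, so here is a compact, self-contained sketch of the round trip. Everything in it is illustrative only; the trimmed-down `Resources` struct and the `main` harness are stand-ins invented for this sketch, while the option naming scheme (`<TASK-SHORT-NAME>_MAX_MEMORY`, `..._MAX_PARALLELISM`, ...) and the parse-or-keep-default behaviour are taken from the patch above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Resources is a stand-in for the scheduler-side struct in resources.go;
// only the fields this sketch touches are included.
type Resources struct {
	MaxMemory      uint64
	MaxParallelism int
}

// customizeForWorker mirrors the patch's parse-or-keep-default behaviour:
// a malformed value is logged (here: printed) and the table default wins.
func (r *Resources) customizeForWorker(taskShortName string, opts map[string]string) {
	if o, ok := opts[taskShortName+"_MAX_MEMORY"]; ok {
		if i, err := strconv.ParseUint(o, 10, 64); err == nil {
			r.MaxMemory = i
		} else {
			fmt.Printf("unable to parse %s_MAX_MEMORY value %s: %+v\n", taskShortName, o, err)
		}
	}
	if o, ok := opts[taskShortName+"_MAX_PARALLELISM"]; ok {
		if i, err := strconv.Atoi(o); err == nil {
			r.MaxParallelism = i
		} else {
			fmt.Printf("unable to parse %s_MAX_PARALLELISM value %s: %+v\n", taskShortName, o, err)
		}
	}
}

func main() {
	// Worker side (worker_local.go): collect "<task>_<option>" env vars
	// for every accepted task into the map shipped in WorkerResources.
	os.Setenv("PC1_MAX_PARALLELISM", "4") // what an operator would export
	resourceOpts := map[string]string{}
	for _, n := range []string{"PC1_MAX_MEMORY", "PC1_MAX_PARALLELISM"} {
		if v, ok := os.LookupEnv(n); ok {
			resourceOpts[n] = v
		}
	}

	// Scheduler side (sched_worker.go): copy the static table entry,
	// then apply this worker's overrides before the canHandleRequest check.
	needRes := Resources{MaxMemory: 128 << 30, MaxParallelism: 1} // table default
	needRes.customizeForWorker("PC1", resourceOpts)

	fmt.Printf("%+v\n", needRes) // {MaxMemory:137438953472 MaxParallelism:4}
}
```

In the patch itself, `needRes := ResourceTable[...]` copies the table entry by value and `customizeForWorker` mutates only that copy through its pointer receiver, so per-worker overrides never leak back into the shared `ResourceTable`. The new `ResourceOpts` field on `storiface.WorkerResources` changes the worker API surface, which is why `WorkerAPIVersion0` is bumped to 1.4.0.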