Skip to content

Commit

Permalink
Permit workers to override resource table
Browse files Browse the repository at this point in the history
In an environment with heterogeneous worker nodes, a universal resource
table for all workers does not allow effective scheduling of tasks. Some
workers may have different proof cache settings, changing the required
memory for different tasks. Some workers may have a different count of
CPUs per core-complex, changing the max parallelism of PC1.

This change allows workers to customize these parameters with
environment variables. A worker could set the environment variable
PC1_MIN_MEMORY for example to customize the minimum memory requirement
for PC1 tasks. If no environment variables are specified, the resource
table on the miner is used, except for PC1 parallelism.

If PC1_MAX_PARALLELISM is not specified, and
FIL_PROOFS_USE_MULTICORE_SDR is set, PC1_MAX_PARALLELISM will
automatically be set to FIL_PROOFS_MULTICORE_SDR_PRODUCERS + 1.
  • Loading branch information
clinta committed Sep 10, 2021
1 parent 0058179 commit 3cf6a81
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 9 deletions.
2 changes: 1 addition & 1 deletion api/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (
FullAPIVersion1 = newVer(2, 1, 0)

MinerAPIVersion0 = newVer(1, 2, 0)
WorkerAPIVersion0 = newVer(1, 3, 0)
WorkerAPIVersion0 = newVer(1, 4, 0)
)

//nolint:varcheck,deadcode
Expand Down
48 changes: 48 additions & 0 deletions extern/sector-storage/resources.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package sectorstorage

import (
"strconv"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)

type Resources struct {
Expand Down Expand Up @@ -44,6 +47,51 @@ func (r Resources) Threads(wcpus uint64) uint64 {
return uint64(r.MaxParallelism)
}

// customizeForWorker overrides the scheduler's default resource requirements
// for a task with per-worker values advertised in info.Resources.ResourceOpts
// (populated on the worker side from environment variables such as
// PC1_MAX_MEMORY). Options that are absent leave the default in place; options
// that fail to parse are logged and ignored, so a bad override never blocks
// scheduling.
func (r *Resources) customizeForWorker(taskShortName string, wid WorkerID, info storiface.WorkerInfo) {
	// opt returns the worker-provided override for this task, if any.
	opt := func(suffix string) (string, bool) {
		o, ok := info.Resources.ResourceOpts[taskShortName+suffix]
		return o, ok
	}

	if o, ok := opt("_MAX_MEMORY"); ok {
		// NOTE: %s (not %e, which is a float verb) so the error renders correctly.
		if i, err := strconv.ParseUint(o, 10, 64); err != nil {
			log.Errorf("unable to parse %s_MAX_MEMORY value %s: %s", taskShortName, o, err)
		} else {
			r.MaxMemory = i
		}
	}
	if o, ok := opt("_MIN_MEMORY"); ok {
		if i, err := strconv.ParseUint(o, 10, 64); err != nil {
			log.Errorf("unable to parse %s_MIN_MEMORY value %s: %s", taskShortName, o, err)
		} else {
			r.MinMemory = i
		}
	}
	if o, ok := opt("_BASE_MIN_MEMORY"); ok {
		if i, err := strconv.ParseUint(o, 10, 64); err != nil {
			log.Errorf("unable to parse %s_BASE_MIN_MEMORY value %s: %s", taskShortName, o, err)
		} else {
			r.BaseMinMemory = i
		}
	}
	if o, ok := opt("_MAX_PARALLELISM"); ok {
		if i, err := strconv.Atoi(o); err != nil {
			log.Errorf("unable to parse %s_MAX_PARALLELISM value %s: %s", taskShortName, o, err)
		} else {
			r.MaxParallelism = i
		}
	}
	if o, ok := opt("_GPU_UTILIZATION"); ok {
		if i, err := strconv.ParseFloat(o, 64); err != nil {
			log.Errorf("unable to parse %s_GPU_UTILIZATION value %s: %s", taskShortName, o, err)
		} else {
			r.GPUUtilization = i
		}
	}
	log.Debugf("resources required for %s on %s(%s): %+v", taskShortName, wid, info.Hostname, r)
}

var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
sealtasks.TTAddPiece: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
Expand Down
3 changes: 3 additions & 0 deletions extern/sector-storage/sched_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (sw *schedWorker) workerCompactWindows() {

for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
continue
}
Expand Down Expand Up @@ -352,6 +353,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes.customizeForWorker(todo.taskType.Short(), sw.wid, worker.info)
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
Expand Down Expand Up @@ -391,6 +393,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
w, sh := sw.worker, sw.sched

needRes := ResourceTable[req.taskType][req.sector.ProofType]
needRes.customizeForWorker(req.taskType.Short(), sw.wid, w.info)

w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)
Expand Down
5 changes: 3 additions & 2 deletions extern/sector-storage/storiface/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type WorkerResources struct {
MemSwap uint64
MemSwapUsed uint64

CPUs uint64 // Logical cores
GPUs []string
CPUs uint64 // Logical cores
GPUs []string
ResourceOpts map[string]string
}

type WorkerStats struct {
Expand Down
38 changes: 32 additions & 6 deletions extern/sector-storage/worker_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -620,16 +621,41 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}

resourceOpts := make(map[string]string)
for tt := range l.acceptTasks {
ttShort := tt.Short()
for _, res_opt := range []string{"_MAX_MEMORY", "_MIN_MEMORY", "_MAX_PARALLELISM", "_BASE_MIN_MEMORY", "_GPU_UTILIZATION"} {
n := ttShort + res_opt
if val, ok := os.LookupEnv(n); ok {
resourceOpts[n] = val
}
}
}
if _, ok := resourceOpts["PC1_MAX_PARALLELISM"]; !ok {
if os.Getenv("FIL_PROOFS_USE_MULTICORE_SDR") == "1" {
pc1MulticoreSDRProducers := 3
if pc1MulticoreSDRProducersEnv := os.Getenv("FIL_PROOFS_MULTICORE_SDR_PRODUCERS"); pc1MulticoreSDRProducersEnv != "" {
pc1MulticoreSDRProducers, err = strconv.Atoi(pc1MulticoreSDRProducersEnv)
if err != nil {
log.Errorf("FIL_PROOFS_MULTICORE_SDR_PRODUCERS is not an integer: %+v", err)
pc1MulticoreSDRProducers = 3
}
}
resourceOpts["PC1_MAX_PARALLELISM"] = fmt.Sprintf("%d", 1+pc1MulticoreSDRProducers)
}
}

return storiface.WorkerInfo{
Hostname: hostname,
IgnoreResources: l.ignoreResources,
Resources: storiface.WorkerResources{
MemPhysical: memPhysical,
MemUsed: memUsed,
MemSwap: memSwap,
MemSwapUsed: memSwapUsed,
CPUs: uint64(runtime.NumCPU()),
GPUs: gpus,
MemPhysical: memPhysical,
MemUsed: memUsed,
MemSwap: memSwap,
MemSwapUsed: memSwapUsed,
CPUs: uint64(runtime.NumCPU()),
GPUs: gpus,
ResourceOpts: resourceOpts,
},
}, nil
}
Expand Down

0 comments on commit 3cf6a81

Please sign in to comment.