Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extern/storage: add ability to ignore worker resources when scheduling. #6542

Merged
merged 6 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type SealerConfig struct {
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool

// IgnoreResourceFiltering instructs the system to ignore available
// resources when assigning tasks to the local worker.
IgnoreResourceFiltering bool
}

type StorageAuth http.Header
Expand Down
19 changes: 10 additions & 9 deletions extern/sector-storage/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,24 +349,24 @@ func (sh *scheduler) trySched() {
defer sh.workersLk.RUnlock()

windowsLen := len(sh.openWindows)
queuneLen := sh.schedQueue.Len()
queueLen := sh.schedQueue.Len()
raulk marked this conversation as resolved.
Show resolved Hide resolved

log.Debugf("SCHED %d queued; %d open windows", queuneLen, windowsLen)
log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)

if windowsLen == 0 || queuneLen == 0 {
if windowsLen == 0 || queueLen == 0 {
// nothing to schedule on
return
}

windows := make([]schedWindow, windowsLen)
acceptableWindows := make([][]int, queuneLen)
acceptableWindows := make([][]int, queueLen)

// Step 1
throttle := make(chan struct{}, windowsLen)

var wg sync.WaitGroup
wg.Add(queuneLen)
for i := 0; i < queuneLen; i++ {
wg.Add(queueLen)
for i := 0; i < queueLen; i++ {
throttle <- struct{}{}

go func(sqi int) {
Expand All @@ -393,7 +393,8 @@ func (sh *scheduler) trySched() {
}

// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
ignoringResources := worker.info.IgnoreResources
if !ignoringResources && !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
raulk marked this conversation as resolved.
Show resolved Hide resolved
continue
}

Expand Down Expand Up @@ -451,9 +452,9 @@ func (sh *scheduler) trySched() {

// Step 2
scheduled := 0
rmQueue := make([]int, 0, queuneLen)
rmQueue := make([]int, 0, queueLen)

for sqi := 0; sqi < queuneLen; sqi++ {
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]

Expand Down
7 changes: 6 additions & 1 deletion extern/sector-storage/storiface/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ import (
type WorkerInfo struct {
Hostname string

Resources WorkerResources
// IgnoreResources indicates whether the worker's available resources should
// be used ignored (true) or used (false) for the purposes of scheduling and
// task assignment. Only supported on local workers. Used for testing.
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources
}

type WorkerResources struct {
Expand Down
25 changes: 17 additions & 8 deletions extern/sector-storage/worker_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
storage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
Expand All @@ -33,6 +33,11 @@ var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSea
type WorkerConfig struct {
TaskTypes []sealtasks.TaskType
NoSwap bool

// IgnoreResourceFiltering enables task distribution to happen on this
// worker regardless of its currently available resources. Used in testing
// with the local worker.
IgnoreResourceFiltering bool
}

// used do provide custom proofs impl (mostly used in testing)
Expand All @@ -46,6 +51,9 @@ type LocalWorker struct {
executor ExecutorFunc
noSwap bool

// see equivalent field on WorkerConfig.
ignoreResources bool

ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
Expand All @@ -71,12 +79,12 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
ct: &workerCallTracker{
st: cst,
},
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,

session: uuid.New(),
closing: make(chan struct{}),
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
ignoreResources: wcfg.IgnoreResourceFiltering,
session: uuid.New(),
closing: make(chan struct{}),
}

if w.executor == nil {
Expand Down Expand Up @@ -501,7 +509,8 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}

return storiface.WorkerInfo{
Hostname: hostname,
Hostname: hostname,
IgnoreResources: l.ignoreResources,
Resources: storiface.WorkerResources{
MemPhysical: mem.Total,
MemSwap: memSwap,
Expand Down
12 changes: 12 additions & 0 deletions itests/kit/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/node/config"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -127,6 +128,12 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr
node.Override(new(v1api.FullNode), tnd),
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, act)),

node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig {
scfg := config.DefaultStorageMiner()
scfg.Storage.IgnoreResourceFiltering = true
return &scfg.Storage
}),

opts,
)
if err != nil {
Expand Down Expand Up @@ -532,6 +539,11 @@ func mockMinerBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []Stora
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig {
scfg := config.DefaultStorageMiner()
scfg.Storage.IgnoreResourceFiltering = true
return &scfg.Storage
}),

node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(ffiwrapper.Prover), mock.MockProver),
Expand Down