Skip to content

Commit

Permalink
Merge pull request #5023 from filecoin-project/feat/worker-set-task-t…
Browse files Browse the repository at this point in the history
…ypes

worker: Support setting task types at runtime
  • Loading branch information
magik6k authored Dec 1, 2020
2 parents 4d019f1 + 6dea0a6 commit c4a6b94
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 0 deletions.
3 changes: 3 additions & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type WorkerAPI interface {

storiface.WorkerCalls

TaskDisable(ctx context.Context, tt sealtasks.TaskType) error
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error

// Storage / Other
Remove(ctx context.Context, sector abi.SectorID) error

Expand Down
11 changes: 11 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ type WorkerStruct struct {
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`

TaskDisable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`
TaskEnable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`

Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`

Expand Down Expand Up @@ -1573,6 +1576,14 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
}

func (w *WorkerStruct) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskDisable(ctx, tt)
}

func (w *WorkerStruct) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskEnable(ctx, tt)
}

func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}
Expand Down
1 change: 1 addition & 0 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func init() {
addExample(map[sealtasks.TaskType]struct{}{
sealtasks.TTPreCommit2: {},
})
addExample(sealtasks.TTCommit2)
}

func exampleValue(method string, t, parent reflect.Type) interface{} {
Expand Down
25 changes: 25 additions & 0 deletions cmd/lotus-seal-worker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package main

import (
"fmt"
"sort"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

var infoCmd = &cli.Command{
Expand Down Expand Up @@ -49,10 +51,22 @@ var infoCmd = &cli.Command{
return xerrors.Errorf("getting info: %w", err)
}

tt, err := api.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting task types: %w", err)
}

fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))

fmt.Printf("Task types: ")
for _, t := range ttList(tt) {
fmt.Printf("%s ", t.Short())
}
fmt.Println()

fmt.Println()

paths, err := api.Paths(ctx)
Expand Down Expand Up @@ -80,3 +94,14 @@ var infoCmd = &cli.Command{
return nil
},
}

func ttList(tt map[sealtasks.TaskType]struct{}) []sealtasks.TaskType {
tasks := make([]sealtasks.TaskType, 0, len(tt))
for taskType := range tt {
tasks = append(tasks, taskType)
}
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Less(tasks[j])
})
return tasks
}
1 change: 1 addition & 0 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
storageCmd,
setCmd,
waitQuietCmd,
tasksCmd,
}

app := &cli.App{
Expand Down
82 changes: 82 additions & 0 deletions cmd/lotus-seal-worker/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"context"
"strings"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

var tasksCmd = &cli.Command{
Name: "tasks",
Usage: "Manage task processing",
Subcommands: []*cli.Command{
tasksEnableCmd,
tasksDisableCmd,
},
}

var allowSetting = map[sealtasks.TaskType]struct{}{
sealtasks.TTAddPiece: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},
sealtasks.TTUnseal: {},
}

var settableStr = func() string {
var s []string
for _, tt := range ttList(allowSetting) {
s = append(s, tt.Short())
}
return strings.Join(s, "|")
}()

var tasksEnableCmd = &cli.Command{
Name: "enable",
Usage: "Enable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskEnable),
}

var tasksDisableCmd = &cli.Command{
Name: "disable",
Usage: "Disable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskDisable),
}

func taskAction(tf func(a api.WorkerAPI, ctx context.Context, tt sealtasks.TaskType) error) func(cctx *cli.Context) error {
return func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return xerrors.Errorf("expected 1 argument")
}

var tt sealtasks.TaskType
for taskType := range allowSetting {
if taskType.Short() == cctx.Args().First() {
tt = taskType
break
}
}

if tt == "" {
return xerrors.Errorf("unknown task type '%s'", cctx.Args().First())
}

api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

return tf(api, ctx, tt)
}
}
30 changes: 30 additions & 0 deletions documentation/en/api-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* [Storage](#Storage)
* [StorageAddLocal](#StorageAddLocal)
* [Task](#Task)
* [TaskDisable](#TaskDisable)
* [TaskEnable](#TaskEnable)
* [TaskTypes](#TaskTypes)
* [Unseal](#Unseal)
* [UnsealPiece](#UnsealPiece)
Expand Down Expand Up @@ -502,6 +504,34 @@ Response: `{}`
## Task


### TaskDisable
There are not yet any comments for this method.

Perms: admin

Inputs:
```json
[
"seal/v0/commit/2"
]
```

Response: `{}`

### TaskEnable
There are not yet any comments for this method.

Perms: admin

Inputs:
```json
[
"seal/v0/commit/2"
]
```

Response: `{}`

### TaskTypes
TaskType -> Weight

Expand Down
20 changes: 20 additions & 0 deletions extern/sector-storage/worker_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type LocalWorker struct {
ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
taskLk sync.Mutex

session uuid.UUID
testDisable int64
Expand Down Expand Up @@ -457,9 +458,28 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector st
}

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()

return l.acceptTasks, nil
}

func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()

delete(l.acceptTasks, tt)
return nil
}

func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()

l.acceptTasks[tt] = struct{}{}
return nil
}

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
Expand Down Expand Up @@ -1210,6 +1211,7 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down Expand Up @@ -1423,7 +1425,9 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
Expand Down

0 comments on commit c4a6b94

Please sign in to comment.