Skip to content

Commit

Permalink
Merge pull request #3405 from filecoin-project/feat/worker-storage-cli
Browse files Browse the repository at this point in the history
worker: Cli to attach storage paths
  • Loading branch information
magik6k authored Aug 31, 2020
2 parents e1e0152 + 0e6ff66 commit c96613a
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 4 deletions.
2 changes: 2 additions & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type WorkerAPI interface {
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)

StorageAddLocal(ctx context.Context, path string) error

Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) error

Closing(context.Context) (<-chan struct{}, error)
Expand Down
5 changes: 5 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ type WorkerStruct struct {
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`

UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) `perm:"admin"`
Expand Down Expand Up @@ -1223,6 +1224,10 @@ func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) err
return w.Internal.MoveStorage(ctx, sector)
}

func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}

func (w *WorkerStruct) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
return w.Internal.UnsealPiece(ctx, id, index, size, randomness, c)
}
Expand Down
17 changes: 17 additions & 0 deletions cli/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func flagForAPI(t repo.RepoType) string {
return "api"
case repo.StorageMiner:
return "miner-api"
case repo.Worker:
return "worker-api"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -86,6 +88,8 @@ func flagForRepo(t repo.RepoType) string {
return "repo"
case repo.StorageMiner:
return "miner-repo"
case repo.Worker:
return "worker-repo"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -97,6 +101,8 @@ func envForRepo(t repo.RepoType) string {
return "FULLNODE_API_INFO"
case repo.StorageMiner:
return "MINER_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -109,6 +115,8 @@ func envForRepoDeprecation(t repo.RepoType) string {
return "FULLNODE_API_INFO"
case repo.StorageMiner:
return "STORAGE_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand Down Expand Up @@ -234,6 +242,15 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...jsonrpc.Option) (api.StorageMi
return client.NewStorageMinerRPC(ctx.Context, addr, headers, opts...)
}

func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.Worker)
if err != nil {
return nil, nil, err
}

return client.NewWorkerRPC(ctx.Context, addr, headers)
}

func DaemonContext(cctx *cli.Context) context.Context {
if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok {
return mtCtx.(context.Context)
Expand Down
71 changes: 71 additions & 0 deletions cmd/lotus-seal-worker/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"fmt"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)

var infoCmd = &cli.Command{
Name: "info",
Usage: "Print worker info",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

ver, err := api.Version(ctx)
if err != nil {
return xerrors.Errorf("getting version: %w", err)
}

fmt.Println("Worker version: ", ver)
fmt.Print("CLI version: ")
cli.VersionPrinter(cctx)
fmt.Println()

info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)
}

fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
fmt.Println()

paths, err := api.Paths(ctx)
if err != nil {
return xerrors.Errorf("getting path info: %w", err)
}

for _, path := range paths {
fmt.Printf("%s:\n", path.ID)
fmt.Printf("\tWeight: %d; Use: ", path.Weight)
if path.CanSeal || path.CanStore {
fmt.Printf("Weight: %d; Use: ", path.Weight)
if path.CanSeal {
fmt.Print("Seal ")
}
if path.CanStore {
fmt.Print("Store")
}
fmt.Println("")
} else {
fmt.Print("Use: ReadOnly")
}
fmt.Printf("\tLocal: %s\n", path.LocalPath)
}

return nil
},
}
35 changes: 33 additions & 2 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -46,10 +47,10 @@ const FlagWorkerRepoDeprecation = "workerrepo"
func main() {
lotuslog.SetupLogLevels()

log.Info("Starting lotus worker")

local := []*cli.Command{
runCmd,
infoCmd,
storageCmd,
}

app := &cli.App{
Expand Down Expand Up @@ -153,6 +154,8 @@ var runCmd = &cli.Command{
return nil
},
Action: func(cctx *cli.Context) error {
log.Info("Starting lotus worker")

if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
Expand Down Expand Up @@ -342,6 +345,8 @@ var runCmd = &cli.Command{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
localStore: localStore,
ls: lr,
}

mux := mux.NewRouter()
Expand Down Expand Up @@ -383,6 +388,32 @@ var runCmd = &cli.Command{
return err
}

{
a, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
}

ma, err := manet.FromNetAddr(a)
if err != nil {
return xerrors.Errorf("creating api multiaddress: %w", err)
}

if err := lr.SetAPIEndpoint(ma); err != nil {
return xerrors.Errorf("setting api endpoint: %w", err)
}

ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
}

// TODO: ideally this would be a token with some permissions dropped
if err := lr.SetAPIToken(ainfo.Token); err != nil {
return xerrors.Errorf("setting api token: %w", err)
}
}

log.Info("Waiting for tasks")

go func() {
Expand Down
29 changes: 27 additions & 2 deletions cmd/lotus-seal-worker/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,44 @@ package main
import (
"context"

"github.com/filecoin-project/specs-storage/storage"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)

type worker struct {
*sectorstorage.LocalWorker

localStore *stores.Local
ls stores.LocalStorage
}

func (w *worker) Version(context.Context) (build.Version, error) {
return build.APIVersion, nil
}

func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}

if err := w.localStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}

if err := w.ls.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}

return nil
}

var _ storage.Sealer = &worker{}
105 changes: 105 additions & 0 deletions cmd/lotus-seal-worker/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"

"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)

const metaFile = "sectorstore.json"

var storageCmd = &cli.Command{
Name: "storage",
Usage: "manage sector storage",
Subcommands: []*cli.Command{
storageAttachCmd,
},
}

var storageAttachCmd = &cli.Command{
Name: "attach",
Usage: "attach local storage path",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "init",
Usage: "initialize the path first",
},
&cli.Uint64Flag{
Name: "weight",
Usage: "(for init) path weight",
Value: 10,
},
&cli.BoolFlag{
Name: "seal",
Usage: "(for init) use path for sealing",
},
&cli.BoolFlag{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

if !cctx.Args().Present() {
return xerrors.Errorf("must specify storage path to attach")
}

p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}

if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}

_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
if err == nil {
return xerrors.Errorf("path is already initialized")
}
return err
}

cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
}

if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
}

b, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}

if err := ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err)
}
}

return nodeApi.StorageAddLocal(ctx, p)
},
}

0 comments on commit c96613a

Please sign in to comment.