Skip to content

Commit

Permalink
Collect worker task metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 5, 2021
1 parent e05dc4e commit fe230f9
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 99 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var runCmd = &cli.Command{

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
metrics.ChainNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
Expand Down
14 changes: 11 additions & 3 deletions cmd/lotus-storage-miner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -68,14 +69,20 @@ var runCmd = &cli.Command{
return xerrors.Errorf("getting full node api: %w", err)
}
defer ncloser()
ctx := lcli.DaemonContext(cctx)

ctx, _ := tag.New(lcli.DaemonContext(cctx),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "miner"),
)
// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
if err = view.Register(
metrics.MinerNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))

v, err := nodeApi.Version(ctx)
if err != nil {
Expand Down Expand Up @@ -162,6 +169,7 @@ var runCmd = &cli.Command{

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.Handle("/debug/metrics", metrics.Exporter())
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

ah := &auth.Handler{
Expand Down
25 changes: 14 additions & 11 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,20 @@ var DaemonCmd = &cli.Command{
return fmt.Errorf("unrecognized profile type: %q", profile)
}

ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit))
ctx, _ := tag.New(context.Background(),
tag.Insert(metrics.Version, build.BuildVersion),
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "chain"),
)
// Register all metric views
if err = view.Register(
metrics.ChainNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))

{
dir, err := homedir.Expand(cctx.String("repo"))
if err != nil {
Expand Down Expand Up @@ -332,16 +345,6 @@ var DaemonCmd = &cli.Command{
}
}

// Register all metric views
if err = view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

// Set the metric to one so it is published to the exporter
stats.Record(ctx, metrics.LotusInfo.M(1))

endpoint, err := r.APIEndpoint()
if err != nil {
return xerrors.Errorf("getting api endpoint: %w", err)
Expand Down
21 changes: 1 addition & 20 deletions cmd/lotus/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"contrib.go.opencensus.io/exporter/prometheus"

"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"

Expand Down Expand Up @@ -55,23 +52,7 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut

http.Handle("/rest/v0/import", importAH)

// Prometheus globals are exposed as interfaces, but the prometheus
// OpenCensus exporter expects a concrete *Registry. The concrete type of
// the globals are actually *Registry, so we downcast them, staying
// defensive in case things change under the hood.
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
if !ok {
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
}
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus",
})
if err != nil {
log.Fatalf("could not create the prometheus stats exporter: %v", err)
}

http.Handle("/debug/metrics", exporter)
http.Handle("/debug/metrics", metrics.Exporter())
http.Handle("/debug/pprof-set/block", handleFractionOpt("BlockProfileRate", runtime.SetBlockProfileRate))
http.Handle("/debug/pprof-set/mutex", handleFractionOpt("MutexProfileFraction",
func(x int) { runtime.SetMutexProfileFraction(x) },
Expand Down
22 changes: 11 additions & 11 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,47 +632,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
}

func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err)
return m.returnResult(ctx, callID, pi, err)
}

func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err)
return m.returnResult(ctx, callID, p1o, err)
}

func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err)
return m.returnResult(ctx, callID, sealed, err)
}

func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err)
return m.returnResult(ctx, callID, out, err)
}

func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err)
return m.returnResult(ctx, callID, proof, err)
}

func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err)
return m.returnResult(ctx, callID, ok, err)
}

func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
Expand Down
6 changes: 3 additions & 3 deletions extern/sector-storage/manager_calltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,15 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
}
}

func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
func (m *Manager) returnResult(ctx context.Context, callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
res := result{
r: r,
}
if cerr != nil {
res.err = cerr
}

m.sched.workTracker.onDone(callID)
m.sched.workTracker.onDone(ctx, callID)

m.workLk.Lock()
defer m.workLk.Unlock()
Expand Down Expand Up @@ -413,5 +413,5 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *sto

func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
// TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
return m.returnResult(ctx, call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
}
4 changes: 2 additions & 2 deletions extern/sector-storage/sched_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe

go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
sh.workersLk.Lock()

if err != nil {
Expand Down Expand Up @@ -437,7 +437,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
}

// Do the work!
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.workerRpc))
err = req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))

select {
case req.ret <- workerResponse{err: err}:
Expand Down
63 changes: 44 additions & 19 deletions extern/sector-storage/worker_tracked.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import (
"time"

"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/metrics"
)

type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
job storiface.WorkerJob
worker WorkerID
workerHostname string
}

type workTracker struct {
Expand All @@ -29,20 +33,31 @@ type workTracker struct {
// TODO: done, aggregate stats, queue stats, scheduler feedback
}

func (wt *workTracker) onDone(callID storiface.CallID) {
func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
wt.lk.Lock()
defer wt.lk.Unlock()

_, ok := wt.running[callID]
t, ok := wt.running[callID]
if !ok {
wt.done[callID] = struct{}{}

stats.Record(ctx, metrics.WorkerUntrackedCallsReturned.M(1))
return
}

took := metrics.SinceInMilliseconds(t.job.Start)

ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(t.job.Task)),
tag.Upsert(metrics.WorkerHostname, t.workerHostname),
)
stats.Record(ctx, metrics.WorkerCallsReturnedCount.M(1), metrics.WorkerCallsReturnedDuration.M(took))

delete(wt.running, callID)
}

func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
func (wt *workTracker) track(ctx context.Context, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
Expand All @@ -64,17 +79,26 @@ func (wt *workTracker) track(wid WorkerID, sid storage.SectorRef, task sealtasks
Task: task,
Start: time.Now(),
},
worker: wid,
worker: wid,
workerHostname: wi.Hostname,
}

ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.TaskType, string(task)),
tag.Upsert(metrics.WorkerHostname, wi.Hostname),
)
stats.Record(ctx, metrics.WorkerCallsStarted.M(1))

return callID, err
}
}

func (wt *workTracker) worker(wid WorkerID, w Worker) Worker {
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) Worker {
return &trackedWorker{
Worker: w,
wid: wid,
Worker: w,
wid: wid,
workerInfo: wi,

tracker: wt,
}
Expand All @@ -94,45 +118,46 @@ func (wt *workTracker) Running() []trackedWork {

type trackedWorker struct {
Worker
wid WorkerID
wid WorkerID
workerInfo storiface.WorkerInfo

tracker *workTracker
}

func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
}

func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
}

func (t *trackedWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
}

func (t *trackedWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
}

func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
}

func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(t.wid, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
return t.tracker.track(ctx, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
}

func (t *trackedWorker) Fetch(ctx context.Context, s storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.tracker.track(t.wid, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
return t.tracker.track(ctx, t.wid, t.workerInfo, s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
}

func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
}

func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
return t.tracker.track(t.wid, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
return t.tracker.track(ctx, t.wid, t.workerInfo, id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
}

var _ Worker = &trackedWorker{}
Loading

0 comments on commit fe230f9

Please sign in to comment.