Skip to content

Commit

Permalink
Merge pull request #4516 from filecoin-project/feat/api-request-metri…
Browse files Browse the repository at this point in the history
…cs-wrapper

Feat/api request metrics wrapper
  • Loading branch information
magik6k authored Oct 22, 2020
2 parents 29e334d + 7cbd4d4 commit 0289c39
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 7 deletions.
15 changes: 14 additions & 1 deletion cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
"os"

"github.com/filecoin-project/go-jsonrpc"
"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"

logging "github.com/ipfs/go-log"
"go.opencensus.io/stats/view"

"github.com/gorilla/mux"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -64,6 +69,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
Expand All @@ -76,7 +88,7 @@ var runCmd = &cli.Command{
log.Info("Setting up API endpoint at " + address)

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", NewGatewayAPI(api))
rpcServer.Register("Filecoin", metrics.MetricedGatewayAPI(NewGatewayAPI(api)))

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/").Handler(http.DefaultServeMux)
Expand All @@ -89,6 +101,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: mux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-gateway"))
return ctx
},
}
Expand Down
13 changes: 12 additions & 1 deletion cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -34,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -190,6 +193,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

v, err := nodeApi.Version(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -363,7 +373,7 @@ var runCmd = &cli.Command{

readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(metrics.MetricedWorkerAPI(workerApi)))

mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
Expand All @@ -378,6 +388,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
}
Expand Down
21 changes: 19 additions & 2 deletions cmd/lotus-storage-miner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"net"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -22,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -66,6 +70,13 @@ var runCmd = &cli.Command{
defer ncloser()
ctx := lcli.DaemonContext(cctx)

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

v, err := nodeApi.Version(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -147,7 +158,7 @@ var runCmd = &cli.Command{
mux := mux.NewRouter()

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerapi))
rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(metrics.MetricedStorMinerAPI(minerapi)))

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
Expand All @@ -158,7 +169,13 @@ var runCmd = &cli.Command{
Next: mux.ServeHTTP,
}

srv := &http.Server{Handler: ah}
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-miner"))
return ctx
},
}

sigChan := make(chan os.Signal, 2)
go func() {
Expand Down
13 changes: 12 additions & 1 deletion cmd/lotus-wallet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"github.com/filecoin-project/go-jsonrpc"

Expand All @@ -18,6 +20,7 @@ import (
ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
)

Expand Down Expand Up @@ -75,6 +78,13 @@ var runCmd = &cli.Command{
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Register all metric views
if err := view.Register(
metrics.DefaultViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}

repoPath := cctx.String(FlagWalletRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
Expand Down Expand Up @@ -125,7 +135,7 @@ var runCmd = &cli.Command{
log.Info("Setting up API endpoint at " + address)

rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", &LoggedWallet{under: w})
rpcServer.Register("Filecoin", &LoggedWallet{under: metrics.MetricedWalletAPI(w)})

mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
Expand All @@ -138,6 +148,7 @@ var runCmd = &cli.Command{
srv := &http.Server{
Handler: mux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-wallet"))
return ctx
},
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/lotus/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"net"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -13,6 +14,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"go.opencensus.io/tag"
"golang.org/x/xerrors"

"contrib.go.opencensus.io/exporter/prometheus"
Expand All @@ -22,6 +24,7 @@ import (

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
)
Expand All @@ -30,7 +33,7 @@ var log = logging.Logger("main")

func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error {
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(a))
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(metrics.MetricedFullAPI(a)))

ah := &auth.Handler{
Verify: a.AuthVerify,
Expand Down Expand Up @@ -60,7 +63,13 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut
return xerrors.Errorf("could not listen: %w", err)
}

srv := &http.Server{Handler: http.DefaultServeMux}
srv := &http.Server{
Handler: http.DefaultServeMux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-daemon"))
return ctx
},
}

sigCh := make(chan os.Signal, 2)
shutdownDone := make(chan struct{})
Expand Down
19 changes: 19 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"context"
"time"

"go.opencensus.io/stats"
Expand All @@ -24,6 +25,8 @@ var (
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
)

// Measures
Expand All @@ -49,6 +52,7 @@ var (
PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless)
PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless)
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
)

var (
Expand Down Expand Up @@ -137,6 +141,11 @@ var (
Measure: PubsubDropRPC,
Aggregation: view.Count(),
}
APIRequestDurationView = &view.View{
Measure: APIRequestDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{APIInterface, Endpoint},
}
)

// DefaultViews is an array of OpenCensus views for metric gathering purposes
Expand All @@ -161,10 +170,20 @@ var DefaultViews = append([]*view.View{
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView,
},
rpcmetrics.DefaultViews...)

// SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6
}

// Timer is a function stopwatch, calling it starts the timer,
// calling the returned function will record the duration.
func Timer(ctx context.Context, m *stats.Float64Measure) func() {
start := time.Now()
return func() {
stats.Record(ctx, m.M(SinceInMilliseconds(start)))
}
}
65 changes: 65 additions & 0 deletions metrics/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package metrics

import (
"context"
"reflect"

"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
)

func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
var out apistruct.StorageMinerStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}

func MetricedFullAPI(a api.FullNode) api.FullNode {
var out apistruct.FullNodeStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}

func MetricedWorkerAPI(a api.WorkerAPI) api.WorkerAPI {
var out apistruct.WorkerStruct
proxy(a, &out.Internal)
return &out
}

func MetricedWalletAPI(a api.WalletAPI) api.WalletAPI {
var out apistruct.WalletStruct
proxy(a, &out.Internal)
return &out
}

func MetricedGatewayAPI(a api.GatewayAPI) api.GatewayAPI {
var out apistruct.GatewayStruct
proxy(a, &out.Internal)
return &out
}

func proxy(in interface{}, out interface{}) {
rint := reflect.ValueOf(out).Elem()
ra := reflect.ValueOf(in)

for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
fn := ra.MethodByName(field.Name)

rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
ctx := args[0].Interface().(context.Context)
// upsert function name into context
ctx, _ = tag.New(ctx, tag.Upsert(Endpoint, field.Name))
stop := Timer(ctx, APIRequestDuration)
defer stop()
// pass tagged ctx back into function call
args[0] = reflect.ValueOf(ctx)
return fn.Call(args)
}))

}
}

0 comments on commit 0289c39

Please sign in to comment.