Skip to content

Commit

Permalink
feat: use metric container for RPC request metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanVergiliev committed Oct 19, 2022
1 parent e4f72a9 commit c932bb1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
18 changes: 13 additions & 5 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (
Name: "upstream_rpc_requests",
Help: "Count of total RPC requests forwarded to upstreams.",
},
// jsonrpc_method is "batch" for batch requests
// jsonrpc_method is "batch" for batch requests
[]string{"client", "upstream_id", "url", "jsonrpc_method"},
)

Expand Down Expand Up @@ -248,6 +248,10 @@ var (
)

type Container struct {
RPCRequestsCounter *prometheus.CounterVec
RPCRequestsDuration prometheus.ObserverVec
RPCResponseSizes prometheus.ObserverVec

UpstreamRPCRequestsTotal *prometheus.CounterVec
UpstreamJSONRPCRequestsTotal *prometheus.CounterVec
UpstreamRPCRequestErrorsTotal *prometheus.CounterVec
Expand Down Expand Up @@ -280,6 +284,10 @@ func NewContainer() *Container {
result.UpstreamJSONRPCRequestErrorsTotal = upstreamJSONRPCRequestErrorsTotal.MustCurryWith(presetLabels)
result.UpstreamRPCDuration = upstreamRPCDuration.MustCurryWith(presetLabels)

result.RPCRequestsCounter = rpcRequestsCounter.MustCurryWith(presetLabels)
result.RPCRequestsDuration = rpcRequestsDuration.MustCurryWith(presetLabels)
result.RPCResponseSizes = rpcResponseSizes.MustCurryWith(presetLabels)

result.BlockHeight = blockHeight.MustCurryWith(presetLabels)
result.BlockHeightCheckRequests = blockHeightCheckRequests.MustCurryWith(presetLabels)
result.BlockHeightCheckDuration = blockHeightCheckDuration.MustCurryWith(presetLabels)
Expand Down Expand Up @@ -309,10 +317,10 @@ func NewMetricsServer() *http.Server {
}
}

func InstrumentHandler(handler http.Handler) http.Handler {
withRequestsCounter := promhttp.InstrumentHandlerCounter(rpcRequestsCounter, handler)
withRequestsDuration := promhttp.InstrumentHandlerDuration(rpcRequestsDuration, withRequestsCounter)
withResponseSizes := promhttp.InstrumentHandlerResponseSize(rpcResponseSizes, withRequestsDuration)
func InstrumentHandler(handler http.Handler, container *Container) http.Handler {
withRequestsCounter := promhttp.InstrumentHandlerCounter(container.RPCRequestsCounter, handler)
withRequestsDuration := promhttp.InstrumentHandlerDuration(container.RPCRequestsDuration, withRequestsCounter)
withResponseSizes := promhttp.InstrumentHandlerResponseSize(container.RPCResponseSizes, withRequestsDuration)

return withResponseSizes
}
19 changes: 15 additions & 4 deletions internal/server/web_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type RPCServer struct {
}

func NewRPCServer(config conf.Config) RPCServer {
router := wireRouter(config)
dependencyContainer := wireDependencies(config)
router := dependencyContainer.router
handler := &RPCHandler{
router: router,
}
Expand All @@ -49,7 +50,7 @@ func NewRPCServer(config conf.Config) RPCServer {
mux := http.NewServeMux()
mux.Handle("/health", healthCheckHandler)

mux.Handle("/", metrics.InstrumentHandler(handler))
mux.Handle("/", metrics.InstrumentHandler(handler, dependencyContainer.metricsContainer))

httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Expand All @@ -66,7 +67,12 @@ func NewRPCServer(config conf.Config) RPCServer {
return *rpcServer
}

func wireRouter(config conf.Config) route.Router {
type DependencyContainer struct {
router route.Router
metricsContainer *metrics.Container
}

func wireDependencies(config conf.Config) *DependencyContainer {
metricContainer := metrics.NewContainer()
chainMetadataStore := metadata.NewChainMetadataStore()
ticker := time.NewTicker(checks.PeriodicHealthCheckInterval)
Expand All @@ -79,7 +85,12 @@ func wireRouter(config conf.Config) route.Router {
BackingStrategy: route.NewPriorityRoundRobinStrategy(),
}

return route.NewRouter(config.Upstreams, config.Groups, chainMetadataStore, healthCheckManager, &routingStrategy, metricContainer)
router := route.NewRouter(config.Upstreams, config.Groups, chainMetadataStore, healthCheckManager, &routingStrategy, metricContainer)

return &DependencyContainer{
router: router,
metricsContainer: metricContainer,
}
}

func (s *RPCServer) Start() error {
Expand Down
4 changes: 3 additions & 1 deletion internal/server/web_server_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/satsuma-data/node-gateway/internal/jsonrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestServeHTTP_ForwardsToSoleHealthyUpstream(t *testing.T) {
Expand Down Expand Up @@ -256,7 +257,8 @@ func executeRequest(t *testing.T, request jsonrpc.RequestBody, handler *RPCHandl
}

func startRouterAndHandler(conf config.Config) *RPCHandler {
router := wireRouter(conf)
dependencyContainer := wireDependencies(conf, zap.L())
router := dependencyContainer.router
router.Start()

for router.IsInitialized() == false {
Expand Down

0 comments on commit c932bb1

Please sign in to comment.