From 409908cb258e14813fc3536683d47881da695711 Mon Sep 17 00:00:00 2001 From: siddimore Date: Mon, 1 Apr 2024 12:45:57 -0700 Subject: [PATCH] [DataAPI] Implement metric for Disperser and Churner Service Availability (#405) Co-authored-by: Siddharth More --- disperser/dataapi/docs/docs.go | 101 ++++++++++++++++++++++-- disperser/dataapi/docs/swagger.json | 104 ++++++++++++++++++++++--- disperser/dataapi/docs/swagger.yaml | 78 ++++++++++++++++--- disperser/dataapi/server.go | 116 +++++++++++++++++++++++++++- disperser/dataapi/server_test.go | 80 ++++++++++++++++++- 5 files changed, 445 insertions(+), 34 deletions(-) diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index c950c2f63c..8edf43b6f6 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -23,7 +23,7 @@ const docTemplate = `{ "tags": [ "ServiceAvailability" ], - "summary": "Get status of public EigenDA services.", + "summary": "Get status of EigenDA services.", "responses": { "200": { "description": "OK", @@ -200,6 +200,80 @@ const docTemplate = `{ } } }, + "/metrics/churner-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Churner ServiceAvailability" + ], + "summary": "Get status of EigenDA churner service.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + "/metrics/disperser-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "ServiceAvailability" + ], + "summary": "Get status of EigenDA Disperser service.", + 
"responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/non-signers": { "get": { "produces": [ @@ -389,19 +463,19 @@ const docTemplate = `{ "core.SecurityParam": { "type": "object", "properties": { - "adversary_threshold": { + "adversaryThreshold": { "description": "AdversaryThreshold is the maximum amount of stake that can be controlled by an adversary in the quorum as a percentage of the total stake in the quorum", "type": "integer" }, - "quorum_id": { + "confirmationThreshold": { + "description": "ConfirmationThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", "type": "integer" }, - "quorum_rate": { - "description": "Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", + "quorumID": { "type": "integer" }, - "quorum_threshold": { - "description": "QuorumThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", + "quorumRate": { + "description": "Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. 
This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", "type": "integer" } } @@ -533,7 +607,14 @@ const docTemplate = `{ "type": "number" }, "total_stake": { + "description": "deprecated: use TotalStakePerQuorum instead. Remove when the frontend is updated.", "type": "integer" + }, + "total_stake_per_quorum": { + "type": "object", + "additionalProperties": { + "type": "integer" + } } } }, @@ -551,12 +632,18 @@ const docTemplate = `{ "dataapi.OperatorNonsigningPercentageMetrics": { "type": "object", "properties": { + "operator_address": { + "type": "string" + }, "operator_id": { "type": "string" }, "percentage": { "type": "number" }, + "quorum_id": { + "type": "integer" + }, "total_batches": { "type": "integer" }, diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index e91bdc3867..17df685fa3 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -19,7 +19,7 @@ "tags": [ "ServiceAvailability" ], - "summary": "Get status of public EigenDA services.", + "summary": "Get status of EigenDA services.", "responses": { "200": { "description": "OK", @@ -196,6 +196,80 @@ } } }, + "/metrics/churner-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Churner ServiceAvailability" + ], + "summary": "Get status of EigenDA churner service.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, + 
"/metrics/disperser-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "ServiceAvailability" + ], + "summary": "Get status of EigenDA Disperser service.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/non-signers": { "get": { "produces": [ @@ -385,19 +459,19 @@ "core.SecurityParam": { "type": "object", "properties": { - "adversary_threshold": { + "adversaryThreshold": { "description": "AdversaryThreshold is the maximum amount of stake that can be controlled by an adversary in the quorum as a percentage of the total stake in the quorum", "type": "integer" }, - "quorum_id": { + "confirmationThreshold": { + "description": "ConfirmationThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", "type": "integer" }, - "quorum_rate": { - "description": "Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", + "quorumID": { "type": "integer" }, - "quorum_threshold": { - "description": "QuorumThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", + "quorumRate": { + "description": "Rate Limit. 
This is a temporary measure until the node can derive rates on its own using rollup authentication. This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", "type": "integer" } } @@ -529,7 +603,14 @@ "type": "number" }, "total_stake": { + "description": "deprecated: use TotalStakePerQuorum instead. Remove when the frontend is updated.", "type": "integer" + }, + "total_stake_per_quorum": { + "type": "object", + "additionalProperties": { + "type": "integer" + } } } }, @@ -547,15 +628,18 @@ "dataapi.OperatorNonsigningPercentageMetrics": { "type": "object", "properties": { - "operator_id": { + "operator_address": { "type": "string" }, - "operator_address": { + "operator_id": { "type": "string" }, "percentage": { "type": "number" }, + "quorum_id": { + "type": "integer" + }, "total_batches": { "type": "integer" }, @@ -687,4 +771,4 @@ } } } -} +} \ No newline at end of file diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index 13d066fc80..ec0c151944 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -1,24 +1,24 @@ definitions: core.SecurityParam: properties: - adversary_threshold: + adversaryThreshold: description: AdversaryThreshold is the maximum amount of stake that can be controlled by an adversary in the quorum as a percentage of the total stake in the quorum type: integer - quorum_id: + confirmationThreshold: + description: ConfirmationThreshold is the amount of stake that must sign a + message for it to be considered valid as a percentage of the total stake + in the quorum type: integer - quorum_rate: + quorumID: + type: integer + quorumRate: description: |- Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. 
This is used for restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the data was posted to the DA node. type: integer - quorum_threshold: - description: QuorumThreshold is the amount of stake that must sign a message - for it to be considered valid as a percentage of the total stake in the - quorum - type: integer type: object dataapi.BlobMetadataResponse: properties: @@ -103,7 +103,13 @@ definitions: throughput: type: number total_stake: + description: 'deprecated: use TotalStakePerQuorum instead. Remove when the + frontend is updated.' type: integer + total_stake_per_quorum: + additionalProperties: + type: integer + type: object type: object dataapi.NonSigner: properties: @@ -114,12 +120,14 @@ definitions: type: object dataapi.OperatorNonsigningPercentageMetrics: properties: - operator_id: - type: string operator_address: type: string + operator_id: + type: string percentage: type: number + quorum_id: + type: integer total_batches: type: integer total_unsigned_batches: @@ -233,7 +241,7 @@ paths: description: 'error: Server error' schema: $ref: '#/definitions/dataapi.ErrorResponse' - summary: Get status of public EigenDA services. + summary: Get status of EigenDA services. tags: - ServiceAvailability /feed/blobs: @@ -332,6 +340,54 @@ paths: summary: Fetch metrics tags: - Metrics + /metrics/churner-service-availability: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Get status of EigenDA churner service. 
+ tags: + - Churner ServiceAvailability + /metrics/disperser-service-availability: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Get status of EigenDA Disperser service. + tags: + - ServiceAvailability /metrics/non-signers: get: parameters: diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 79104d9336..545bc97f7a 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -207,9 +207,11 @@ func (s *server) Start() error { metrics := v1.Group("/metrics") { metrics.GET("/", s.FetchMetricsHandler) - metrics.GET("/throughput", s.FetchMetricsTroughputHandler) + metrics.GET("/throughput", s.FetchMetricsThroughputHandler) metrics.GET("/non-signers", s.FetchNonSigners) metrics.GET("/operator-nonsigning-percentage", s.FetchOperatorsNonsigningPercentageHandler) + metrics.GET("/disperser-service-availability", s.FetchDisperserServiceAvailability) + metrics.GET("/churner-service-availability", s.FetchChurnerServiceAvailability) } swagger := v1.Group("/swagger") { @@ -375,7 +377,7 @@ func (s *server) FetchMetricsHandler(c *gin.Context) { c.JSON(http.StatusOK, metric) } -// FetchMetricsTroughputHandler godoc +// FetchMetricsThroughputHandler godoc // // @Summary Fetch throughput time series // @Tags Metrics @@ -387,7 +389,7 @@ func (s *server) FetchMetricsHandler(c *gin.Context) { // @Failure 404 {object} ErrorResponse "error: Not found" // @Failure 500 {object} ErrorResponse "error: Server error" // @Router /metrics/throughput [get] -func (s *server) FetchMetricsTroughputHandler(c *gin.Context) { +func (s *server) 
FetchMetricsThroughputHandler(c *gin.Context) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("FetchMetricsTroughput", f*1000) // make milliseconds })) @@ -529,7 +531,7 @@ func (s *server) FetchDeregisteredOperators(c *gin.Context) { // GetEigenDAServiceAvailability godoc // -// @Summary Get status of public EigenDA services. +// @Summary Get status of EigenDA services. // @Tags ServiceAvailability // @Produce json // @Success 200 {object} ServiceAvailabilityResponse @@ -590,6 +592,112 @@ func (s *server) GetEigenDAServiceAvailability(c *gin.Context) { }) } +// FetchDisperserServiceAvailability godoc +// +// @Summary Get status of EigenDA Disperser service. +// @Tags ServiceAvailability +// @Produce json +// @Success 200 {object} ServiceAvailabilityResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /metrics/disperser-service-availability [get] +func (s *server) FetchDisperserServiceAvailability(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchDisperserServiceAvailability", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Check Disperser + services := []string{"Disperser"} + + s.logger.Info("Getting service availability for", "services", strings.Join(services, ", ")) + + availabilityStatuses, err := s.getServiceAvailability(c.Request.Context(), services) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchDisperserServiceAvailability") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchDisperserServiceAvailability") + + // Set the status code to 503 if any of the services are not serving + availabilityStatus := http.StatusOK + for _, status := range availabilityStatuses { + if status.ServiceStatus == "NOT_SERVING" { + 
availabilityStatus = http.StatusServiceUnavailable + break + } + + if status.ServiceStatus == "UNKNOWN" { + availabilityStatus = http.StatusInternalServerError + break + } + + } + + c.JSON(availabilityStatus, ServiceAvailabilityResponse{ + Meta: Meta{ + Size: len(availabilityStatuses), + }, + Data: availabilityStatuses, + }) +} + +// FetchChurnerServiceAvailability godoc +// +// @Summary Get status of EigenDA churner service. +// @Tags Churner ServiceAvailability +// @Produce json +// @Success 200 {object} ServiceAvailabilityResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /metrics/churner-service-availability [get] +func (s *server) FetchChurnerServiceAvailability(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchChurnerServiceAvailability", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Check Churner + services := []string{"Churner"} + + s.logger.Info("Getting service availability for", "services", strings.Join(services, ", ")) + + availabilityStatuses, err := s.getServiceAvailability(c.Request.Context(), services) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchChurnerServiceAvailability") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchChurnerServiceAvailability") + + // Set the status code to 503 if a service is not serving, or 500 if its status is unknown + availabilityStatus := http.StatusOK + for _, status := range availabilityStatuses { + if status.ServiceStatus == "NOT_SERVING" { + availabilityStatus = http.StatusServiceUnavailable + break + } + + if status.ServiceStatus == "UNKNOWN" { + availabilityStatus = http.StatusInternalServerError + break + } + + } + + c.JSON(availabilityStatus, ServiceAvailabilityResponse{ + Meta: Meta{ + Size: len(availabilityStatuses), + }, +
Data: availabilityStatuses, + }) +} + func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) { var ( blobMetadatas = make([]*disperser.BlobMetadata, 0) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 82312ede55..079edbb1b9 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -259,7 +259,7 @@ func TestFetchMetricsHandler(t *testing.T) { assert.Equal(t, uint64(1), response.TotalStakePerQuorum[1]) } -func TestFetchMetricsTroughputHandler(t *testing.T) { +func TestFetchMetricsThroughputHandler(t *testing.T) { r := setUpRouter() s := new(model.SampleStream) @@ -270,7 +270,7 @@ func TestFetchMetricsTroughputHandler(t *testing.T) { matrix = append(matrix, s) mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil).Once() - r.GET("/v1/metrics/throughput", testDataApiServer.FetchMetricsTroughputHandler) + r.GET("/v1/metrics/throughput", testDataApiServer.FetchMetricsThroughputHandler) w := httptest.NewRecorder() req := httptest.NewRequest(http.MethodGet, "/v1/metrics/throughput", nil) @@ -351,6 +351,82 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) { assert.Equal(t, "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operatorId) } +func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { + r := setUpRouter() + + mockHealthCheckService := NewMockHealthCheckService() + mockHealthCheckService.AddResponse("Disperser", &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_SERVING, + }) + + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + + r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) + + w := 
httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/metrics/disperser-service-availability", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.ServiceAvailabilityResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + fmt.Printf("Response: %v\n", response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, 1, len(response.Data)) + + serviceData := response.Data[0] + assert.Equal(t, "Disperser", serviceData.ServiceName) + assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), serviceData.ServiceStatus) +} + +func TestChurnerServiceAvailabilityHandler(t *testing.T) { + r := setUpRouter() + + mockHealthCheckService := NewMockHealthCheckService() + mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{ + Status: grpc_health_v1.HealthCheckResponse_SERVING, + }) + + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + + r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/metrics/churner-service-availability", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.ServiceAvailabilityResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + fmt.Printf("Response: %v\n", response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, 1, 
len(response.Data)) + + serviceData := response.Data[0] + assert.Equal(t, "Churner", serviceData.ServiceName) + assert.Equal(t, grpc_health_v1.HealthCheckResponse_SERVING.String(), serviceData.ServiceStatus) +} + func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { defer goleak.VerifyNone(t)