diff --git a/disperser/cmd/dataapi/config.go b/disperser/cmd/dataapi/config.go index 626427c587..8fd8a7aa93 100644 --- a/disperser/cmd/dataapi/config.go +++ b/disperser/cmd/dataapi/config.go @@ -29,8 +29,9 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string - DisperserHostname string - ChurnerHostname string + DisperserHostname string + ChurnerHostname string + BatcherHealthEndpt string } func NewConfig(ctx *cli.Context) (Config, error) { @@ -63,8 +64,9 @@ func NewConfig(ctx *cli.Context) (Config, error) { HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), }, - DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), - ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), + DisperserHostname: ctx.GlobalString(flags.DisperserHostnameFlag.Name), + ChurnerHostname: ctx.GlobalString(flags.ChurnerHostnameFlag.Name), + BatcherHealthEndpt: ctx.GlobalString(flags.BatcherHealthEndptFlag.Name), } return config, nil } diff --git a/disperser/cmd/dataapi/flags/flags.go b/disperser/cmd/dataapi/flags/flags.go index 6b8906a5ea..453b25a721 100644 --- a/disperser/cmd/dataapi/flags/flags.go +++ b/disperser/cmd/dataapi/flags/flags.go @@ -117,6 +117,12 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_CHURNER_HOSTNAME"), } + BatcherHealthEndptFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigenda-batcher-health-endpoint"), + Usage: "Endpt of EigenDA Batcher Health Sidecar", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_BATCHER_HEALTH_ENDPOINT"), + } /* Optional Flags*/ MetricsHTTPPort = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), @@ -143,6 +149,7 @@ var requiredFlags = []cli.Flag{ EnableMetricsFlag, DisperserHostnameFlag, ChurnerHostnameFlag, + BatcherHealthEndptFlag, } var optionalFlags = []cli.Flag{ diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go index dcdcc53ecc..9a6dc5dd1d 100644 --- a/disperser/cmd/dataapi/main.go +++ b/disperser/cmd/dataapi/main.go @@ -96,11 +96,12 @@ func RunDataApi(ctx *cli.Context) error { metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger) server = dataapi.NewServer( dataapi.Config{ - ServerMode: config.ServerMode, - SocketAddr: config.SocketAddr, - AllowOrigins: config.AllowOrigins, - DisperserHostname: config.DisperserHostname, - ChurnerHostname: config.ChurnerHostname, + ServerMode: config.ServerMode, + SocketAddr: config.SocketAddr, + AllowOrigins: config.AllowOrigins, + DisperserHostname: config.DisperserHostname, + ChurnerHostname: config.ChurnerHostname, + BatcherHealthEndpt: config.BatcherHealthEndpt, }, sharedStorage, promClient, @@ -111,6 +112,7 @@ func RunDataApi(ctx *cli.Context) error { metrics, nil, nil, + nil, ) ) diff --git a/disperser/dataapi/config.go b/disperser/dataapi/config.go index c22bd1f054..b7027007d1 100644 --- a/disperser/dataapi/config.go +++ b/disperser/dataapi/config.go @@ -1,9 +1,10 @@ package dataapi type Config struct { - SocketAddr string - ServerMode string - AllowOrigins []string - DisperserHostname string - ChurnerHostname string + SocketAddr string + ServerMode string + AllowOrigins []string + DisperserHostname string + ChurnerHostname string + BatcherHealthEndpt string } diff --git a/disperser/dataapi/docs/docs.go b/disperser/dataapi/docs/docs.go index 8edf43b6f6..c43e4b8876 100644 --- a/disperser/dataapi/docs/docs.go +++ b/disperser/dataapi/docs/docs.go @@ -200,6 +200,43 @@ const docTemplate = `{ } } }, + "/metrics/batcher-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Batcher Availability" + ], + "summary": "Get status of EigenDA batcher.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/churner-service-availability": { "get": { "produces": [ diff --git a/disperser/dataapi/docs/swagger.json b/disperser/dataapi/docs/swagger.json index 17df685fa3..d86dd6a816 100644 --- a/disperser/dataapi/docs/swagger.json +++ b/disperser/dataapi/docs/swagger.json @@ -196,6 +196,43 @@ } } }, + "/metrics/batcher-service-availability": { + "get": { + "produces": [ + "application/json" + ], + "tags": [ + "Batcher Availability" + ], + "summary": "Get status of EigenDA batcher.", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/dataapi.ServiceAvailabilityResponse" + } + }, + "400": { + "description": "error: Bad request", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "404": { + "description": "error: Not found", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + }, + "500": { + "description": "error: Server error", + "schema": { + "$ref": "#/definitions/dataapi.ErrorResponse" + } + } + } + } + }, "/metrics/churner-service-availability": { "get": { "produces": [ diff --git a/disperser/dataapi/docs/swagger.yaml b/disperser/dataapi/docs/swagger.yaml index ec0c151944..0c7fb1dd04 100644 --- a/disperser/dataapi/docs/swagger.yaml +++ b/disperser/dataapi/docs/swagger.yaml @@ -340,6 +340,30 @@ paths: summary: Fetch metrics tags: - Metrics + /metrics/batcher-service-availability: + get: + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/dataapi.ServiceAvailabilityResponse' + "400": + description: 'error: Bad request' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "404": + description: 'error: Not found' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + "500": + description: 'error: Server error' + schema: + $ref: '#/definitions/dataapi.ErrorResponse' + summary: Get status of EigenDA batcher. + tags: + - Batcher Availability /metrics/churner-service-availability: get: produces: diff --git a/disperser/dataapi/service_availability_handler.go b/disperser/dataapi/grpc_service_availability_handler.go similarity index 96% rename from disperser/dataapi/service_availability_handler.go rename to disperser/dataapi/grpc_service_availability_handler.go index 7a9d9c1f2c..718162647c 100644 --- a/disperser/dataapi/service_availability_handler.go +++ b/disperser/dataapi/grpc_service_availability_handler.go @@ -33,7 +33,7 @@ func (s *server) getServiceAvailability(ctx context.Context, services []string) var availabilityStatus *ServiceAvailability s.logger.Info("checking service health", "service", serviceName) - response, err := s.eigenDAServiceChecker.CheckHealth(ctx, serviceName) + response, err := s.eigenDAGRPCServiceChecker.CheckHealth(ctx, serviceName) if err != nil { if err.Error() == "disperser connection is nil" { @@ -74,7 +74,7 @@ func (s *server) getServiceAvailability(ctx context.Context, services []string) return availabilityStatuses, nil } -func NewEigenDAServiceHealthCheck(grpcConnection GRPCConn, disperserHostName, churnerHostName string) EigenDAServiceChecker { +func NewEigenDAServiceHealthCheck(grpcConnection GRPCConn, disperserHostName, churnerHostName string) EigenDAGRPCServiceChecker { // Create Pre-configured connections to the services // Saves from having to create new connection on each request diff --git a/disperser/dataapi/http_service_availability_handler.go b/disperser/dataapi/http_service_availability_handler.go new file mode 100644 index 0000000000..056e9c20ad --- /dev/null +++ b/disperser/dataapi/http_service_availability_handler.go @@ -0,0 +1,50 @@ +package dataapi + +import ( + "context" + "net/http" +) + +// Simple struct with a Service Name and its HealthEndPt. +type HttpServiceAvailabilityCheck struct { + ServiceName string + HealthEndPt string +} + +type HttpServiceAvailability struct{} + +func (s *server) getServiceHealth(ctx context.Context, services []HttpServiceAvailabilityCheck) ([]*ServiceAvailability, error) { + + availabilityStatuses := make([]*ServiceAvailability, len(services)) + for i, service := range services { + var availabilityStatus *ServiceAvailability + s.logger.Info("checking service health", "service", service.ServiceName) + + resp, err := s.eigenDAHttpServiceChecker.CheckHealth(service.HealthEndPt) + if err != nil { + s.logger.Error("Error querying service health:", "err", err) + } + + availabilityStatus = &ServiceAvailability{ + ServiceName: service.ServiceName, + ServiceStatus: resp, + } + availabilityStatuses[i] = availabilityStatus + } + return availabilityStatuses, nil +} + +// ServiceAvailability represents the status of a service. +func (sa *HttpServiceAvailability) CheckHealth(endpt string) (string, error) { + resp, err := http.Get(endpt) + if err != nil { + return "UNKNOWN", err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + return "SERVING", nil + } + + return "NOT_SERVING", nil +} diff --git a/disperser/dataapi/server.go b/disperser/dataapi/server.go index 545bc97f7a..0f0875e342 100644 --- a/disperser/dataapi/server.go +++ b/disperser/dataapi/server.go @@ -33,11 +33,15 @@ const ( var errNotFound = errors.New("not found") -type EigenDAServiceChecker interface { +type EigenDAGRPCServiceChecker interface { CheckHealth(ctx context.Context, serviceName string) (*grpc_health_v1.HealthCheckResponse, error) CloseConnections() error } +type EigenDAHttpServiceChecker interface { + CheckHealth(serviceName string) (string, error) +} + type ( BlobMetadataResponse struct { BlobKey string `json:"blob_key"` @@ -131,10 +135,12 @@ type ( transactor core.Transactor chainState core.ChainState - metrics *Metrics - disperserHostName string - churnerHostName string - eigenDAServiceChecker EigenDAServiceChecker + metrics *Metrics + disperserHostName string + churnerHostName string + batcherHealthEndpt string + eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker + eigenDAHttpServiceChecker EigenDAHttpServiceChecker } ) @@ -148,7 +154,8 @@ func NewServer( logger logging.Logger, metrics *Metrics, grpcConn GRPCConn, - eigenDAServiceChecker EigenDAServiceChecker, + eigenDAGRPCServiceChecker EigenDAGRPCServiceChecker, + eigenDAHttpServiceChecker EigenDAHttpServiceChecker, ) *server { // Initialize the health checker service for EigenDA services @@ -156,25 +163,31 @@ func NewServer( grpcConn = &GRPCDialerSkipTLS{} } - if eigenDAServiceChecker == nil { + if eigenDAGRPCServiceChecker == nil { + + eigenDAGRPCServiceChecker = NewEigenDAServiceHealthCheck(grpcConn, config.DisperserHostname, config.ChurnerHostname) + } - eigenDAServiceChecker = NewEigenDAServiceHealthCheck(grpcConn, config.DisperserHostname, config.ChurnerHostname) + if eigenDAHttpServiceChecker == nil { + eigenDAHttpServiceChecker = &HttpServiceAvailability{} } return &server{ - logger: logger.With("component", "DataAPIServer"), - serverMode: config.ServerMode, - socketAddr: config.SocketAddr, - allowOrigins: config.AllowOrigins, - blobstore: blobstore, - promClient: promClient, - subgraphClient: subgraphClient, - transactor: transactor, - chainState: chainState, - metrics: metrics, - disperserHostName: config.DisperserHostname, - churnerHostName: config.ChurnerHostname, - eigenDAServiceChecker: eigenDAServiceChecker, + logger: logger.With("component", "DataAPIServer"), + serverMode: config.ServerMode, + socketAddr: config.SocketAddr, + allowOrigins: config.AllowOrigins, + blobstore: blobstore, + promClient: promClient, + subgraphClient: subgraphClient, + transactor: transactor, + chainState: chainState, + metrics: metrics, + disperserHostName: config.DisperserHostname, + churnerHostName: config.ChurnerHostname, + batcherHealthEndpt: config.BatcherHealthEndpt, + eigenDAGRPCServiceChecker: eigenDAGRPCServiceChecker, + eigenDAHttpServiceChecker: eigenDAHttpServiceChecker, } } @@ -212,6 +225,7 @@ func (s *server) Start() error { metrics.GET("/operator-nonsigning-percentage", s.FetchOperatorsNonsigningPercentageHandler) metrics.GET("/disperser-service-availability", s.FetchDisperserServiceAvailability) metrics.GET("/churner-service-availability", s.FetchChurnerServiceAvailability) + metrics.GET("/batcher-service-availability", s.FetchBatcherAvailability) } swagger := v1.Group("/swagger") { @@ -252,8 +266,8 @@ func (s *server) Start() error { func (s *server) Shutdown() error { - if s.eigenDAServiceChecker != nil { - err := s.eigenDAServiceChecker.CloseConnections() + if s.eigenDAGRPCServiceChecker != nil { + err := s.eigenDAGRPCServiceChecker.CloseConnections() if err != nil { s.logger.Error("Failed to close connections", "error", err) @@ -698,6 +712,59 @@ func (s *server) FetchChurnerServiceAvailability(c *gin.Context) { }) } +// FetchBatcherAvailability godoc +// +// @Summary Get status of EigenDA batcher. +// @Tags Batcher Availability +// @Produce json +// @Success 200 {object} ServiceAvailabilityResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" +// @Router /metrics/batcher-service-availability [get] +func (s *server) FetchBatcherAvailability(c *gin.Context) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchBatcherAvailability", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + // Check Batcher + services := []HttpServiceAvailabilityCheck{{"Batcher", s.batcherHealthEndpt}} + + s.logger.Info("Getting service availability for", "service", services[0].ServiceName, "endpoint", services[0].HealthEndPt) + + availabilityStatuses, err := s.getServiceHealth(c.Request.Context(), services) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBatcherAvailability") + errorResponse(c, err) + return + } + + s.metrics.IncrementSuccessfulRequestNum("FetchBatcherAvailability") + + // Set the status code to 503 if any of the services are not serving + availabilityStatus := http.StatusOK + for _, status := range availabilityStatuses { + if status.ServiceStatus == "NOT_SERVING" { + availabilityStatus = http.StatusServiceUnavailable + break + } + + if status.ServiceStatus == "UNKNOWN" { + availabilityStatus = http.StatusInternalServerError + break + } + + } + + c.JSON(availabilityStatus, ServiceAvailabilityResponse{ + Meta: Meta{ + Size: len(availabilityStatuses), + }, + Data: availabilityStatuses, + }) +} + func (s *server) getBlobMetadataByBatchesWithLimit(ctx context.Context, limit int) ([]*Batch, []*disperser.BlobMetadata, error) { var ( blobMetadatas = make([]*disperser.BlobMetadata, 0) diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 079edbb1b9..3bf7c46100 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -56,7 +56,7 @@ var ( mockTx = &coremock.MockTransactor{} mockChainState, _ = coremock.MakeChainDataMock(core.OperatorIndex(1)) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) expectedBatchHeaderHash = [32]byte{1, 2, 3} expectedBlobIndex = uint32(1) expectedRequestedAt = uint64(5567830000000000000) @@ -77,6 +77,10 @@ type MockSubgraphClient struct { type MockGRPCConnection struct{} +type MockHttpClient struct { + ShouldSucceed bool +} + func (mc *MockGRPCConnection) Dial(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { // Here, return a mock connection. How you implement this depends on your testing framework // and what aspects of the gRPC connection you wish to mock. @@ -123,6 +127,16 @@ func (m *MockHealthCheckService) AddResponse(serviceName string, response *grpc_ m.ResponseMap[serviceName] = response } +func (c *MockHttpClient) CheckHealth(url string) (string, error) { + // Simulate success or failure based on the ShouldSucceed flag + + if c.ShouldSucceed { + return "SERVING", nil + } + + return "NOT_SERVING", nil +} + func TestFetchBlobHandler(t *testing.T) { r := setUpRouter() @@ -351,6 +365,72 @@ func TestFetchUnsignedBatchesHandler(t *testing.T) { assert.Equal(t, "0xe22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311", operatorId) } +func TestCheckBatcherHealthExpectServing(t *testing.T) { + r := setUpRouter() + + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: true}) + + r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/metrics/batcher-service-availability", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.ServiceAvailabilityResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + fmt.Printf("Response: %v\n", response) + + assert.Equal(t, http.StatusOK, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, 1, len(response.Data)) + + serviceData := response.Data[0] + assert.Equal(t, "Batcher", serviceData.ServiceName) + assert.Equal(t, "SERVING", serviceData.ServiceStatus) +} + +func TestCheckBatcherHealthExpectNotServing(t *testing.T) { + r := setUpRouter() + + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, &MockHttpClient{ShouldSucceed: false}) + + r.GET("/v1/metrics/batcher-service-availability", testDataApiServer.FetchBatcherAvailability) + + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/v1/metrics/batcher-service-availability", nil) + r.ServeHTTP(w, req) + + res := w.Result() + defer res.Body.Close() + + data, err := io.ReadAll(res.Body) + assert.NoError(t, err) + + var response dataapi.ServiceAvailabilityResponse + err = json.Unmarshal(data, &response) + assert.NoError(t, err) + assert.NotNil(t, response) + + fmt.Printf("Response: %v\n", response) + + assert.Equal(t, http.StatusServiceUnavailable, res.StatusCode) + assert.Equal(t, 1, response.Meta.Size) + assert.Equal(t, 1, len(response.Data)) + + serviceData := response.Data[0] + assert.Equal(t, "Batcher", serviceData.ServiceName) + assert.Equal(t, "NOT_SERVING", serviceData.ServiceStatus) +} + func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { r := setUpRouter() @@ -359,7 +439,7 @@ func TestFetchDisperserServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/disperser-service-availability", testDataApiServer.FetchDisperserServiceAvailability) @@ -397,7 +477,7 @@ func TestChurnerServiceAvailabilityHandler(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/metrics/churner-service-availability", testDataApiServer.FetchChurnerServiceAvailability) @@ -441,7 +521,7 @@ func TestFetchDeregisteredOperatorNoSocketInfoOneOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -490,7 +570,7 @@ func TestFetchDeregisteredMultipleOperatorsOneWithNoSocketInfoHandler(t *testing // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfoNoSocketInfo, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -558,7 +638,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -603,7 +683,7 @@ func TestFetchDeregisteredOperatorInfoInvalidTimeStampTwoOperatorsHandler(t *tes // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -659,7 +739,7 @@ func TestFetchMetricsDeregisteredOperatorHandler(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -724,7 +804,7 @@ func TestFetchDeregisteredOperatorOffline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -777,7 +857,7 @@ func TestFetchDeregisteredOperatorsWithoutDaysQueryParam(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -835,7 +915,7 @@ func TestFetchDeregisteredOperatorInvalidDaysQueryParam(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -877,7 +957,7 @@ func TestFetchDeregisteredOperatorQueryDaysGreaterThan30(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -923,7 +1003,7 @@ func TestFetchDeregisteredOperatorsMultipleOffline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -981,7 +1061,7 @@ func TestFetchDeregisteredOperatorOnline(t *testing.T) { mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphOperatorDeregistereds, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorState, nil) @@ -1037,7 +1117,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineOnline(t *testing.T) { // Set up the mock calls for the two operators mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1104,7 +1184,7 @@ func TestFetchDeregisteredOperatorsMultipleOnline(t *testing.T) { mockSubgraphApi.On("QueryDeregisteredOperatorsGreaterThanBlockTimestamp").Return(subgraphTwoOperatorsDeregistered, nil) mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1179,7 +1259,7 @@ func TestFetchDeregisteredOperatorsMultipleOfflineSameBlock(t *testing.T) { mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo1, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo2, nil).Once() mockSubgraphApi.On("QueryOperatorInfoByOperatorIdAtBlockNumber").Return(subgraphIndexedOperatorInfo3, nil).Once() - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil) mockSubgraphApi.On("QueryIndexedDeregisteredOperatorsForTimeWindow").Return(indexedOperatorStates, nil) @@ -1241,7 +1321,7 @@ func TestGetServiceAvailability(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) w := httptest.NewRecorder() @@ -1284,7 +1364,7 @@ func TestGetServiceAvailability_QueryDisperser(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) // Initialize the gRPC client pools r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) @@ -1326,7 +1406,7 @@ func TestGetServiceAvailability_QueryInvalidService(t *testing.T) { mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) @@ -1361,7 +1441,7 @@ func TestGetServiceAvailability_HealthCheckError(t *testing.T) { mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) @@ -1399,7 +1479,7 @@ func TestGetServiceAvailability_HealthyUnHealthyService(t *testing.T) { mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) // Initialize the gRPC client pools r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) @@ -1444,7 +1524,7 @@ func TestGetServiceAvailability_QueryDisperser_MultipleRequests(t *testing.T) { mockHealthCheckService.AddResponse("Churner", &grpc_health_v1.HealthCheckResponse{ Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPCConnection{}, mockHealthCheckService, nil) r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability) @@ -1482,7 +1562,7 @@ func TestGetServiceAvailability_HealthCheckerNilConnection(t *testing.T) { Status: grpc_health_v1.HealthCheckResponse_SERVING, }) - testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPNilConnection{}, nil) + testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, dataapi.NewSubgraphClient(mockSubgraphApi, mockLogger), mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger), &MockGRPNilConnection{}, nil, nil) // Initialize the gRPC client pools r.GET("/v1/eigenda/service-availability", testDataApiServer.GetEigenDAServiceAvailability)