From df0ad225cd3c290e6826e79bf530519d8864854b Mon Sep 17 00:00:00 2001 From: Will Sewell Date: Thu, 28 Mar 2024 09:58:52 +0000 Subject: [PATCH 1/2] Refactor healthcheck signalling between server and service Simplifies the signalling of healthcheck status from the "servers" to the "service": instead of using 2 channels to feed healthcheck status back to the Service.HealthCheck, we just give the server components direct access to the Healthcheck which they can update directly. This is possible because the Healthcheck package is threadsafe (uses `atomic.Value` for state). This pattern is consistent with how the service's Healthcheck is passed directly to cmd/collector/app package. closes #5307 Signed-off-by: Will Sewell --- cmd/all-in-one/main.go | 7 +- cmd/internal/flags/service.go | 23 +---- cmd/internal/flags/service_test.go | 2 +- .../internal/extension/jaegerquery/server.go | 2 + cmd/query/app/server.go | 45 +++++----- cmd/query/app/server_test.go | 87 ++++--------------- cmd/query/app/token_propagation_test.go | 2 +- cmd/query/main.go | 8 +- cmd/remote-storage/app/server.go | 30 +++---- cmd/remote-storage/app/server_test.go | 34 +------- cmd/remote-storage/main.go | 8 +- 11 files changed, 62 insertions(+), 186 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 721757a6079..08ac4313dff 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -283,15 +283,10 @@ func startQuery( ) *queryApp.Server { spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt) + server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt) if err != nil { svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err)) } - go func() { - for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() if err := 
server.Start(); err != nil { svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err)) } diff --git a/cmd/internal/flags/service.go b/cmd/internal/flags/service.go index 291659573b8..622a960a4e2 100644 --- a/cmd/internal/flags/service.go +++ b/cmd/internal/flags/service.go @@ -50,20 +50,16 @@ type Service struct { MetricsFactory metrics.Factory signalsChannel chan os.Signal - - hcStatusChannel chan healthcheck.Status } // NewService creates a new Service. func NewService(adminPort int) *Service { signalsChannel := make(chan os.Signal, 1) - hcStatusChannel := make(chan healthcheck.Status) signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) return &Service{ - Admin: NewAdminServer(ports.PortToHostPort(adminPort)), - signalsChannel: signalsChannel, - hcStatusChannel: hcStatusChannel, + Admin: NewAdminServer(ports.PortToHostPort(adminPort)), + signalsChannel: signalsChannel, } } @@ -79,11 +75,6 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) { s.Admin.AddFlags(flagSet) } -// SetHealthCheckStatus sets status of healthcheck -func (s *Service) SetHealthCheckStatus(status healthcheck.Status) { - s.hcStatusChannel <- status -} - // Start bootstraps the service and starts the admin server. 
func (s *Service) Start(v *viper.Viper) error { if err := TryLoadConfigFile(v); err != nil { @@ -143,15 +134,7 @@ func (s *Service) HC() *healthcheck.HealthCheck { func (s *Service) RunAndThen(shutdown func()) { s.HC().Ready() -statusLoop: - for { - select { - case status := <-s.hcStatusChannel: - s.HC().Set(status) - case <-s.signalsChannel: - break statusLoop - } - } + <-s.signalsChannel s.Logger.Info("Shutting down") s.HC().Set(healthcheck.Unavailable) diff --git a/cmd/internal/flags/service_test.go b/cmd/internal/flags/service_test.go index 4194b1eab1f..fbdd13fb3cf 100644 --- a/cmd/internal/flags/service_test.go +++ b/cmd/internal/flags/service_test.go @@ -93,7 +93,7 @@ func TestStartErrors(t *testing.T) { go s.RunAndThen(shutdown) waitForEqual(t, healthcheck.Ready, func() interface{} { return s.HC().Get() }) - s.SetHealthCheckStatus(healthcheck.Unavailable) + s.HC().Set(healthcheck.Unavailable) waitForEqual(t, healthcheck.Unavailable, func() interface{} { return s.HC().Get() }) s.signalsChannel <- os.Interrupt diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index bc83ba06ef6..2a7e84a4097 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -14,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" @@ -81,6 +82,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { //nolint s.server, err = queryApp.NewServer( s.logger, + healthcheck.New(), qs, metricsQueryService, s.makeQueryOptions(), diff --git a/cmd/query/app/server.go 
b/cmd/query/app/server.go index 9cdb6ec879d..7882055f65d 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -48,23 +48,23 @@ import ( // Server runs HTTP, Mux and a grpc server type Server struct { logger *zap.Logger + healthCheck *healthcheck.HealthCheck querySvc *querysvc.QueryService queryOptions *QueryOptions tracer *jtracer.JTracer // TODO make part of flags.Service - conn net.Listener - grpcConn net.Listener - httpConn net.Listener - cmuxServer cmux.CMux - grpcServer *grpc.Server - httpServer *http.Server - separatePorts bool - unavailableChannel chan healthcheck.Status + conn net.Listener + grpcConn net.Listener + httpConn net.Listener + cmuxServer cmux.CMux + grpcServer *grpc.Server + httpServer *http.Server + separatePorts bool } // NewServer creates and initializes Server -func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) { +func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, fmt.Errorf("invalid HTTP server host:port: %w", err) @@ -89,22 +89,17 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery } return &Server{ - logger: logger, - querySvc: querySvc, - queryOptions: options, - tracer: tracer, - grpcServer: grpcServer, - httpServer: httpServer, - separatePorts: grpcPort != httpPort, - unavailableChannel: make(chan healthcheck.Status), + logger: logger, + healthCheck: healthCheck, + querySvc: querySvc, + queryOptions: options, + tracer: tracer, + grpcServer: grpcServer, + httpServer: httpServer, + separatePorts: grpcPort != httpPort, }, nil } -// HealthCheckStatus returns health check 
status channel a client can subscribe to -func (s Server) HealthCheckStatus() chan healthcheck.Status { - return s.unavailableChannel -} - func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption @@ -292,7 +287,7 @@ func (s *Server) Start() error { s.logger.Error("Could not start HTTP server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() // Start GRPC server concurrently @@ -302,7 +297,7 @@ func (s *Server) Start() error { if err := s.grpcServer.Serve(s.grpcConn); err != nil { s.logger.Error("Could not start GRPC server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() // Start cmux server concurrently. @@ -315,7 +310,7 @@ func (s *Server) Start() error { if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { s.logger.Error("Could not start multiplexed server", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthCheck.Set(healthcheck.Unavailable) }() } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 43e0bcc53a4..8cd0b300bb8 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -20,7 +20,6 @@ import ( "fmt" "net" "net/http" - "sync" "testing" "time" @@ -67,7 +66,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -81,7 +80,7 @@ func 
TestCreateTLSGrpcServerError(t *testing.T) { ClientCAPath: "invalid/path", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -95,7 +94,7 @@ func TestCreateTLSHttpServerError(t *testing.T) { ClientCAPath: "invalid/path", } - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.Error(t, err) @@ -340,27 +339,12 @@ func TestServerHTTPTLS(t *testing.T) { spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, tenancy.NewManager(&tenancy.Options{}), + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, + nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - } - }() - var clientError error var clientClose func() error var clientTLSCfg *tls.Config @@ -423,7 +407,6 @@ func TestServerHTTPTLS(t *testing.T) { } } server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } @@ -502,27 +485,12 @@ func TestServerGRPCTLS(t *testing.T) { spanReader.On("GetServices", 
mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, tenancy.NewManager(&tenancy.Options{}), + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, + nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - } - }() - var clientError error var client *grpcClient @@ -549,14 +517,13 @@ func TestServerGRPCTLS(t *testing.T) { } require.NoError(t, client.conn.Close()) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } } func TestServerBadHostPort(t *testing.T) { - _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err := NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", @@ -568,7 +535,7 @@ func TestServerBadHostPort(t *testing.T) { jtracer.NoOp()) require.Error(t, err) - _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, + _, err = NewServer(zap.NewNop(), healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", @@ -600,6 +567,7 @@ func TestServerInUseHostPort(t *testing.T) { t.Run(tc.name, func(t *testing.T) { server, err := NewServer( zap.NewNop(), + healthcheck.New(), &querysvc.QueryService{}, nil, &QueryOptions{ @@ -637,7 +605,7 @@ func TestServerSinglePort(t *testing.T) { spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) querySvc := querysvc.NewQueryService(spanReader, dependencyReader, 
querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{ GRPCHostPort: hostPort, HTTPHostPort: hostPort, @@ -650,23 +618,6 @@ func TestServerSinglePort(t *testing.T) { require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(func() { - wg.Done() - }) - } - - } - wg.Done() - }() - client := newGRPCClient(t, hostPort) defer client.conn.Close() @@ -678,7 +629,6 @@ func TestServerSinglePort(t *testing.T) { assert.Equal(t, expectedServices, res.Services) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } @@ -694,15 +644,11 @@ func TestServerGracefulExit(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := jtracer.NoOp() - server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, + &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tenancy.NewManager(&tenancy.Options{}), tracer) require.NoError(t, err) require.NoError(t, server.Start()) - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - } - }() // Wait for servers to come up before we can call .Close() // TODO Find a way to wait only as long as necessary. Unconditional sleep slows down the tests. 
@@ -722,7 +668,7 @@ func TestServerHandlesPortZero(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := jtracer.NoOp() - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tenancy.NewManager(&tenancy.Options{}), tracer) @@ -783,9 +729,8 @@ func TestServerHTTPTenancy(t *testing.T) { dependencyReader := &depsmocks.Reader{} querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) - server, err := NewServer(zap.NewNop(), querySvc, nil, - serverOptions, tenancyMgr, - jtracer.NoOp()) + server, err := NewServer(zap.NewNop(), healthcheck.New(), querySvc, + nil, serverOptions, tenancyMgr, jtracer.NoOp()) require.NoError(t, err) require.NoError(t, server.Start()) diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 9fd3aca71fa..89eb9491df4 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -90,7 +90,7 @@ func runQueryService(t *testing.T, esURL string) *Server { require.NoError(t, err) querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) - server, err := NewServer(flagsSvc.Logger, querySvc, nil, + server, err := NewServer(flagsSvc.Logger, flagsSvc.HC(), querySvc, nil, &QueryOptions{ GRPCHostPort: ":0", HTTPHostPort: ":0", diff --git a/cmd/query/main.go b/cmd/query/main.go index ccc9ab81933..23310ee64b1 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -113,17 +113,11 @@ func main() { dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy) - server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tm, jt) + server, err := app.NewServer(svc.Logger, svc.HC(), queryService, metricsQueryService, queryOpts, tm, jt) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } - go func() { 
- for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() - if err := server.Start(); err != nil { logger.Fatal("Could not start servers", zap.Error(err)) } diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index 008d4302285..ecc29f0ff9b 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -35,17 +35,17 @@ import ( // Server runs a gRPC server type Server struct { - logger *zap.Logger - opts *Options + logger *zap.Logger + healthcheck *healthcheck.HealthCheck + opts *Options - grpcConn net.Listener - grpcServer *grpc.Server - unavailableChannel chan healthcheck.Status // used to signal to admin server that gRPC server is unavailable - wg sync.WaitGroup + grpcConn net.Listener + grpcServer *grpc.Server + wg sync.WaitGroup } // NewServer creates and initializes Server. -func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger) (*Server, error) { +func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger, healthcheck *healthcheck.HealthCheck) (*Server, error) { handler, err := createGRPCHandler(storageFactory, logger) if err != nil { return nil, err @@ -57,10 +57,10 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man } return &Server{ - logger: logger, - opts: options, - grpcServer: grpcServer, - unavailableChannel: make(chan healthcheck.Status), + logger: logger, + healthcheck: healthcheck, + opts: options, + grpcServer: grpcServer, }, nil } @@ -96,11 +96,6 @@ func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandl return handler, nil } -// HealthCheckStatus returns health check status channel a client can subscribe to -func (s *Server) HealthCheckStatus() chan healthcheck.Status { - return s.unavailableChannel -} - func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, logger *zap.Logger) 
(*grpc.Server, error) { var grpcOpts []grpc.ServerOption @@ -140,7 +135,7 @@ func (s *Server) Start() error { if err := s.grpcServer.Serve(s.grpcConn); err != nil { s.logger.Error("GRPC server exited", zap.Error(err)) } - s.unavailableChannel <- healthcheck.Unavailable + s.healthcheck.Set(healthcheck.Unavailable) }() return nil @@ -152,6 +147,5 @@ func (s *Server) Close() error { s.grpcConn.Close() s.opts.TLSGRPC.Close() s.wg.Wait() - close(s.unavailableChannel) return nil } diff --git a/cmd/remote-storage/app/server_test.go b/cmd/remote-storage/app/server_test.go index 201949e8b2f..f36b13629d4 100644 --- a/cmd/remote-storage/app/server_test.go +++ b/cmd/remote-storage/app/server_test.go @@ -17,7 +17,6 @@ package app import ( "context" "errors" - "sync" "testing" "time" @@ -59,6 +58,7 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { factory, tenancy.NewManager(&tenancy.Options{}), zap.NewNop(), + healthcheck.New(), ) } _, err := f() @@ -80,7 +80,6 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { validateGRPCServer(t, s.grpcConn.Addr().String(), s.grpcServer) s.grpcConn.Close() // causes logged error - <-s.HealthCheckStatus() } func TestServerStart_BadPortErrors(t *testing.T) { @@ -130,6 +129,7 @@ func TestNewServer_TLSConfigError(t *testing.T) { storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), zap.NewNop(), + healthcheck.New(), ) require.Error(t, err) assert.Contains(t, err.Error(), "invalid TLS config") @@ -337,23 +337,11 @@ func TestServerGRPCTLS(t *testing.T) { storageMocks.factory, tm, flagsSvc.Logger, + flagsSvc.HC(), ) require.NoError(t, err) require.NoError(t, server.Start()) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(wg.Done) - } - } - }() - var clientError error var client *grpcClient @@ -380,7 +368,6 @@ func TestServerGRPCTLS(t *testing.T) { } require.NoError(t, 
client.conn.Close()) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) }) } @@ -397,22 +384,10 @@ func TestServerHandlesPortZero(t *testing.T) { storageMocks.factory, tenancy.NewManager(&tenancy.Options{}), flagsSvc.Logger, + flagsSvc.HC(), ) require.NoError(t, err) - var wg sync.WaitGroup - wg.Add(1) - once := sync.Once{} - - go func() { - for s := range server.HealthCheckStatus() { - flagsSvc.HC().Set(s) - if s == healthcheck.Unavailable { - once.Do(wg.Done) - } - } - }() - require.NoError(t, server.Start()) const line = "Starting GRPC server" @@ -424,7 +399,6 @@ func TestServerHandlesPortZero(t *testing.T) { validateGRPCServer(t, hostPort, server.grpcServer) server.Close() - wg.Wait() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } diff --git a/cmd/remote-storage/main.go b/cmd/remote-storage/main.go index a4f7759a3aa..51166025f6a 100644 --- a/cmd/remote-storage/main.go +++ b/cmd/remote-storage/main.go @@ -77,17 +77,11 @@ func main() { } tm := tenancy.NewManager(&opts.Tenancy) - server, err := app.NewServer(opts, storageFactory, tm, svc.Logger) + server, err := app.NewServer(opts, storageFactory, tm, svc.Logger, svc.HC()) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } - go func() { - for s := range server.HealthCheckStatus() { - svc.SetHealthCheckStatus(s) - } - }() - if err := server.Start(); err != nil { logger.Fatal("Could not start servers", zap.Error(err)) } From 0c5307eef4af62a655778676bc0001cae9105a31 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 28 Mar 2024 16:38:55 -0400 Subject: [PATCH 2/2] add TODO Signed-off-by: Yuri Shkuro --- cmd/jaeger/internal/extension/jaegerquery/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 2a7e84a4097..91c282ca333 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ 
b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -82,6 +82,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { //nolint s.server, err = queryApp.NewServer( s.logger, + // TODO propagate healthcheck updates up to the collector's runtime healthcheck.New(), qs, metricsQueryService,