Skip to content

Commit

Permalink
Refactor healthcheck signalling between server and service
Browse files Browse the repository at this point in the history
Simplifies the signalling of healthcheck status from the "servers"
to the "service": instead of using 2 channels to feed healthcheck
status back to the Service.HealthCheck, we just give the server
components direct access to the Healthcheck which they can update
directly. This is possible because the Healthcheck package is
threadsafe (uses `atomic.Value` for state).

This pattern is consisten with how the service's Healtcheck is
passed directly to cmd/collector/app package.

closes #5307

Signed-off-by: Will Sewell <[email protected]>
  • Loading branch information
Will Sewell committed Mar 28, 2024
1 parent 0be0cb9 commit df0ad22
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 186 deletions.
7 changes: 1 addition & 6 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,10 @@ func startQuery(
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt)
server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}
Expand Down
23 changes: 3 additions & 20 deletions cmd/internal/flags/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,16 @@ type Service struct {
MetricsFactory metrics.Factory

signalsChannel chan os.Signal

hcStatusChannel chan healthcheck.Status
}

// NewService creates a new Service.
func NewService(adminPort int) *Service {
signalsChannel := make(chan os.Signal, 1)
hcStatusChannel := make(chan healthcheck.Status)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

return &Service{
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
hcStatusChannel: hcStatusChannel,
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
}
}

Expand All @@ -79,11 +75,6 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) {
s.Admin.AddFlags(flagSet)
}

// SetHealthCheckStatus sets status of healthcheck
func (s *Service) SetHealthCheckStatus(status healthcheck.Status) {
s.hcStatusChannel <- status
}

// Start bootstraps the service and starts the admin server.
func (s *Service) Start(v *viper.Viper) error {
if err := TryLoadConfigFile(v); err != nil {
Expand Down Expand Up @@ -143,15 +134,7 @@ func (s *Service) HC() *healthcheck.HealthCheck {
func (s *Service) RunAndThen(shutdown func()) {
s.HC().Ready()

statusLoop:
for {
select {
case status := <-s.hcStatusChannel:
s.HC().Set(status)
case <-s.signalsChannel:
break statusLoop
}
}
<-s.signalsChannel

s.Logger.Info("Shutting down")
s.HC().Set(healthcheck.Unavailable)
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flags/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestStartErrors(t *testing.T) {
go s.RunAndThen(shutdown)

waitForEqual(t, healthcheck.Ready, func() interface{} { return s.HC().Get() })
s.SetHealthCheckStatus(healthcheck.Unavailable)
s.HC().Set(healthcheck.Unavailable)
waitForEqual(t, healthcheck.Unavailable, func() interface{} { return s.HC().Get() })

s.signalsChannel <- os.Interrupt
Expand Down
2 changes: 2 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
Expand Down Expand Up @@ -81,6 +82,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
//nolint
s.server, err = queryApp.NewServer(
s.logger,
healthcheck.New(),

Check warning on line 85 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L85

Added line #L85 was not covered by tests
qs,
metricsQueryService,
s.makeQueryOptions(),
Expand Down
45 changes: 20 additions & 25 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ import (
// Server runs HTTP, Mux and a grpc server
type Server struct {
logger *zap.Logger
healthCheck *healthcheck.HealthCheck
querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer *jtracer.JTracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, fmt.Errorf("invalid HTTP server host:port: %w", err)
Expand All @@ -89,22 +89,17 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery
}

return &Server{
logger: logger,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
unavailableChannel: make(chan healthcheck.Status),
logger: logger,
healthCheck: healthCheck,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
}, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

Expand Down Expand Up @@ -292,7 +287,7 @@ func (s *Server) Start() error {
s.logger.Error("Could not start HTTP server", zap.Error(err))
}

s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start GRPC server concurrently
Expand All @@ -302,7 +297,7 @@ func (s *Server) Start() error {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start cmux server concurrently.
Expand All @@ -315,7 +310,7 @@ func (s *Server) Start() error {
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()
}

Expand Down
Loading

0 comments on commit df0ad22

Please sign in to comment.