Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor healthcheck signalling between server and service #5308

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,10 @@ func startQuery(
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt)
server, err := queryApp.NewServer(svc.Logger, svc.HC(), qs, metricsQueryService, qOpts, tm, jt)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}
Expand Down
23 changes: 3 additions & 20 deletions cmd/internal/flags/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,16 @@ type Service struct {
MetricsFactory metrics.Factory

signalsChannel chan os.Signal

hcStatusChannel chan healthcheck.Status
}

// NewService creates a new Service.
func NewService(adminPort int) *Service {
signalsChannel := make(chan os.Signal, 1)
hcStatusChannel := make(chan healthcheck.Status)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

return &Service{
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
hcStatusChannel: hcStatusChannel,
Admin: NewAdminServer(ports.PortToHostPort(adminPort)),
signalsChannel: signalsChannel,
}
}

Expand All @@ -79,11 +75,6 @@ func (s *Service) AddFlags(flagSet *flag.FlagSet) {
s.Admin.AddFlags(flagSet)
}

// SetHealthCheckStatus sets status of healthcheck
func (s *Service) SetHealthCheckStatus(status healthcheck.Status) {
s.hcStatusChannel <- status
}

// Start bootstraps the service and starts the admin server.
func (s *Service) Start(v *viper.Viper) error {
if err := TryLoadConfigFile(v); err != nil {
Expand Down Expand Up @@ -143,15 +134,7 @@ func (s *Service) HC() *healthcheck.HealthCheck {
func (s *Service) RunAndThen(shutdown func()) {
s.HC().Ready()

statusLoop:
for {
select {
case status := <-s.hcStatusChannel:
s.HC().Set(status)
case <-s.signalsChannel:
break statusLoop
}
}
<-s.signalsChannel

s.Logger.Info("Shutting down")
s.HC().Set(healthcheck.Unavailable)
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/flags/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestStartErrors(t *testing.T) {
go s.RunAndThen(shutdown)

waitForEqual(t, healthcheck.Ready, func() interface{} { return s.HC().Get() })
s.SetHealthCheckStatus(healthcheck.Unavailable)
s.HC().Set(healthcheck.Unavailable)
waitForEqual(t, healthcheck.Unavailable, func() interface{} { return s.HC().Get() })

s.signalsChannel <- os.Interrupt
Expand Down
2 changes: 2 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
Expand Down Expand Up @@ -81,6 +82,7 @@
//nolint
s.server, err = queryApp.NewServer(
s.logger,
healthcheck.New(),

Check warning on line 85 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L85

Added line #L85 was not covered by tests
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
qs,
metricsQueryService,
s.makeQueryOptions(),
Expand Down
45 changes: 20 additions & 25 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,23 @@ import (
// Server runs HTTP, Mux and a grpc server
type Server struct {
logger *zap.Logger
healthCheck *healthcheck.HealthCheck
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt like this was nicer ordering because this felt related to the logger to me since they both come from the service and are both related to "reporting status".

querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer *jtracer.JTracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
func NewServer(logger *zap.Logger, healthCheck *healthcheck.HealthCheck, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, fmt.Errorf("invalid HTTP server host:port: %w", err)
Expand All @@ -89,22 +89,17 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery
}

return &Server{
logger: logger,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
unavailableChannel: make(chan healthcheck.Status),
logger: logger,
healthCheck: healthCheck,
querySvc: querySvc,
queryOptions: options,
tracer: tracer,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
}, nil
}

// HealthCheckStatus returns health check status channel a client can subscribe to
func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

Expand Down Expand Up @@ -292,7 +287,7 @@ func (s *Server) Start() error {
s.logger.Error("Could not start HTTP server", zap.Error(err))
}

s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start GRPC server concurrently
Expand All @@ -302,7 +297,7 @@ func (s *Server) Start() error {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()

// Start cmux server concurrently.
Expand All @@ -315,7 +310,7 @@ func (s *Server) Start() error {
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
s.healthCheck.Set(healthcheck.Unavailable)
}()
}

Expand Down
Loading
Loading