diff --git a/config.go b/config.go index dd58f0c..155d84c 100644 --- a/config.go +++ b/config.go @@ -59,7 +59,7 @@ func (c *Config) InitDefaults() error { //nolint:gocyclo,gocognit c.GrpcPool.InitDefaults() if !strings.Contains(c.Listen, ":") { - return errors.E(op, errors.Errorf("mailformed grpc address, provided: %s", c.Listen)) + return errors.E(op, errors.Errorf("malformed grpc address, provided: %s", c.Listen)) } for i := 0; i < len(c.Proto); i++ { diff --git a/health_server.go b/health_server.go new file mode 100644 index 0000000..50cc395 --- /dev/null +++ b/health_server.go @@ -0,0 +1,113 @@ +package grpc + +import ( + "context" + "sync" + + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" +) + +type HealthCheckServer struct { + mu sync.Mutex + grpc_health_v1.HealthCheckRequest + plugin *Plugin + log *zap.Logger + shutdown bool + updates map[grpc_health_v1.Health_WatchServer]chan grpc_health_v1.HealthCheckResponse_ServingStatus + status grpc_health_v1.HealthCheckResponse_ServingStatus +} + +func NewHeathServer(p *Plugin, log *zap.Logger) *HealthCheckServer { + return &HealthCheckServer{ + updates: make(map[grpc_health_v1.Health_WatchServer]chan grpc_health_v1.HealthCheckResponse_ServingStatus, 1), + plugin: p, + log: log, + status: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + } +} + +func (h *HealthCheckServer) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + return &grpc_health_v1.HealthCheckResponse{ + Status: h.status, + }, nil +} + +func (h *HealthCheckServer) Watch(_ *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error { + update := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus, 1) + h.mu.Lock() + + // put the initial status + update <- h.status + h.updates[stream] = update + + defer func() { + h.mu.Lock() + delete(h.updates, stream) + h.mu.Unlock() + }() + + h.mu.Unlock() + + var lastStatus grpc_health_v1.HealthCheckResponse_ServingStatus = -1 + + for { + select { + case servingStatus := <-update: + if lastStatus == servingStatus { + h.log.Debug("status won't changed", zap.String("status", lastStatus.String())) + continue + } + lastStatus = servingStatus + + err := stream.Send(&grpc_health_v1.HealthCheckResponse{Status: servingStatus}) + if err != nil { + return status.Error(codes.Canceled, "Stream has ended") + } + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Stream has ended") + } + } +} + +func (h *HealthCheckServer) SetServingStatus(servingStatus grpc_health_v1.HealthCheckResponse_ServingStatus) { + h.mu.Lock() + if h.shutdown { + h.log.Info("health status changing is ignored, because health service is shutdown") + return + } + h.status = servingStatus + + // clear non relevant statuses + for _, upd := range h.updates { + select { + case <-upd: + default: + } + + // put the most recent one + upd <- servingStatus + } + h.mu.Unlock() +} + +func (h *HealthCheckServer) Shutdown() { + h.mu.Lock() + defer h.mu.Unlock() + + h.shutdown = true + + for _, upd := range h.updates { + select { + case <-upd: + default: + } + } +} + +func (h *HealthCheckServer) RegisterServer(serv *grpc.Server) { + grpc_health_v1.RegisterHealthServer(serv, h) +} diff --git a/plugin.go b/plugin.go index 869a1ef..683c51b 100644 --- a/plugin.go +++ b/plugin.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/health/grpc_health_v1" // Will register via init _ "google.golang.org/grpc/encoding/gzip" @@ -29,15 +30,15 @@ const ( ) type Plugin struct { - mu *sync.RWMutex - config *Config - gPool pool.Pool - opts []grpc.ServerOption - services []func(server *grpc.Server) - server *grpc.Server - rrServer server.Server - proxyList []*proxy.Proxy - + mu *sync.RWMutex + config *Config + gPool pool.Pool + opts []grpc.ServerOption + services []func(server *grpc.Server) + server *grpc.Server + rrServer server.Server + proxyList []*proxy.Proxy + healthServer *HealthCheckServer statsExporter *statsExporter log *zap.Logger @@ -113,9 +114,15 @@ func (p *Plugin) Serve() chan error { return errCh } + p.healthServer = NewHeathServer(p, p.log) + p.healthServer.RegisterServer(p.server) + go func() { p.log.Info("grpc server was started", zap.String("address", p.config.Listen)) + + p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING) err = p.server.Serve(l) + p.healthServer.Shutdown() if err != nil { // skip errors when stopping the server if stderr.Is(err, grpc.ErrServerStopped) { @@ -135,9 +142,13 @@ func (p *Plugin) Stop() error { p.mu.Lock() defer p.mu.Unlock() + p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING) + if p.server != nil { p.server.Stop() } + + p.healthServer.Shutdown() return nil } @@ -148,6 +159,10 @@ func (p *Plugin) Name() string { func (p *Plugin) Reset() error { p.mu.Lock() defer p.mu.Unlock() + + p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING) + defer p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING) + const op = errors.Op("grpc_plugin_reset") p.log.Info("reset signal was received") // destroy old pool