Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(healtchecks): add grpc native healthchecks #12

Merged
merged 1 commit into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *Config) InitDefaults() error { //nolint:gocyclo,gocognit
c.GrpcPool.InitDefaults()

if !strings.Contains(c.Listen, ":") {
return errors.E(op, errors.Errorf("mailformed grpc address, provided: %s", c.Listen))
return errors.E(op, errors.Errorf("malformed grpc address, provided: %s", c.Listen))
}

for i := 0; i < len(c.Proto); i++ {
Expand Down
113 changes: 113 additions & 0 deletions health_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package grpc

import (
"context"
"sync"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

type HealthCheckServer struct {
mu sync.Mutex
grpc_health_v1.HealthCheckRequest
plugin *Plugin
log *zap.Logger
shutdown bool
updates map[grpc_health_v1.Health_WatchServer]chan grpc_health_v1.HealthCheckResponse_ServingStatus
status grpc_health_v1.HealthCheckResponse_ServingStatus
}

func NewHeathServer(p *Plugin, log *zap.Logger) *HealthCheckServer {
return &HealthCheckServer{
updates: make(map[grpc_health_v1.Health_WatchServer]chan grpc_health_v1.HealthCheckResponse_ServingStatus, 1),
plugin: p,
log: log,
status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
}
}

func (h *HealthCheckServer) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{
Status: h.status,
}, nil
}

func (h *HealthCheckServer) Watch(_ *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
update := make(chan grpc_health_v1.HealthCheckResponse_ServingStatus, 1)
h.mu.Lock()

// put the initial status
update <- h.status
h.updates[stream] = update

defer func() {
h.mu.Lock()
delete(h.updates, stream)
h.mu.Unlock()
}()

h.mu.Unlock()

var lastStatus grpc_health_v1.HealthCheckResponse_ServingStatus = -1

for {
select {
case servingStatus := <-update:
if lastStatus == servingStatus {
h.log.Debug("status won't changed", zap.String("status", lastStatus.String()))
continue
}
lastStatus = servingStatus

err := stream.Send(&grpc_health_v1.HealthCheckResponse{Status: servingStatus})
if err != nil {
return status.Error(codes.Canceled, "Stream has ended")
}
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream has ended")
}
}
}

func (h *HealthCheckServer) SetServingStatus(servingStatus grpc_health_v1.HealthCheckResponse_ServingStatus) {
h.mu.Lock()
if h.shutdown {
h.log.Info("health status changing is ignored, because health service is shutdown")
return
}
h.status = servingStatus

// clear non relevant statuses
for _, upd := range h.updates {
select {
case <-upd:
default:
}

// put the most recent one
upd <- servingStatus
}
h.mu.Unlock()
}

func (h *HealthCheckServer) Shutdown() {
h.mu.Lock()
defer h.mu.Unlock()

h.shutdown = true

for _, upd := range h.updates {
select {
case <-upd:
default:
}
}
}

func (h *HealthCheckServer) RegisterServer(serv *grpc.Server) {
grpc_health_v1.RegisterHealthServer(serv, h)
}
33 changes: 24 additions & 9 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/health/grpc_health_v1"

// Will register via init
_ "google.golang.org/grpc/encoding/gzip"
Expand All @@ -29,15 +30,15 @@ const (
)

type Plugin struct {
mu *sync.RWMutex
config *Config
gPool pool.Pool
opts []grpc.ServerOption
services []func(server *grpc.Server)
server *grpc.Server
rrServer server.Server
proxyList []*proxy.Proxy

mu *sync.RWMutex
config *Config
gPool pool.Pool
opts []grpc.ServerOption
services []func(server *grpc.Server)
server *grpc.Server
rrServer server.Server
proxyList []*proxy.Proxy
healthServer *HealthCheckServer
statsExporter *statsExporter

log *zap.Logger
Expand Down Expand Up @@ -113,9 +114,15 @@ func (p *Plugin) Serve() chan error {
return errCh
}

p.healthServer = NewHeathServer(p, p.log)
p.healthServer.RegisterServer(p.server)

go func() {
p.log.Info("grpc server was started", zap.String("address", p.config.Listen))

p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING)
err = p.server.Serve(l)
p.healthServer.Shutdown()
if err != nil {
// skip errors when stopping the server
if stderr.Is(err, grpc.ErrServerStopped) {
Expand All @@ -135,9 +142,13 @@ func (p *Plugin) Stop() error {
p.mu.Lock()
defer p.mu.Unlock()

p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING)

if p.server != nil {
p.server.Stop()
}

p.healthServer.Shutdown()
return nil
}

Expand All @@ -148,6 +159,10 @@ func (p *Plugin) Name() string {
func (p *Plugin) Reset() error {
p.mu.Lock()
defer p.mu.Unlock()

p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING)
defer p.healthServer.SetServingStatus(grpc_health_v1.HealthCheckResponse_SERVING)

const op = errors.Op("grpc_plugin_reset")
p.log.Info("reset signal was received")
// destroy old pool
Expand Down