Skip to content

Commit

Permalink
Adding healthz endpoint to IPamD
Browse files Browse the repository at this point in the history
  • Loading branch information
nithu0115 authored and mogren committed Jul 31, 2019
1 parent 2ceb59c commit 6ba7d76
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 3 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ endif
build-linux:
GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o aws-k8s-agent -ldflags "$(LDFLAGS)"
GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o aws-cni -ldflags "$(LDFLAGS)" ./plugins/routed-eni/
GOOS=linux GOARCH=$(ARCH) CGO_ENABLED=0 go build -o grpc_health_probe -ldflags "$(LDFLAGS)" ./client/health-check/

# Download portmap plugin
download-portmap:
Expand Down
138 changes: 138 additions & 0 deletions client/health-check/grpc_health_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"context"
"flag"
"log"
"os"
"os/signal"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)

var (
userAgent string
remoteURL string
serviceName string
connTimeoutDur time.Duration = time.Second
rpcTimeoutDur time.Duration = time.Second
verbose bool
)

const (
// StatusInvalidArguments indicates specified invalid arguments.
StatusInvalidArguments = 1
// StatusConnectionFailure indicates connection failed.
StatusConnectionFailure = 2
// StatusRPCFailure indicates rpc failed.
StatusRPCFailure = 3
// StatusUnhealthy indicates rpc succeeded but indicates unhealthy service.
StatusUnhealthy = 4
)

func init() {
log.SetFlags(0)
flag.StringVar(&remoteURL, "addr", "", "(required) tcp host:port to connect")
flag.StringVar(&serviceName, "service", "", "service name to check (default: \"\")")
flag.StringVar(&userAgent, "user-agent", "grpc_health_probe", "user-agent header value of health check requests")
// timeouts
flag.DurationVar(&connTimeoutDur, "connect-timeout", connTimeoutDur, "timeout for establishing connection")
flag.DurationVar(&rpcTimeoutDur, "rpc-timeout", rpcTimeoutDur, "timeout for health check rpc")
// verbose
flag.BoolVar(&verbose, "v", false, "verbose logs")

flag.Parse()

argError := func(s string, v ...interface{}) {
log.Printf("error: "+s, v...)
os.Exit(StatusInvalidArguments)
}

if remoteURL == "" {
argError("--addr not specified")
}

if connTimeoutDur <= 0 {
argError("--connect-timeout must be greater than zero (specified: %v)", connTimeoutDur)
}
if rpcTimeoutDur <= 0 {
argError("--rpc-timeout must be greater than zero (specified: %v)", rpcTimeoutDur)
}
if verbose {
log.Printf("parsed options:")
log.Printf("> remoteUrl=%s conn-timeout=%v rpc-timeout=%v", remoteURL, connTimeoutDur, rpcTimeoutDur)
}
}

func main() {
ctx, cancel := context.WithCancel(context.Background())

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
sig := <-c
if sig == os.Interrupt {
log.Printf("cancellation received")
cancel()
return
}
}()

opts := []grpc.DialOption{
grpc.WithUserAgent(userAgent),
grpc.WithBlock()}

opts = append(opts, grpc.WithInsecure())

if verbose {
log.Print("establishing connection")
}
connStart := time.Now()
dialCtx, cancel2 := context.WithTimeout(ctx, connTimeoutDur)
defer cancel2()
conn, err := grpc.DialContext(dialCtx, remoteURL, opts...)
if err != nil {
if err == context.DeadlineExceeded {
log.Printf("timeout: failed to connect service %q within %v", remoteURL, connTimeoutDur)
} else {
log.Printf("error: failed to connect service at %q: %+v", remoteURL, err)
}
os.Exit(StatusConnectionFailure)
}
connDuration := time.Since(connStart)
defer conn.Close()
if verbose {
log.Printf("connection establisted (took %v)", connDuration)
}

rpcStart := time.Now()
rpcCtx, rpcCancel := context.WithTimeout(ctx, rpcTimeoutDur)
defer rpcCancel()
resp, err := healthpb.NewHealthClient(conn).Check(rpcCtx, &healthpb.HealthCheckRequest{Service: serviceName})
log.Print(resp)
if err != nil {
if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented {
log.Printf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health)")
} else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded {
log.Printf("timeout: health rpc did not complete within %v", rpcTimeoutDur)
} else {
log.Printf("error: health rpc failed: %+v", err)
}
os.Exit(StatusRPCFailure)
}
rpcDuration := time.Since(rpcStart)

if resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
log.Printf("service unhealthy (responded with %q)", resp.GetStatus().String())
os.Exit(StatusUnhealthy)
}
if verbose {
log.Printf("time elapsed: connect=%v rpc=%v", connDuration, rpcDuration)
}
log.Printf("status: %v", resp.GetStatus().String())
}

8 changes: 8 additions & 0 deletions config/v1.5/aws-k8s-cni.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ spec:
- containerPort: 61678
name: metrics
name: aws-node
#readinessProbe:
# exec:
# command: ["/app/grpc_health_probe", "-addr=:50051"]
# initialDelaySeconds: 5
#livenessProbe:
# exec:
# command: ["/app/grpc_health_probe", "-addr=:50051"]
# initialDelaySeconds: 5
env:
- name: AWS_VPC_K8S_CNI_LOGLEVEL
value: DEBUG
Expand Down
8 changes: 8 additions & 0 deletions ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

log "github.com/cihub/seelog"
Expand All @@ -34,10 +35,16 @@ const (
ipamdgRPCaddress = "127.0.0.1:50051"
)

// server controls RPC service responses.
type server struct {
ipamContext *IPAMContext
}

// Check is for health checking.
func (s *server) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil
}

// AddNetwork processes CNI add network request and return an IP address for container
func (s *server) AddNetwork(ctx context.Context, in *pb.AddNetworkRequest) (*pb.AddNetworkReply, error) {
log.Infof("Received AddNetwork for NS %s, Pod %s, NameSpace %s, Container %s, ifname %s",
Expand Down Expand Up @@ -98,6 +105,7 @@ func (c *IPAMContext) RunRPCHandler() error {
}
s := grpc.NewServer()
pb.RegisterCNIBackendServer(s, &server{ipamContext: c})
healthpb.RegisterHealthServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
Expand Down
1 change: 1 addition & 0 deletions scripts/dockerfiles/Dockerfile.release
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/misc/10-aws.confli
COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/portmap /app

COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/aws-k8s-agent /app
COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/grpc_health_probe /app
COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/scripts/aws-cni-support.sh /app
COPY --from=builder /go/src/github.com/aws/amazon-vpc-cni-k8s/scripts/install-aws.sh /app
ENTRYPOINT /app/install-aws.sh
Loading

0 comments on commit 6ba7d76

Please sign in to comment.