Skip to content

Commit

Permalink
feat: add gRPC health status (#427)
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik authored Feb 3, 2021
1 parent eb76913 commit 51c4223
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 21 deletions.
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"path/filepath"
"runtime"

"github.com/ory/keto/cmd/status"

"github.com/ory/keto/cmd/expand"

"github.com/ory/keto/cmd/check"
Expand Down Expand Up @@ -53,6 +55,7 @@ func NewRootCmd() *cobra.Command {
server.RegisterCommandsRecursive(cmd)
check.RegisterCommandsRecursive(cmd)
expand.RegisterCommandsRecursive(cmd)
status.RegisterCommandRecursive(cmd)

cmd.AddCommand(cmdx.Version(&config.Version, &config.Commit, &config.Date))

Expand Down
88 changes: 88 additions & 0 deletions cmd/status/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package status

import (
"context"
"errors"
"fmt"

"github.com/ory/x/cmdx"
"github.com/ory/x/stringsx"
"github.com/spf13/cobra"
"google.golang.org/grpc"
grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1"

cliclient "github.com/ory/keto/cmd/client"
)

const (
FlagBlock = "block"
FlagEndpoint = "endpoint"
)

func newStatusCmd() *cobra.Command {
var (
block bool
endpoint string
)

cmd := &cobra.Command{
Use: "status",
Short: "Get the status of the upstream Keto instance.",
Long: "Get a status report about the upstream Keto instance. Can also block until the service is healthy.",
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
var connect func(*cobra.Command) (*grpc.ClientConn, error)

endpoints := stringsx.RegisteredCases{}
switch endpoint {
case endpoints.AddCase("read"):
connect = cliclient.GetReadConn
case endpoints.AddCase("write"):
connect = cliclient.GetWriteConn
default:
return endpoints.ToUnknownCaseErr(endpoint)
}

loudPrinter := cmdx.NewLoudOutPrinter(cmd)

conn, err := connect(cmd)
for block && err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
return err
}
_, _ = loudPrinter.Println("Context deadline exceeded, going to retry.")
conn, err = connect(cmd)
}

if errors.Is(err, context.DeadlineExceeded) {
_, _ = fmt.Fprintln(cmd.OutOrStdout(), grpcHealthV1.HealthCheckResponse_NOT_SERVING.String())
return nil
} else if err != nil {
return err
}

c := grpcHealthV1.NewHealthClient(conn)

resp, err := c.Check(cmd.Context(), &grpcHealthV1.HealthCheckRequest{})
if err != nil {
_, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Unable to get a check response: %+v\n", err)
return cmdx.FailSilently(cmd)
}

_, _ = fmt.Fprintln(cmd.OutOrStdout(), resp.Status.String())
return nil
},
}

cliclient.RegisterRemoteURLFlags(cmd.Flags())
cmdx.RegisterNoiseFlags(cmd.Flags())

cmd.Flags().BoolVarP(&block, FlagBlock, "b", false, "block until the service is healthy")
cmd.Flags().StringVar(&endpoint, FlagEndpoint, "read", "which endpoint to use; one of {read, write}")

return cmd
}

func RegisterCommandRecursive(parent *cobra.Command) {
parent.AddCommand(newStatusCmd())
}
19 changes: 17 additions & 2 deletions internal/driver/registry_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/julienschmidt/httprouter"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1"

"github.com/ory/keto/internal/driver/config"

Expand Down Expand Up @@ -40,8 +42,9 @@ type (
ee *expand.Engine
c *config.Provider

healthH *healthx.Handler
handlers []Handler
healthH *healthx.Handler
healthServer *health.Server
handlers []Handler
}
Handler interface {
RegisterReadRoutes(r *x.ReadRouter)
Expand Down Expand Up @@ -75,6 +78,14 @@ func (r *RegistryDefault) HealthHandler() *healthx.Handler {
return r.healthH
}

func (r *RegistryDefault) HealthServer() *health.Server {
if r.healthServer == nil {
r.healthServer = health.NewServer()
}

return r.healthServer
}

func (r *RegistryDefault) Tracer() *tracing.Tracer {
panic("implement me")
}
Expand Down Expand Up @@ -204,6 +215,8 @@ func (r *RegistryDefault) WriteRouter() *x.WriteRouter {
func (r *RegistryDefault) ReadGRPCServer() *grpc.Server {
s := grpc.NewServer()

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())

for _, h := range r.allHandlers() {
h.RegisterReadGRPC(s)
}
Expand All @@ -214,6 +227,8 @@ func (r *RegistryDefault) ReadGRPCServer() *grpc.Server {
func (r *RegistryDefault) WriteGRPCServer() *grpc.Server {
s := grpc.NewServer()

grpcHealthV1.RegisterHealthServer(s, r.HealthServer())

for _, h := range r.allHandlers() {
h.RegisterWriteGRPC(s)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/e2e/cases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

func runCases(c client, nspaces []*namespace.Namespace) func(*testing.T) {
return func(t *testing.T) {
c.waitUntilLive(t)

t.Run("case=creates tuple and uses it then", func(t *testing.T) {
tuple := &relationtuple.InternalRelationTuple{
Namespace: nspaces[0].Name,
Expand Down
1 change: 1 addition & 0 deletions internal/e2e/full_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type (
queryTuple(t require.TestingT, q *relationtuple.RelationQuery, opts ...x.PaginationOptionSetter) *relationtuple.GetResponse
check(t require.TestingT, r *relationtuple.InternalRelationTuple) bool
expand(t require.TestingT, r *relationtuple.SubjectSet, depth int) *expand.Tree
waitUntilLive(t require.TestingT)
}
)

Expand Down
24 changes: 24 additions & 0 deletions internal/e2e/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package e2e

import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1"

"github.com/ory/keto/cmd/status"

"github.com/ory/keto/internal/x"

Expand Down Expand Up @@ -76,3 +82,21 @@ func (g *grpcClient) expand(t require.TestingT, r *relationtuple.SubjectSet, dep
require.NoError(t, json.Unmarshal([]byte(out), &res))
return &res
}

func (g *grpcClient) waitUntilLive(t require.TestingT) {
flags := make([]string, len(g.c.PersistentArgs))
copy(flags, g.c.PersistentArgs)

for i, f := range flags {
if f == "--"+cmdx.FlagFormat {
flags = append(flags[:i], flags[i+2:]...)
break
}
}

ctx, cancel := context.WithTimeout(g.c.Ctx, time.Minute)
defer cancel()

out := cmdx.ExecNoErrCtx(ctx, t, g.c.New(), append(flags, "status", "--"+status.FlagBlock)...)
require.Equal(t, grpcHealthV1.HealthCheckResponse_SERVING.String()+"\n", out)
}
19 changes: 0 additions & 19 deletions internal/e2e/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ import (
"bytes"
"context"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/ory/keto/internal/x"

"github.com/ory/x/configx"
"github.com/ory/x/healthx"
"github.com/phayes/freeport"
"github.com/spf13/pflag"

Expand Down Expand Up @@ -134,22 +131,6 @@ func startServer(ctx context.Context, t testing.TB, reg driver.Registry) func()
serverErr <- reg.ServeAll(serverCtx)
}()

var healthReady = func() error {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

r, err := http.NewRequestWithContext(ctx, "GET", "http://"+reg.Config().ReadAPIListenOn()+healthx.ReadyCheckPath, nil)
if err != nil {
return err
}
_, err = http.DefaultClient.Do(r)
return err
}
// wait for /health/ready
for err := healthReady(); err != nil; err = healthReady() {
time.Sleep(10 * time.Millisecond)
}

// defer this close function to make sure it is shutdown on test failure as well
return func() {
// stop the server
Expand Down
14 changes: 14 additions & 0 deletions internal/e2e/rest_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"io/ioutil"
"net/http"
"strconv"
"time"

"github.com/ory/x/healthx"

"github.com/ory/keto/internal/x"

Expand Down Expand Up @@ -101,3 +104,14 @@ func (rc *restClient) expand(t require.TestingT, r *relationtuple.SubjectSet, de

return tree
}

func (rc *restClient) waitUntilLive(t require.TestingT) {
var healthReady = func() bool {
_, status := rc.makeRequest(t, "GET", healthx.ReadyCheckPath, "", false)
return status == http.StatusOK
}
// wait for /health/ready
for !healthReady() {
time.Sleep(10 * time.Millisecond)
}
}

0 comments on commit 51c4223

Please sign in to comment.