diff --git a/cmd/root.go b/cmd/root.go index 12931fd23..60837caf2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -22,6 +22,8 @@ import ( "path/filepath" "runtime" + "github.com/ory/keto/cmd/status" + "github.com/ory/keto/cmd/expand" "github.com/ory/keto/cmd/check" @@ -53,6 +55,7 @@ func NewRootCmd() *cobra.Command { server.RegisterCommandsRecursive(cmd) check.RegisterCommandsRecursive(cmd) expand.RegisterCommandsRecursive(cmd) + status.RegisterCommandRecursive(cmd) cmd.AddCommand(cmdx.Version(&config.Version, &config.Commit, &config.Date)) diff --git a/cmd/status/root.go b/cmd/status/root.go new file mode 100644 index 000000000..08bcc6999 --- /dev/null +++ b/cmd/status/root.go @@ -0,0 +1,88 @@ +package status + +import ( + "context" + "errors" + "fmt" + + "github.com/ory/x/cmdx" + "github.com/ory/x/stringsx" + "github.com/spf13/cobra" + "google.golang.org/grpc" + grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1" + + cliclient "github.com/ory/keto/cmd/client" +) + +const ( + FlagBlock = "block" + FlagEndpoint = "endpoint" +) + +func newStatusCmd() *cobra.Command { + var ( + block bool + endpoint string + ) + + cmd := &cobra.Command{ + Use: "status", + Short: "Get the status of the upstream Keto instance.", + Long: "Get a status report about the upstream Keto instance. Can also block until the service is healthy.", + Args: cobra.ExactArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + var connect func(*cobra.Command) (*grpc.ClientConn, error) + + endpoints := stringsx.RegisteredCases{} + switch endpoint { + case endpoints.AddCase("read"): + connect = cliclient.GetReadConn + case endpoints.AddCase("write"): + connect = cliclient.GetWriteConn + default: + return endpoints.ToUnknownCaseErr(endpoint) + } + + loudPrinter := cmdx.NewLoudOutPrinter(cmd) + + conn, err := connect(cmd) + for block && err != nil { + if !errors.Is(err, context.DeadlineExceeded) { + return err + } + _, _ = loudPrinter.Println("Context deadline exceeded, going to retry.") + conn, err = connect(cmd) + } + + if errors.Is(err, context.DeadlineExceeded) { + _, _ = fmt.Fprintln(cmd.OutOrStdout(), grpcHealthV1.HealthCheckResponse_NOT_SERVING.String()) + return nil + } else if err != nil { + return err + } + + c := grpcHealthV1.NewHealthClient(conn) + + resp, err := c.Check(cmd.Context(), &grpcHealthV1.HealthCheckRequest{}) + if err != nil { + _, _ = fmt.Fprintf(cmd.ErrOrStderr(), "Unable to get a check response: %+v\n", err) + return cmdx.FailSilently(cmd) + } + + _, _ = fmt.Fprintln(cmd.OutOrStdout(), resp.Status.String()) + return nil + }, + } + + cliclient.RegisterRemoteURLFlags(cmd.Flags()) + cmdx.RegisterNoiseFlags(cmd.Flags()) + + cmd.Flags().BoolVarP(&block, FlagBlock, "b", false, "block until the service is healthy") + cmd.Flags().StringVar(&endpoint, FlagEndpoint, "read", "which endpoint to use; one of {read, write}") + + return cmd +} + +func RegisterCommandRecursive(parent *cobra.Command) { + parent.AddCommand(newStatusCmd()) +} diff --git a/internal/driver/registry_default.go b/internal/driver/registry_default.go index 1ab9405ee..44bf3ddbb 100644 --- a/internal/driver/registry_default.go +++ b/internal/driver/registry_default.go @@ -7,6 +7,8 @@ import ( "github.com/julienschmidt/httprouter" "google.golang.org/grpc" + "google.golang.org/grpc/health" + grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1" "github.com/ory/keto/internal/driver/config" @@ -40,8 +42,9 @@ type ( ee *expand.Engine c *config.Provider - healthH *healthx.Handler - handlers []Handler + healthH *healthx.Handler + healthServer *health.Server + handlers []Handler } Handler interface { RegisterReadRoutes(r *x.ReadRouter) @@ -75,6 +78,14 @@ func (r *RegistryDefault) HealthHandler() *healthx.Handler { return r.healthH } +func (r *RegistryDefault) HealthServer() *health.Server { + if r.healthServer == nil { + r.healthServer = health.NewServer() + } + + return r.healthServer +} + func (r *RegistryDefault) Tracer() *tracing.Tracer { panic("implement me") } @@ -204,6 +215,8 @@ func (r *RegistryDefault) WriteRouter() *x.WriteRouter { func (r *RegistryDefault) ReadGRPCServer() *grpc.Server { s := grpc.NewServer() + grpcHealthV1.RegisterHealthServer(s, r.HealthServer()) + for _, h := range r.allHandlers() { h.RegisterReadGRPC(s) } @@ -214,6 +227,8 @@ func (r *RegistryDefault) ReadGRPCServer() *grpc.Server { func (r *RegistryDefault) WriteGRPCServer() *grpc.Server { s := grpc.NewServer() + grpcHealthV1.RegisterHealthServer(s, r.HealthServer()) + for _, h := range r.allHandlers() { h.RegisterWriteGRPC(s) } diff --git a/internal/e2e/cases_test.go b/internal/e2e/cases_test.go index a26538b75..3ba9c456a 100644 --- a/internal/e2e/cases_test.go +++ b/internal/e2e/cases_test.go @@ -16,6 +16,8 @@ import ( func runCases(c client, nspaces []*namespace.Namespace) func(*testing.T) { return func(t *testing.T) { + c.waitUntilLive(t) + t.Run("case=creates tuple and uses it then", func(t *testing.T) { tuple := &relationtuple.InternalRelationTuple{ Namespace: nspaces[0].Name, diff --git a/internal/e2e/full_suit_test.go b/internal/e2e/full_suit_test.go index 93ce2c939..4f80aa916 100644 --- a/internal/e2e/full_suit_test.go +++ b/internal/e2e/full_suit_test.go @@ -24,6 +24,7 @@ type ( queryTuple(t require.TestingT, q *relationtuple.RelationQuery, opts ...x.PaginationOptionSetter) *relationtuple.GetResponse check(t require.TestingT, r *relationtuple.InternalRelationTuple) bool expand(t require.TestingT, r *relationtuple.SubjectSet, depth int) *expand.Tree + waitUntilLive(t require.TestingT) } ) diff --git a/internal/e2e/grpc_client_test.go b/internal/e2e/grpc_client_test.go index 5b9eba2fb..4ed4903f2 100644 --- a/internal/e2e/grpc_client_test.go +++ b/internal/e2e/grpc_client_test.go @@ -2,10 +2,16 @@ package e2e import ( "bytes" + "context" "encoding/json" "fmt" "strconv" "strings" + "time" + + grpcHealthV1 "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/ory/keto/cmd/status" "github.com/ory/keto/internal/x" @@ -76,3 +82,21 @@ func (g *grpcClient) expand(t require.TestingT, r *relationtuple.SubjectSet, dep require.NoError(t, json.Unmarshal([]byte(out), &res)) return &res } + +func (g *grpcClient) waitUntilLive(t require.TestingT) { + flags := make([]string, len(g.c.PersistentArgs)) + copy(flags, g.c.PersistentArgs) + + for i, f := range flags { + if f == "--"+cmdx.FlagFormat { + flags = append(flags[:i], flags[i+2:]...) + break + } + } + + ctx, cancel := context.WithTimeout(g.c.Ctx, time.Minute) + defer cancel() + + out := cmdx.ExecNoErrCtx(ctx, t, g.c.New(), append(flags, "status", "--"+status.FlagBlock)...) + require.Equal(t, grpcHealthV1.HealthCheckResponse_SERVING.String()+"\n", out) +} diff --git a/internal/e2e/helpers.go b/internal/e2e/helpers.go index 470465d46..346b161bf 100644 --- a/internal/e2e/helpers.go +++ b/internal/e2e/helpers.go @@ -4,18 +4,15 @@ import ( "bytes" "context" "io/ioutil" - "net/http" "path/filepath" "strings" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/ory/keto/internal/x" "github.com/ory/x/configx" - "github.com/ory/x/healthx" "github.com/phayes/freeport" "github.com/spf13/pflag" @@ -134,22 +131,6 @@ func startServer(ctx context.Context, t testing.TB, reg driver.Registry) func() serverErr <- reg.ServeAll(serverCtx) }() - var healthReady = func() error { - ctx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - - r, err := http.NewRequestWithContext(ctx, "GET", "http://"+reg.Config().ReadAPIListenOn()+healthx.ReadyCheckPath, nil) - if err != nil { - return err - } - _, err = http.DefaultClient.Do(r) - return err - } - // wait for /health/ready - for err := healthReady(); err != nil; err = healthReady() { - time.Sleep(10 * time.Millisecond) - } - // defer this close function to make sure it is shutdown on test failure as well return func() { // stop the server diff --git a/internal/e2e/rest_client_test.go b/internal/e2e/rest_client_test.go index b65c24798..2fc3653f8 100644 --- a/internal/e2e/rest_client_test.go +++ b/internal/e2e/rest_client_test.go @@ -8,6 +8,9 @@ import ( "io/ioutil" "net/http" "strconv" + "time" + + "github.com/ory/x/healthx" "github.com/ory/keto/internal/x" @@ -101,3 +104,14 @@ func (rc *restClient) expand(t require.TestingT, r *relationtuple.SubjectSet, de return tree } + +func (rc *restClient) waitUntilLive(t require.TestingT) { + var healthReady = func() bool { + _, status := rc.makeRequest(t, "GET", healthx.ReadyCheckPath, "", false) + return status == http.StatusOK + } + // wait for /health/ready + for !healthReady() { + time.Sleep(10 * time.Millisecond) + } +}