Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDC SDK PoC #1

Merged
merged 12 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
name: Unit tests

on:
{}
# pull_request:
# push:
# paths:
# - "**.go"
# - "go.mod"
# - "go.sum"
# - ".github/workflows/*.yml"
# - "example/hasura/docker-compose.yaml"

jobs:
test-go:
name: Run Go lint and unit tests
runs-on: ubuntu-20.04
permissions:
pull-requests: write
steps:
- name: Checkout
uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: "1.20"
- uses: actions/cache@v3
with:
path: |
~/go/pkg/mod
~/.cache/go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Install dependencies
run: |
go get -t -v ./...
go install ./...
- name: Format
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
run: go vet ./...
- name: Run Go unit tests
run: go test -v -race -timeout 3m -coverprofile=coverage.out ./...
- name: Go coverage format
if: ${{ github.event_name == 'pull_request_target' }}
run: |
go get github.com/boumenot/gocover-cobertura
go install github.com/boumenot/gocover-cobertura
gocover-cobertura < coverage.out > coverage.xml
- name: Code Coverage Summary Report
uses: irongut/[email protected]
if: ${{ github.event_name == 'pull_request_target' }}
with:
filename: coverage.xml
badge: true
fail_below_min: true
format: markdown
hide_branch_rate: false
hide_complexity: true
indicators: true
output: both
thresholds: "60 80"
- name: Add Coverage PR Comment
uses: marocchino/sticky-pull-request-comment@v2
if: ${{ github.event_name == 'pull_request_target' }}
with:
path: code-coverage-results.md
- name: Dump docker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,12 @@
# Dependency directories (remove the comment below to include it)
# vendor/

# Typegen third-party files
typegen/*
!typegen/regenerate-schema.sh

# Go workspace file
go.work

.idea/
coverage.out
93 changes: 93 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Native Data Connector SDK for Go

This SDK is mostly analogous to the Rust SDK, except where necessary.

All functions of the Connector interface are analogous to their Rust counterparts, with the addition of `GetRawConfigurationSchema` which does exactly what it sounds like.

## Components

- Connector HTTP server
- Configuration server
- Authentication
- Observability with OpenTelemetry and Prometheus

## Using this SDK

The SDK exports a `Start` function, which takes a `connector` object, that is an object that implements the `Connector` interface defined in [connector/types.go](connector/types.go)

This function should be your starting point.

A connector can thus start likes so:

```go
import "github.com/hasura/ndc-sdk-go/connector"

type RawConfiguration struct{ ... }
type Configuration struct { ... }
type State struct { ... }
type Connector struct { ... }

/* implementation of the Connector interface removed for brevity */

func main() {

if err := connector.Start[RawConfiguration, Configuration, State](&Connector{}); err != nil {
panic(err)
}
}
```

The `Start` function create a CLI application with following commands:

```sh
Commands:
serve
Serve the NDC connector.

Flags:
--configuration=STRING Configuration file path ($CONFIGURATION).
--inline-config Inline JSON string or configuration file? ($INLINE_CONFIG)
--port=8100 Serve Port ($PORT).
--service-token-secret=STRING Service token secret ($SERVICE_TOKEN_SECRET).
--otlp-endpoint=STRING OpenTelemetry receiver endpoint that is set as default for all types ($OTLP_ENDPOINT).
--otlp-traces-endpoint=STRING OpenTelemetry endpoint for traces ($OTLP_TRACES_ENDPOINT).
--otlp-insecure Disable LTS for OpenTelemetry gRPC exporters ($OTLP_INSECURE).
--otlp-metrics-endpoint=STRING OpenTelemetry endpoint for metrics ($OTLP_METRICS_ENDPOINT).
--service-name=STRING OpenTelemetry service name ($OTEL_SERVICE_NAME).
--log-level="info" Log level ($LOG_LEVEL).

configuration serve
Serve the NDC configuration service.

Flags:
--port=8100 Serve Port ($PORT).
--log-level="info" Log level ($LOG_LEVEL).
```

Please refer to the [NDC Spec](https://hasura.github.io/ndc-spec/) for details on implementing the Connector interface, or see [examples](./examples).

## Observability

### OpenTelemetry

OpenTelemetry exporter is disabled by default unless one of `--otlp-endpoint`, `--otlp-traces-endpoint` or `--otlp-metrics-endpoint` argument is set. The SDK automatically detects either HTTP or gRPC protocol by the URL scheme. For example:

- `http://localhost:4318`: HTTP
- `localhost:4317`: gRPC

The SDK can also detect TLS connections via http(s). However, if you want to disable TLS for gRPC, you must add `--otlp-insecure` the flag.

### Prometheus

Prometheus metrics are exported via the `/metrics` endpoint.

## Regenerating Schema Types

The NDC spec types are borrowed from ndc-sdk-typescript that are generated from the NDC Spec Rust types.
Then the Go types are generated from that JSON Schema document into `./schema/schema.generated.go`.

In order to regenerate the types, run

```
> cd typegen && ./regenerate-schema.sh
```
83 changes: 83 additions & 0 deletions connector/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package connector

import (
"fmt"

"github.com/alecthomas/kong"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var cli struct {
Serve struct {
Configuration string `help:"Configuration file path." env:"CONFIGURATION"`
InlineConfig bool `help:"Inline JSON string or configuration file?" env:"INLINE_CONFIG"`
Port uint `help:"Serve Port." env:"PORT" default:"8100"`
ServiceTokenSecret string `help:"Service token secret." env:"SERVICE_TOKEN_SECRET"`
OtlpEndpoint string `help:"OpenTelemetry receiver endpoint that is set as default for all types." env:"OTLP_ENDPOINT"`
OtlpTracesEndpoint string `help:"OpenTelemetry endpoint for traces." env:"OTLP_TRACES_ENDPOINT"`
OtlpInsecure bool `help:"Disable LTS for OpenTelemetry gRPC exporters." env:"OTLP_INSECURE"`
OtlpMetricsEndpoint string `help:"OpenTelemetry endpoint for metrics." env:"OTLP_METRICS_ENDPOINT"`
ServiceName string `help:"OpenTelemetry service name." env:"OTEL_SERVICE_NAME"`
LogLevel string `help:"Log level." env:"LOG_LEVEL" enum:"trace,debug,info,warn,error" default:"info"`
} `cmd:"" help:"Serve the NDC connector."`

Configuration struct {
Serve struct {
Port int `help:"Serve Port." env:"PORT" default:"8100"`
LogLevel string `help:"Log level." env:"LOG_LEVEL" default:"info"`
} `cmd:"" help:"Serve the NDC configuration service."`
} `cmd:"" help:"Configuration helpers."`
}

// Starts the connector.
// Will read runtime flags or environment variables to determine startup mode.
//
// This should be the entrypoint of your connector
func Start[RawConfiguration any, Configuration any, State any](connector Connector[RawConfiguration, Configuration, State], options ...ServeOption) error {
cmd := kong.Parse(&cli)
switch cmd.Command() {
case "serve":
logger, err := initLogger(cli.Serve.LogLevel)
if err != nil {
return err
}

server, err := NewServer[RawConfiguration, Configuration, State](connector, &ServerOptions{
Configuration: cli.Serve.Configuration,
InlineConfig: cli.Serve.InlineConfig,
ServiceTokenSecret: cli.Serve.ServiceTokenSecret,
OTLPEndpoint: cli.Serve.OtlpEndpoint,
OTLPInsecure: cli.Serve.OtlpInsecure,
OTLPTracesEndpoint: cli.Serve.OtlpTracesEndpoint,
OTLPMetricsEndpoint: cli.Serve.OtlpMetricsEndpoint,
ServiceName: cli.Serve.ServiceName,
}, append(options, WithLogger(*logger))...)
if err != nil {
return err
}
return server.ListenAndServe(cli.Serve.Port)

case "configuration serve":
logger, err := initLogger(cli.Serve.LogLevel)
if err != nil {
return err
}

server := NewConfigurationServer[RawConfiguration, Configuration, State](connector, WithLogger(*logger))
return server.ListenAndServe(cli.Serve.Port)
default:
return fmt.Errorf("Unknown command <%s>", cmd.Command())
}
}

func initLogger(logLevel string) (*zerolog.Logger, error) {
level, err := zerolog.ParseLevel(logLevel)
if err != nil {
return nil, err
}
zerolog.SetGlobalLevel(level)
logger := log.Level(level)

return &logger, nil
}
1 change: 1 addition & 0 deletions connector/cli_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package connector
130 changes: 130 additions & 0 deletions connector/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package connector

import (
"encoding/json"
"fmt"
"net/http"

"github.com/hasura/ndc-sdk-go/schema"
"github.com/rs/zerolog"
)

// ConfigurationServer provides configuration APIs of the connector
// that help validate and generate Hasura v3 metadata
type ConfigurationServer[RawConfiguration any, Configuration any, State any] struct {
connector Connector[RawConfiguration, Configuration, State]
logger zerolog.Logger
}

// NewConfigurationServer creates a ConfigurationServer instance
func NewConfigurationServer[RawConfiguration any, Configuration any, State any](connector Connector[RawConfiguration, Configuration, State], options ...ServeOption) *ConfigurationServer[RawConfiguration, Configuration, State] {
defaultOptions := defaultServeOptions()
for _, opts := range options {
opts(defaultOptions)
}

return &ConfigurationServer[RawConfiguration, Configuration, State]{
connector: connector,
logger: defaultOptions.logger,
}
}

// GetIndex implements a handler for the index endpoint, GET method.
// Returns an empty configuration of the connector
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) GetIndex(w http.ResponseWriter, r *http.Request) {
writeJson(w, http.StatusOK, cs.connector.MakeEmptyConfiguration())
}

// PostIndex implements a handler for the index endpoint, POST method.
// Take a raw configuration, update it where appropriate by connecting to the underlying data source, and otherwise return it as-is
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) PostIndex(w http.ResponseWriter, r *http.Request) {
var rawConfig RawConfiguration
if err := json.NewDecoder(r.Body).Decode(&rawConfig); err != nil {
writeJson(w, http.StatusBadRequest, schema.BadRequestError(err.Error(), nil))
return
}

conf, err := cs.connector.UpdateConfiguration(r.Context(), &rawConfig)
if err != nil {
writeError(w, err)
return
}
writeJson(w, http.StatusOK, conf)
}

// GetSchema implements a handler for the /schema endpoint, GET method.
// Return jsonschema for the raw configuration for this connector
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) GetSchema(w http.ResponseWriter, r *http.Request) {
writeJson(w, http.StatusOK, cs.connector.GetRawConfigurationSchema())
}

// Validate implements a handler for the /validate endpoint, POST method.
// that validates the raw configuration provided by the user
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) Validate(w http.ResponseWriter, r *http.Request) {
var rawConfig RawConfiguration
if err := json.NewDecoder(r.Body).Decode(&rawConfig); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
},
})
return
}

resolvedConfiguration, err := cs.connector.ValidateRawConfiguration(
&rawConfig,
)
if err != nil {
writeError(w, err)
return
}

connectorSchema, err := cs.connector.GetSchema(resolvedConfiguration)
if err != nil {
writeError(w, err)
return
}

capabilities := cs.connector.GetCapabilities(resolvedConfiguration)
configurationBytes, err := json.Marshal(resolvedConfiguration)
if err != nil {
writeError(w, schema.InternalServerError(err.Error(), nil))
return
}

writeJson(w, http.StatusOK, &schema.ValidateResponse{
Schema: *connectorSchema,
Capabilities: *capabilities,
ResolvedConfiguration: string(configurationBytes),
})
}

// Health implements a handler for /healthz endpoint.
// The endpoint has nothing to check, because the reference implementation does not need to connect to any other services.
// Therefore, once the reference implementation is running, it can always report a healthy status
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) Health(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ListenAndServe serves the configuration server with the standard http server.
// You can also replace this method with any router or web framework that is compatible with net/http.
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) ListenAndServe(port uint) error {
router := newRouter(cs.logger)
router.Use("/", http.MethodGet, cs.GetIndex)
router.Use("/", http.MethodPost, cs.PostIndex)
router.Use("/schema", http.MethodGet, cs.GetSchema)
router.Use("/validate", http.MethodPost, cs.Validate)
router.Use("/healthz", http.MethodGet, cs.Health)

server := http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router.Build(),
}

cs.logger.Info().Msgf("Listening server on %s", server.Addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
return err
}
return nil
}
Loading