Skip to content

Commit

Permalink
change the query output type and improve code lint
Browse files Browse the repository at this point in the history
  • Loading branch information
hgiasac committed Feb 4, 2024
1 parent 8e8c7c8 commit 3e0b24e
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 64 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ jobs:
run: diff -u <(echo -n) <(gofmt -d -s .)
- name: Vet
run: go vet ./...
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: latest
only-new-issues: true
skip-cache: true
- name: Run Go unit tests
run: go test -v -race -timeout 3m -coverprofile=coverage.out ./...
- name: Go coverage format
Expand Down
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ typegen:

.PHONY: typegen
format:
gofmt -w -s .
gofmt -w -s .


.PHONY: lint
lint:
@(./scripts/check_installed.sh golangci-lint "golangci-lint: https://golangci-lint.run/usage/install/" && \
golangci-lint run )
23 changes: 13 additions & 10 deletions connector/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ func NewConfigurationServer[RawConfiguration any, Configuration any, State any](
// GetIndex implements a handler for the index endpoint, GET method.
// Returns an empty configuration of the connector
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) GetIndex(w http.ResponseWriter, r *http.Request) {
writeJson(w, http.StatusOK, cs.connector.MakeEmptyConfiguration())
writeJson(w, GetLogger(r.Context()), http.StatusOK, cs.connector.MakeEmptyConfiguration())
}

// PostIndex implements a handler for the index endpoint, POST method.
// Take a raw configuration, update it where appropriate by connecting to the underlying data source, and otherwise return it as-is
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) PostIndex(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())
var rawConfig RawConfiguration
if err := json.NewDecoder(r.Body).Decode(&rawConfig); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
Expand All @@ -51,24 +52,26 @@ func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) PostIndex

conf, err := cs.connector.UpdateConfiguration(r.Context(), &rawConfig)
if err != nil {
writeError(w, err)
writeError(w, logger, err)
return
}
writeJson(w, http.StatusOK, conf)
writeJson(w, logger, http.StatusOK, conf)
}

// GetSchema implements a handler for the /schema endpoint, GET method.
// Return jsonschema for the raw configuration for this connector
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) GetSchema(w http.ResponseWriter, r *http.Request) {
writeJson(w, http.StatusOK, cs.connector.GetRawConfigurationSchema())
writeJson(w, GetLogger(r.Context()), http.StatusOK, cs.connector.GetRawConfigurationSchema())
}

// Validate implements a handler for the /validate endpoint, POST method.
// that validates the raw configuration provided by the user
func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) Validate(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())

var rawConfig RawConfiguration
if err := json.NewDecoder(r.Body).Decode(&rawConfig); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
Expand All @@ -81,24 +84,24 @@ func (cs *ConfigurationServer[RawConfiguration, Configuration, State]) Validate(
&rawConfig,
)
if err != nil {
writeError(w, err)
writeError(w, logger, err)
return
}

connectorSchema, err := cs.connector.GetSchema(resolvedConfiguration)
if err != nil {
writeError(w, err)
writeError(w, logger, err)
return
}

capabilities := cs.connector.GetCapabilities(resolvedConfiguration)
configurationBytes, err := json.Marshal(resolvedConfiguration)
if err != nil {
writeError(w, schema.InternalServerError(err.Error(), nil))
writeError(w, logger, schema.InternalServerError(err.Error(), nil))
return
}

writeJson(w, http.StatusOK, &schema.ValidateResponse{
writeJson(w, logger, http.StatusOK, &schema.ValidateResponse{
Schema: *connectorSchema,
Capabilities: *capabilities,
ResolvedConfiguration: string(configurationBytes),
Expand Down
4 changes: 2 additions & 2 deletions connector/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestConfigurationServer(t *testing.T) {
})

t.Run("GET /", func(t *testing.T) {
res, err := http.Get(fmt.Sprintf("%s", httpServer.URL))
res, err := http.Get(httpServer.URL)
if err != nil {
t.Errorf("GET /: expected no error, got %s", err)
t.FailNow()
Expand All @@ -55,7 +55,7 @@ func TestConfigurationServer(t *testing.T) {
})

t.Run("POST / - json decode failure", func(t *testing.T) {
res, err := httpPostJSON(fmt.Sprintf("%s", httpServer.URL), "")
res, err := httpPostJSON(httpServer.URL, "")
if err != nil {
t.Errorf("POST /: expected no error, got %s", err)
t.FailNow()
Expand Down
32 changes: 19 additions & 13 deletions connector/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/rs/zerolog/log"
)

type serverContextKey string

const (
logContextKey = "hasura-log"
headerContentType = "Content-Type"
contentTypeJson = "application/json"
logContextKey serverContextKey = "hasura-log"
headerContentType string = "Content-Type"
contentTypeJson string = "application/json"
)

// define a custom response write to capture response information for logging
Expand Down Expand Up @@ -87,7 +89,7 @@ func (rt *router) Build() *http.ServeMux {
Str("stacktrace", string(debug.Stack())).
Msg("internal server error")

writeJson(w, http.StatusInternalServerError, schema.ErrorResponse{
writeJson(w, rt.logger, http.StatusInternalServerError, schema.ErrorResponse{
Message: "internal server error",
Details: map[string]any{
"cause": err,
Expand Down Expand Up @@ -118,7 +120,7 @@ func (rt *router) Build() *http.ServeMux {
err := schema.ErrorResponse{
Message: fmt.Sprintf("Invalid content type %s, accept %s only", contentType, contentTypeJson),
}
writeJson(w, http.StatusBadRequest, err)
writeJson(w, rt.logger, http.StatusBadRequest, err)

rt.logger.Error().
Str("request_id", requestID).
Expand Down Expand Up @@ -179,7 +181,7 @@ func getRequestID(r *http.Request) string {
}

// writeJson writes response data with json encode
func writeJson(w http.ResponseWriter, statusCode int, body any) {
func writeJson(w http.ResponseWriter, logger zerolog.Logger, statusCode int, body any) {
if body == nil {
w.WriteHeader(statusCode)
return
Expand All @@ -189,11 +191,15 @@ func writeJson(w http.ResponseWriter, statusCode int, body any) {
jsonBytes, err := json.Marshal(body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf(`{"message": "%s"}`, http.StatusText(http.StatusInternalServerError))))
if _, err := w.Write([]byte(fmt.Sprintf(`{"message": "%s"}`, http.StatusText(http.StatusInternalServerError)))); err != nil {
logger.Error().Err(err).Msg("failed to write response")
}
return
}
w.WriteHeader(statusCode)
w.Write(jsonBytes)
if _, err := w.Write(jsonBytes); err != nil {
logger.Error().Err(err).Msg("failed to write response")
}
}

// GetLogger gets the logger instance from context
Expand All @@ -208,28 +214,28 @@ func GetLogger(ctx context.Context) zerolog.Logger {
return log.Level(zerolog.GlobalLevel())
}

func writeError(w http.ResponseWriter, err error) int {
func writeError(w http.ResponseWriter, logger zerolog.Logger, err error) int {
w.Header().Add("Content-Type", "application/json")

var connectorErrorPtr *schema.ConnectorError
if errors.As(err, &connectorErrorPtr) {
writeJson(w, connectorErrorPtr.StatusCode(), connectorErrorPtr)
writeJson(w, logger, connectorErrorPtr.StatusCode(), connectorErrorPtr)
return connectorErrorPtr.StatusCode()
}

var errorResponse schema.ErrorResponse
if errors.As(err, &errorResponse) {
writeJson(w, http.StatusBadRequest, errorResponse)
writeJson(w, logger, http.StatusBadRequest, errorResponse)
return http.StatusBadRequest
}

var errorResponsePtr *schema.ErrorResponse
if errors.As(err, &errorResponsePtr) {
writeJson(w, http.StatusBadRequest, errorResponsePtr)
writeJson(w, logger, http.StatusBadRequest, errorResponsePtr)
return http.StatusBadRequest
}

writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: err.Error(),
})

Expand Down
41 changes: 26 additions & 15 deletions connector/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func NewServer[RawConfiguration any, Configuration any, State any](connector Con
func (s *Server[RawConfiguration, Configuration, State]) withAuth(handler http.HandlerFunc) http.HandlerFunc {

return func(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())
if s.options.ServiceTokenSecret != "" && r.Header.Get("authorization") != fmt.Sprintf("Bearer %s", s.options.ServiceTokenSecret) {
writeJson(w, http.StatusUnauthorized, schema.ErrorResponse{
writeJson(w, logger, http.StatusUnauthorized, schema.ErrorResponse{
Message: "Unauthorized",
Details: map[string]any{
"cause": "Bearer token does not match.",
Expand All @@ -145,13 +146,15 @@ func (s *Server[RawConfiguration, Configuration, State]) withAuth(handler http.H
}

func (s *Server[RawConfiguration, Configuration, State]) GetCapabilities(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())
capabilities := s.connector.GetCapabilities(s.configuration)
writeJson(w, http.StatusOK, capabilities)
writeJson(w, logger, http.StatusOK, capabilities)
}

func (s *Server[RawConfiguration, Configuration, State]) Health(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())
if err := s.connector.HealthCheck(r.Context(), s.configuration, s.state); err != nil {
writeError(w, err)
writeError(w, logger, err)
return
}

Expand All @@ -160,17 +163,19 @@ func (s *Server[RawConfiguration, Configuration, State]) Health(w http.ResponseW

// GetSchema implements a handler for the /schema endpoint, GET method.
func (s *Server[RawConfiguration, Configuration, State]) GetSchema(w http.ResponseWriter, r *http.Request) {
logger := GetLogger(r.Context())
schemaResult, err := s.connector.GetSchema(s.configuration)
if err != nil {
writeError(w, err)
writeError(w, logger, err)
return
}

writeJson(w, http.StatusOK, schemaResult)
writeJson(w, logger, http.StatusOK, schemaResult)
}

func (s *Server[RawConfiguration, Configuration, State]) Query(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
logger := GetLogger(r.Context())
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Query", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

Expand All @@ -179,7 +184,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Query(w http.ResponseWr
defer decodeSpan.End()
var body schema.QueryRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
Expand All @@ -205,7 +210,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Query(w http.ResponseWr
response, err := s.connector.Query(execQueryCtx, s.configuration, s.state, &body)

if err != nil {
status := writeError(w, err)
status := writeError(w, logger, err)
statusAttributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
Expand All @@ -219,7 +224,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Query(w http.ResponseWr
statusAttribute := attribute.String("status", "success")
span.SetAttributes(statusAttribute)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, http.StatusOK, response)
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()

s.telemetry.queryCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))
Expand All @@ -229,6 +234,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Query(w http.ResponseWr

func (s *Server[RawConfiguration, Configuration, State]) Explain(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
logger := GetLogger(r.Context())
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Explain", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

Expand All @@ -237,7 +243,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Explain(w http.Response
defer decodeSpan.End()
var body schema.QueryRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
Expand All @@ -262,7 +268,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Explain(w http.Response

response, err := s.connector.Explain(execCtx, s.configuration, s.state, &body)
if err != nil {
status := writeError(w, err)
status := writeError(w, logger, err)
statusAttributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
Expand All @@ -276,7 +282,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Explain(w http.Response
statusAttribute := attribute.String("status", "success")
span.SetAttributes(statusAttribute)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, http.StatusOK, response)
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()
s.telemetry.explainCounter.Add(r.Context(), 1, metric.WithAttributes(append(attributes, statusAttribute)...))

Expand All @@ -286,14 +292,15 @@ func (s *Server[RawConfiguration, Configuration, State]) Explain(w http.Response

func (s *Server[RawConfiguration, Configuration, State]) Mutation(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
logger := GetLogger(r.Context())
ctx, span := s.telemetry.Tracer.Start(r.Context(), "Mutation", trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

_, decodeSpan := s.telemetry.Tracer.Start(ctx, "Decode JSON Body")
defer decodeSpan.End()
var body schema.MutationRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJson(w, http.StatusBadRequest, schema.ErrorResponse{
writeJson(w, logger, http.StatusBadRequest, schema.ErrorResponse{
Message: "failed to decode json request body",
Details: map[string]any{
"cause": err.Error(),
Expand All @@ -314,7 +321,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Mutation(w http.Respons
defer execSpan.End()
response, err := s.connector.Mutation(execCtx, s.configuration, s.state, &body)
if err != nil {
status := writeError(w, err)
status := writeError(w, logger, err)
attributes := []attribute.KeyValue{
attribute.String("status", "failed"),
attribute.String("reason", fmt.Sprintf("%d", status)),
Expand All @@ -328,7 +335,7 @@ func (s *Server[RawConfiguration, Configuration, State]) Mutation(w http.Respons
attributes := attribute.String("status", "success")
span.SetAttributes(attributes)
_, responseSpan := s.telemetry.Tracer.Start(ctx, "Response")
writeJson(w, http.StatusOK, response)
writeJson(w, logger, http.StatusOK, response)
responseSpan.End()

s.telemetry.mutationCounter.Add(r.Context(), 1, metric.WithAttributes(attributes))
Expand All @@ -354,7 +361,11 @@ func (s *Server[RawConfiguration, Configuration, State]) buildHandler() *http.Se
// You can also replace this method with any router or web framework that is compatible with net/http.
func (s *Server[RawConfiguration, Configuration, State]) ListenAndServe(port uint) error {
defer s.stop()
defer s.telemetry.Shutdown(context.Background())
defer func() {
if err := s.telemetry.Shutdown(context.Background()); err != nil {
s.logger.Error().Err(err).Msg("failed to shutdown OpenTelemetry")
}
}()

server := http.Server{
Addr: fmt.Sprintf(":%d", port),
Expand Down
Loading

0 comments on commit 3e0b24e

Please sign in to comment.