Skip to content

Commit

Permalink
Set content type header to application/json
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgeorgousis authored and peter-hazell committed Nov 7, 2024
1 parent 5366ab7 commit c6b732b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
13 changes: 5 additions & 8 deletions controllers/webhook/runcompletion_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ func getRequestBody(ctx context.Context, request *http.Request) ([]byte, error)
} else if len(body) == 0 {
return nil, errors.New("request body is empty")
}
return body, nil
}

type DataWrapper struct {
Data common.RunCompletionEventData `json:"data"`
return body, nil
}

func (rcf RunCompletionFeed) extractRunCompletionEvent(request *http.Request) (*common.RunCompletionEvent, error) {
Expand All @@ -75,12 +72,12 @@ func (rcf RunCompletionFeed) extractRunCompletionEvent(request *http.Request) (*
return nil, err
}

runDataWrapper := &DataWrapper{}
if err := json.Unmarshal(body, runDataWrapper); err != nil {
rced := common.RunCompletionEventData{}
if err := json.Unmarshal(body, &rced); err != nil {
return nil, err
}

rce, err := rcf.eventProcessor.ToRunCompletionEvent(rcf.ctx, runDataWrapper.Data)
rce, err := rcf.eventProcessor.ToRunCompletionEvent(rcf.ctx, rced)
if err != nil {
return nil, err
} else if rce == nil {
Expand All @@ -95,7 +92,7 @@ func (rcf RunCompletionFeed) handleEvent(response http.ResponseWriter, request *
switch request.Method {
case http.MethodPost:
if request.Header.Get(HttpHeaderContentType) != HttpContentTypeJSON {
logger.Error(errors.New("RunCompletionFeed call failed"), "invalid %s [%s], want `%s`", HttpHeaderContentType, request.Header.Get(HttpHeaderContentType), HttpContentTypeJSON)
logger.Error(errors.New("RunCompletionFeed call failed"), fmt.Sprintf("invalid %s [%s], want `%s`", HttpHeaderContentType, request.Header.Get(HttpHeaderContentType), HttpContentTypeJSON))
http.Error(response, fmt.Sprintf("invalid %s, want `%s`", HttpHeaderContentType, HttpContentTypeJSON), http.StatusUnsupportedMediaType)
return
}
Expand Down
15 changes: 10 additions & 5 deletions provider-service/internal/outputhandlers/http_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (hws HttpWebhookSink) SendEvents() {
logger.Error(err, "Failed to send event", "event", fmt.Sprintf("%+v", object))
object.OnFailure()
} else {
logger.Info("Successfully Sent event", "event", fmt.Sprintf("%+v", object))
logger.Info("Successfully sent event", "event", fmt.Sprintf("%+v", object))
object.OnSuccess()
}
default:
Expand All @@ -69,7 +69,14 @@ func (hws HttpWebhookSink) SendEvents() {
}

func (hws HttpWebhookSink) buildRequest(bodyBytes []byte) (*http.Request, error) {
return http.NewRequestWithContext(hws.context, http.MethodPost, hws.operatorWebhook, bytes.NewReader(bodyBytes))
request, err := http.NewRequestWithContext(hws.context, http.MethodPost, hws.operatorWebhook, bytes.NewReader(bodyBytes))
if err != nil {
return nil, err
}
request.Header.Set("Content-Type", "application/json")

return request, nil

}

func (hws HttpWebhookSink) Send(rced common.RunCompletionEventData) error {
Expand All @@ -95,16 +102,14 @@ func (hws HttpWebhookSink) Send(rced common.RunCompletionEventData) error {
}
}(response.Body)

// Fully consume the response body
_, err = io.Copy(io.Discard, response.Body)
if err != nil {
logger.Error(err, "Failed to fully consume response body")
}

if response.StatusCode != 200 {
return errors.New(fmt.Sprintf("KFP Operator error response, http status code: [%s]", response.Status))
return errors.New(fmt.Sprintf("KFP Operator error response received with http status code: [%s]", response.Status))
}

logger.Info("successfully sent")
return nil
}
2 changes: 1 addition & 1 deletion provider-service/internal/vai/event_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type VAILogEntry struct {
}

func NewVAIEventDataSource(ctx context.Context, provider string, namespace string) (*VAIEventDataSource, error) {
logger := common.LoggerFromContext(ctx)
k8sClient, err := NewK8sClient()
if err != nil {
return nil, err
Expand All @@ -84,7 +85,6 @@ func NewVAIEventDataSource(ctx context.Context, provider string, namespace strin
return nil, err
}

logger := common.LoggerFromContext(ctx)
vaiEventDataSource := &VAIEventDataSource{
K8sClient: *k8sClient,
ProviderConfig: *config,
Expand Down
3 changes: 3 additions & 0 deletions provider-service/internal/vai/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package vai
import (
"context"
"github.com/reugn/go-streams/flow"
"github.com/sky-uk/kfp-operator/argo/common"
"github.com/sky-uk/kfp-operator/provider-service/internal/config"
"github.com/sky-uk/kfp-operator/provider-service/internal/outputhandlers"
"os"
)

func Start(ctx context.Context, config config.Config) {
logger := common.LoggerFromContext(ctx)
source, err := NewVAIEventDataSource(ctx, config.ProviderName, config.Pod.Namespace)
if err != nil {
logger.Error(err, "Failed to create VAI event data source")
os.Exit(1)
}

Expand Down

0 comments on commit c6b732b

Please sign in to comment.