Skip to content

Commit

Permalink
use errors for kcp request
Browse files Browse the repository at this point in the history
  • Loading branch information
lindnerby committed Dec 4, 2023
1 parent dfcb09d commit dbb5b0d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
93 changes: 50 additions & 43 deletions runtime-watcher/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ func (h *Handler) Handle(writer http.ResponseWriter, request *http.Request) {
}

validationMsg := h.validateResources(admissionReview.Request, moduleName)

h.logger.Info(string(validationMsg))
h.logger.Info(validationMsg)

responseBytes := h.prepareResponse(admissionReview, validationMsg)
if responseBytes == nil {
Expand Down Expand Up @@ -118,7 +117,7 @@ func getModuleName(urlPath string) (string, error) {
}

func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
validationMessage admissionMessage,
validationMessage string,
) []byte {
h.logger.Info(fmt.Sprintf("Preparing response for AdmissionReview: %s %s %s",
admissionReview.Request.Kind.Kind,
Expand All @@ -134,7 +133,7 @@ func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
UID: admissionReview.Request.UID,
Allowed: true,
Result: &metav1.Status{
Message: string(validationMessage),
Message: validationMessage,
Status: metav1.StatusSuccess,
},
},
Expand All @@ -148,10 +147,8 @@ func (h *Handler) prepareResponse(admissionReview *admissionv1.AdmissionReview,
return admissionReviewBytes
}

type admissionMessage string

func (h *Handler) validateResources(request *admissionv1.AdmissionRequest, moduleName string,
) admissionMessage {
) string {
object, oldObject := WatchedObject{}, WatchedObject{}

switch request.Operation {
Expand All @@ -162,24 +159,42 @@ func (h *Handler) validateResources(request *admissionv1.AdmissionRequest, modul
GroupVersionKind: request.Kind,
SubResource: request.SubResource,
}
msg := h.sendRequestToKcpOnUpdate(resource, oldObject, object, moduleName)
return admissionMessage(msg)
changed, err := h.checkForChange(resource, oldObject, object)
if err != nil {
h.metrics.UpdateFailedKCPTotal()
return err.Error()
}
if !changed {
return fmt.Sprintf("no change detected on watched resource %s/%s",
object.Namespace, object.Name)
}
err = h.sendRequestToKcp(moduleName, object)
if err != nil {
h.metrics.UpdateFailedKCPTotal()
return err.Error()
}
case admissionv1.Delete:
h.unmarshalWatchedObject(request.OldObject.Raw, &oldObject)
msg := h.sendRequestToKcp(moduleName, oldObject)
return admissionMessage(msg)
err := h.sendRequestToKcp(moduleName, oldObject)
if err != nil {
h.metrics.UpdateFailedKCPTotal()
return err.Error()
}
case admissionv1.Create:
h.unmarshalWatchedObject(request.Object.Raw, &object)
msg := h.sendRequestToKcp(moduleName, object)
return admissionMessage(msg)
err := h.sendRequestToKcp(moduleName, object)
if err != nil {
h.metrics.UpdateFailedKCPTotal()
return err.Error()
}
case admissionv1.Connect:
msg := fmt.Sprintf("operation %s not supported for %s", admissionv1.Connect, request.Kind.String())
return admissionMessage(msg)
return fmt.Sprintf("operation %s not supported for %s", admissionv1.Connect, request.Kind.String())
}
return ""
return kcpReqSucceededMsg
}

var errAdmission = errors.New(admissionError)

Check failure on line 196 in runtime-watcher/internal/handler.go

View workflow job for this annotation

GitHub Actions / lint-build-test

File is not `gofumpt`-ed (gofumpt)
var errKcpRequest = errors.New(kcpReqFailedMsg)

func (h *Handler) unmarshalWatchedObject(rawBytes []byte, response responseInterface) {
if err := json.Unmarshal(rawBytes, response); err != nil {
Expand All @@ -190,9 +205,7 @@ func (h *Handler) unmarshalWatchedObject(rawBytes []byte, response responseInter
}
}

func (h *Handler) sendRequestToKcpOnUpdate(resource *Resource, oldObj, obj WatchedObject,
moduleName string,
) string {
func (h *Handler) checkForChange(resource *Resource, oldObj, obj WatchedObject) (bool, error) {
var registerChange bool
// e.g. slice or status subresource. Only status is supported.
watchedSubResource := strings.ToLower(resource.SubResource)
Expand All @@ -210,24 +223,17 @@ func (h *Handler) sendRequestToKcpOnUpdate(resource *Resource, oldObj, obj Watch
case statusSubResource:
registerChange = !reflect.DeepEqual(oldObj.Status, obj.Status)
default:
return fmt.Sprintf("invalid subresource for watched resource %s/%s",
obj.Namespace, obj.Name)
}

if !registerChange {
return fmt.Sprintf("no change detected on watched resource %s/%s",
return false, fmt.Errorf("invalid subresource for watched resource %s/%s",
obj.Namespace, obj.Name)
}

return h.sendRequestToKcp(moduleName, obj)
return registerChange, nil
}

func (h *Handler) sendRequestToKcp(moduleName string, watched WatchedObject) error {
owner, err := extractOwner(watched)
if err != nil {
err = fmt.Errorf("resource owner name could not be determined: %w", err)
h.logger.Error(err, err.Error())
return err
return h.logAndReturnKCPErr(fmt.Errorf("resource owner name could not be determined: %w", err))
}

watcherEvent := &listenerTypes.WatchEvent{
Expand All @@ -237,44 +243,45 @@ func (h *Handler) sendRequestToKcp(moduleName string, watched WatchedObject) err
}
postBody, err := json.Marshal(watcherEvent)
if err != nil {
err = fmt.Errorf("%s: %w", kcpReqFailedMsg, err)
h.logger.Error(err, err.Error())
return err
return h.logAndReturnKCPErr(err)
}

requestPayload := bytes.NewBuffer(postBody)

if h.config.KCPAddress == "" || h.config.KCPContract == "" {
return errors.New("")
return h.logAndReturnKCPErr(errors.New("KCPAddress or KCPContract empty"))
}

url := fmt.Sprintf("https://%s/%s/%s/%s", h.config.KCPAddress, h.config.KCPContract, moduleName, eventEndpoint)
httpsClient, err := h.getHTTPSClient()
if err != nil {
h.logger.Error(err, kcpReqFailedMsg)
return err.Error()
return h.logAndReturnKCPErr(err)
}

resp, err := httpsClient.Post(url, "application/json", requestPayload)
if err != nil {
h.logger.Error(err, kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
return h.logAndReturnKCPErr(err, "postBody", watcherEvent)
}
defer resp.Body.Close()
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
h.logger.Error(err, kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
return h.logAndReturnKCPErr(err, "postBody", watcherEvent)
}
if resp.StatusCode != http.StatusOK {
h.logger.Error(fmt.Errorf("%w: responseBody: %s with StatusCode: %d", err, responseBody, resp.StatusCode),
kcpReqFailedMsg, "postBody", watcherEvent)
return kcpReqFailedMsg
return h.logAndReturnKCPErr(
fmt.Errorf("%w: responseBody: %s with StatusCode: %d", err, responseBody, resp.StatusCode),
"postBody", watcherEvent)
}

h.logger.Info(fmt.Sprintf("sent request to KCP successfully for resource %s/%s",
watched.Namespace, watched.Name), "postBody", watcherEvent)
return kcpReqSucceededMsg
return nil
}

func (h *Handler) logAndReturnKCPErr(err error, params ...any) error {
err = errors.Join(errKcpRequest, err)
h.logger.Error(err, err.Error(), params)

Check failure on line 283 in runtime-watcher/internal/handler.go

View workflow job for this annotation

GitHub Actions / lint-build-test

pass []any as any to func h.logger.Error func(err error, msg string, keysAndValues ...any) (asasalint)
return err
}

func extractOwner(watched WatchedObject) (types.NamespacedName, error) {
Expand Down
4 changes: 3 additions & 1 deletion runtime-watcher/internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package internal_test
import (
"encoding/json"
"fmt"
"github.com/kyma-project/runtime-watcher/skr/internal/watchermetrics"

Check failure on line 7 in runtime-watcher/internal/handler_test.go

View workflow job for this annotation

GitHub Actions / lint-build-test

File is not `gofumpt`-ed (gofumpt)
"io"
"net/http"
"net/http/httptest"

Check failure on line 10 in runtime-watcher/internal/handler_test.go

View workflow job for this annotation

GitHub Actions / lint-build-test

File is not `gofumpt`-ed (gofumpt)
Expand Down Expand Up @@ -107,7 +108,8 @@ var _ = Describe("given watched resource", Ordered, func() {

skrRecorder := httptest.NewRecorder()
requestParser := requestparser.NewRequestParser(serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer())

Check failure on line 110 in runtime-watcher/internal/handler_test.go

View workflow job for this annotation

GitHub Actions / lint-build-test

line is 122 characters (lll)
handler := internal.NewHandler(k8sClient, logger, config, *requestParser)
metrics := watchermetrics.NewMetrics()
handler := internal.NewHandler(k8sClient, logger, config, *requestParser, *metrics)
handler.Handle(skrRecorder, request)

bytes, err := io.ReadAll(skrRecorder.Body)
Expand Down

0 comments on commit dbb5b0d

Please sign in to comment.