Skip to content

Commit

Permalink
chore: refactor helpers common code into a pkg (#313)
Browse files Browse the repository at this point in the history
Refactors the reader pipeline component into a testable library and adds unit tests

- Extracts reader functionality into a dedicated library package
- Introduces shared helpers for kubernetes client and parameter handling
- Adds unit tests for the reader component

closes #122
  • Loading branch information
kirederik authored Jan 3, 2025
1 parent 585b3ac commit 3b513b8
Show file tree
Hide file tree
Showing 5 changed files with 394 additions and 200 deletions.
132 changes: 6 additions & 126 deletions work-creator/pipeline/cmd/reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,138 +2,18 @@ package main

import (
"context"
"fmt"
"log"
"os"
"path/filepath"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

v1alpha1 "github.com/syntasso/kratix/api/v1alpha1"

"sigs.k8s.io/yaml"
"github.com/syntasso/kratix/work-creator/pipeline/lib"
)

func main() {
if err := run(); err != nil {
log.Fatalf("Error: %v", err)
}
}

func run() error {
// Parse environment variables
objectGroup := os.Getenv("OBJECT_GROUP")
objectName := os.Getenv("OBJECT_NAME")
objectNamespace := os.Getenv("OBJECT_NAMESPACE")
objectVersion := os.Getenv("OBJECT_VERSION")
crdPlural := os.Getenv("CRD_PLURAL")
healthcheck := os.Getenv("HEALTHCHECK")
outputDir := os.Getenv("OUTPUT_DIR")
clusterScoped := os.Getenv("CLUSTER_SCOPED") == "true"
if clusterScoped {
objectNamespace = "" // promises are cluster scoped
}
if outputDir == "" {
outputDir = "/kratix/input"
}

dynamicClient, err := getK8sClient()
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %v", err)
}

if err := writeObjectToFile(dynamicClient, outputDir, crdPlural, objectGroup, objectVersion, objectName, objectNamespace); err != nil {
return err
}

if healthcheck == "true" {
promiseName := os.Getenv("PROMISE_NAME")
if err := writePromiseToFile(dynamicClient, outputDir, promiseName); err != nil {
return err
}
}

return nil
}

func writeObjectToFile(client dynamic.Interface, outputDir, plural, group, version, name, namespace string) error {
// Create GVR for the object
gvr := schema.GroupVersionResource{
Group: group,
Version: version,
Resource: plural,
}

// Get the object
dynamicResource := client.Resource(gvr).Namespace(namespace)
obj, err := dynamicResource.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get object: %v", err)
}

objYAML, err := yaml.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal object: %v", err)
ctx := context.Background()
r := lib.Reader{
Out: os.Stdout,
}

objectFilePath := filepath.Join(outputDir, "object.yaml")
if err := os.WriteFile(objectFilePath, objYAML, 0644); err != nil {
return fmt.Errorf("failed to write object to file: %v", err)
}

log.Printf("Object written to %s. Head is:\n%s", objectFilePath, string(objYAML[:min(len(objYAML), 500)]))
return nil
}

func writePromiseToFile(client dynamic.Interface, outputDir, promiseName string) error {
gvr := schema.GroupVersionResource{
Group: v1alpha1.GroupVersion.Group,
Version: v1alpha1.GroupVersion.Version,
Resource: "promises",
}

dynamicResource := client.Resource(gvr)
obj, err := dynamicResource.Get(context.TODO(), promiseName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Promise: %v", err)
}

promiseYAML, err := yaml.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal Promise: %v", err)
}

promiseFilePath := filepath.Join(outputDir, "promise.yaml")
if err := os.WriteFile(promiseFilePath, promiseYAML, 0644); err != nil {
return fmt.Errorf("failed to write Promise to file: %v", err)
}

log.Printf("Promise written to %s. Head is:\n%s", promiseFilePath, string(promiseYAML[:min(len(promiseYAML), 500)]))
return nil
}

func getK8sClient() (dynamic.Interface, error) {
// Try to load in-cluster config first
config, err := rest.InClusterConfig()
if err != nil {
// Fall back to kubeconfig
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
}

return dynamic.NewForConfig(config)
}

func min(a, b int) int {
if a < b {
return a
if err := r.Run(ctx); err != nil {
log.Fatalf("Error: %v", err)
}
return b
}
86 changes: 12 additions & 74 deletions work-creator/pipeline/cmd/update-status/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,49 +8,36 @@ import (
"path/filepath"

"github.com/syntasso/kratix/work-creator/pipeline/lib"
"github.com/syntasso/kratix/work-creator/pipeline/lib/helpers"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/yaml"
)

type Inputs struct {
ObjectGroup string
ObjectName string
ObjectVersion string
Plural string
ClusterScoped bool
ObjectNamespace string
IsLastPipeline bool
}

func main() {
if err := run(); err != nil {
ctx := context.Background()
if err := run(ctx); err != nil {
log.Fatalf("Error: %v", err)
}
}

func run() error {
func run(ctx context.Context) error {
workspaceDir := "/work-creator-files"
statusFile := filepath.Join(workspaceDir, "metadata", "status.yaml")

inputs := parseInputsFromEnv()
params := helpers.GetParametersFromEnv()

// Initialize Kubernetes client
dynamicClient, err := getClientForInputs(inputs)
client, err := helpers.GetK8sClient()
if err != nil {
return fmt.Errorf("failed to get dynamic client: %v", err)
return fmt.Errorf("failed to create Kubernetes client: %v", err)
}

// Get existing object
existingObj, err := dynamicClient.Get(context.TODO(), inputs.ObjectName, metav1.GetOptions{})
objectClient := client.Resource(helpers.ObjectGVR(params)).Namespace(params.ObjectNamespace)

existingObj, err := objectClient.Get(ctx, params.ObjectName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get existing object: %v", err)
}

// Get existing obj status
existingStatus := map[string]any{}
if existingObj.Object["status"] != nil {
existingStatus = existingObj.Object["status"].(map[string]any)
Expand All @@ -63,70 +50,21 @@ func run() error {
}

mergedStatus := lib.MergeStatuses(existingStatus, incomingStatus)
if inputs.IsLastPipeline {
if params.IsLastPipeline {
mergedStatus = lib.MarkAsCompleted(mergedStatus)
}

// Apply merged status to the existing object
existingObj.Object["status"] = mergedStatus

// Update the object's status
if _, err = dynamicClient.UpdateStatus(context.TODO(), existingObj, metav1.UpdateOptions{}); err != nil {
if _, err = objectClient.UpdateStatus(ctx, existingObj, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update status: %v", err)
}

return nil
}

func getK8sClient() (dynamic.Interface, error) {
// Try to load in-cluster config first
config, err := rest.InClusterConfig()
if err != nil {
// Fall back to kubeconfig
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
}

return dynamic.NewForConfig(config)
}

func parseInputsFromEnv() Inputs {
inputs := Inputs{
ObjectGroup: os.Getenv("OBJECT_GROUP"),
ObjectName: os.Getenv("OBJECT_NAME"),
ObjectVersion: os.Getenv("OBJECT_VERSION"),
Plural: os.Getenv("CRD_PLURAL"),
ClusterScoped: os.Getenv("CLUSTER_SCOPED") == "true",
ObjectNamespace: os.Getenv("OBJECT_NAMESPACE"),
IsLastPipeline: os.Getenv("IS_LAST_PIPELINE") == "true",
}

if inputs.ClusterScoped {
inputs.ObjectNamespace = "" // promises are cluster scoped
}

return inputs
}

func getClientForInputs(inputs Inputs) (dynamic.ResourceInterface, error) {
client, err := getK8sClient()
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %v", err)
}

// Create dynamic client for the specified GVR
gvr := schema.GroupVersionResource{
Group: inputs.ObjectGroup,
Version: inputs.ObjectVersion,
Resource: inputs.Plural,
}

return client.Resource(gvr).Namespace(inputs.ObjectNamespace), nil
}

func readStatusFile(statusFile string) (map[string]any, error) {
incomingStatus := map[string]any{}
if _, err := os.Stat(statusFile); err == nil {
Expand Down
Loading

0 comments on commit 3b513b8

Please sign in to comment.