Skip to content

Commit

Permalink
chore: refactor reader script into golang (#312)
Browse files Browse the repository at this point in the history
### TL;DR

Replaced shell-based reader script with a Go implementation for improved object retrieval and status handling.

### What changed?

- Replaced `/bin/reader` shell script with a Go-based implementation
- Added new environment variables for version control and CRD plural handling
- Removed unnecessary binary dependencies from the Dockerfile
- Updated pipeline factory to support cluster-scoped resources and CRD plural names
- Enhanced error handling and logging in the reader implementation
- Updated dependencies, including Ginkgo to v2.22.2

### How to test?

1. Build and deploy the updated pipeline adapter:
```bash
make quick-start
```

2. Apply a marketplace promise (or any promise with configure workflows)
3. Verify logs from the reader container show proper object retrieval and file creation
4. Send a resource request for the promise
5. Verify the workflow completes successfully 

### Why make this change?

The Go implementation provides better error handling, type safety, and maintainability compared to the shell script. It also adds support for proper cluster-scoped resource handling and improves the overall reliability of the object retrieval process.
  • Loading branch information
kirederik authored Jan 3, 2025
1 parent 0f0a769 commit 585b3ac
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 42 deletions.
8 changes: 2 additions & 6 deletions Dockerfile.pipeline-adapter
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ RUN go mod download
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o bin/work-creator work-creator/pipeline/cmd/work-creator/main.go
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o bin/update-status work-creator/pipeline/cmd/update-status/main.go
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o bin/health-definition-creator work-creator/pipeline/cmd/health-definition-creator/main.go
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o bin/reader work-creator/pipeline/cmd/reader/main.go

RUN mkdir work-creator-files

Expand All @@ -26,15 +27,10 @@ FROM gcr.io/distroless/cc:nonroot

COPY --chown=nonroot:nonroot --from=busybox /usr/bin/env /usr/bin/env
COPY --chown=nonroot:nonroot --from=busybox /bin/sh /bin/sh
COPY --chown=nonroot:nonroot --from=busybox /bin/cat /bin/cat
COPY --chown=nonroot:nonroot --from=busybox /bin/date /bin/date
COPY --chown=nonroot:nonroot --from=busybox /bin/head /bin/head
COPY --chown=nonroot:nonroot --from=bitnami/kubectl:1.31.4 /opt/bitnami/kubectl/bin/kubectl /bin/kubectl

COPY --chown=nonroot:nonroot --from=builder /workspace/bin/work-creator /bin/work-creator
COPY --chown=nonroot:nonroot --from=builder /workspace/bin/update-status /bin/update-status
COPY --chown=nonroot:nonroot --from=builder /workspace/bin/health-definition-creator /bin/health-definition-creator
COPY --chown=nonroot:nonroot --from=builder /workspace/work-creator/scripts/reader /bin/reader
COPY --chown=nonroot:nonroot --from=builder /workspace/bin/reader /bin/reader
COPY --chown=nonroot:nonroot --from=builder /workspace/work-creator-files /work-creator-files

USER 65532:65532
Expand Down
23 changes: 8 additions & 15 deletions api/v1alpha1/pipeline_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type PipelineFactory struct {
WorkflowAction Action
WorkflowType Type
ClusterScoped bool
CRDPlural string
}

func (p *PipelineFactory) Resources(jobEnv []corev1.EnvVar) (PipelineJobResources, error) {
Expand Down Expand Up @@ -157,22 +158,24 @@ func (p *PipelineFactory) defaultEnvVars() []corev1.EnvVar {
}

func (p *PipelineFactory) readerContainer() corev1.Container {
kind := p.Promise.GroupVersionKind().Kind
group := p.Promise.GroupVersionKind().Group
name := p.Promise.GetName()
version := p.Promise.GroupVersionKind().Version

if p.ResourceWorkflow {
kind = p.ResourceRequest.GetKind()
group = p.ResourceRequest.GroupVersionKind().Group
name = p.ResourceRequest.GetName()
version = p.ResourceRequest.GroupVersionKind().Version
}

envVars := []corev1.EnvVar{
{Name: "OBJECT_KIND", Value: strings.ToLower(kind)},
{Name: "OBJECT_GROUP", Value: group},
{Name: "OBJECT_NAME", Value: name},
{Name: "OBJECT_VERSION", Value: version},
{Name: "OBJECT_NAMESPACE", Value: p.Namespace},
{Name: "KRATIX_WORKFLOW_TYPE", Value: string(p.WorkflowType)},
{Name: "CRD_PLURAL", Value: p.CRDPlural},
{Name: "CLUSTER_SCOPED", Value: fmt.Sprintf("%t", p.ClusterScoped)},
}

if p.WorkflowAction == WorkflowActionHealthCheck {
Expand Down Expand Up @@ -350,15 +353,6 @@ func (p *PipelineFactory) pipelineJob(schedulingConfigMap *corev1.ConfigMap, ser
}

func (p *PipelineFactory) statusWriterContainer(obj *unstructured.Unstructured, env []corev1.EnvVar) corev1.Container {
plural := "promises"
if p.ResourceWorkflow {
_, crd, err := p.Promise.GetAPI()
if err != nil {
return corev1.Container{}
}
plural = crd.Spec.Names.Plural
}

return corev1.Container{
Name: "status-writer",
Image: os.Getenv("PIPELINE_ADAPTER_IMG"),
Expand All @@ -369,7 +363,7 @@ func (p *PipelineFactory) statusWriterContainer(obj *unstructured.Unstructured,
corev1.EnvVar{Name: "OBJECT_VERSION", Value: obj.GroupVersionKind().Version},
corev1.EnvVar{Name: "OBJECT_NAME", Value: obj.GetName()},
corev1.EnvVar{Name: "OBJECT_NAMESPACE", Value: p.Namespace},
corev1.EnvVar{Name: "CRD_PLURAL", Value: plural},
corev1.EnvVar{Name: "CRD_PLURAL", Value: p.CRDPlural},
corev1.EnvVar{Name: "CLUSTER_SCOPED", Value: fmt.Sprintf("%t", p.ClusterScoped)},
),
VolumeMounts: []corev1.VolumeMount{{
Expand Down Expand Up @@ -437,7 +431,6 @@ func (p *PipelineFactory) role() ([]rbacv1.Role, error) {
if err != nil {
return nil, err
}
plural := crd.Spec.Names.Plural
roles = append(roles, rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: p.ID,
Expand All @@ -451,7 +444,7 @@ func (p *PipelineFactory) role() ([]rbacv1.Role, error) {
Rules: []rbacv1.PolicyRule{
{
APIGroups: []string{crd.Spec.Group},
Resources: []string{plural, plural + "/status"},
Resources: []string{p.CRDPlural, p.CRDPlural + "/status"},
Verbs: []string{"get", "list", "update", "create", "patch"},
},
{
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,19 @@ func (p *Pipeline) ForPromise(promise *Promise, action Action) *PipelineFactory
WorkflowType: WorkflowTypePromise,
WorkflowAction: action,
ClusterScoped: true,
CRDPlural: "promises",
}
}

func (p *Pipeline) ForResource(promise *Promise, action Action, resourceRequest *unstructured.Unstructured) *PipelineFactory {
_, crd, _ := promise.GetAPI()
var clusterScoped bool
if crd != nil && crd.Spec.Scope == apiextensionsv1.ClusterScoped {
clusterScoped = true
var plural string
if crd != nil {
plural = crd.Spec.Names.Plural
if crd.Spec.Scope == apiextensionsv1.ClusterScoped {
clusterScoped = true
}
}
return &PipelineFactory{
ID: promise.GetName() + "-resource-" + string(action) + "-" + p.GetName(),
Expand All @@ -207,6 +212,7 @@ func (p *Pipeline) ForResource(promise *Promise, action Action, resourceRequest
WorkflowType: WorkflowTypeResource,
WorkflowAction: action,
ClusterScoped: clusterScoped,
CRDPlural: plural,
}
}

Expand Down
21 changes: 19 additions & 2 deletions api/v1alpha1/pipeline_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var _ = Describe("Pipeline", func() {
Expect(f.WorkflowType).To(Equal(v1alpha1.WorkflowTypePromise))
Expect(f.ClusterScoped).To(Equal(true))
Expect(f.ResourceWorkflow).To(BeFalse())
Expect(f.CRDPlural).To(Equal("promises"))
})
})

Expand All @@ -151,6 +152,7 @@ var _ = Describe("Pipeline", func() {
Expect(f.WorkflowType).To(Equal(v1alpha1.WorkflowTypeResource))
Expect(f.ClusterScoped).To(Equal(false))
Expect(f.ResourceWorkflow).To(BeTrue())
Expect(f.CRDPlural).To(Equal("promiseCrdPlural"))
})

It("sets ClusterScoped to true if the promise API is cluster scoped", func() {
Expand Down Expand Up @@ -184,6 +186,10 @@ var _ = Describe("Pipeline", func() {

Describe("Resources()", func() {
When("promise", func() {
BeforeEach(func() {
factory.CRDPlural = "promises"
})

When("building for configure action", func() {
It("returns a list of resources", func() {
factory.WorkflowAction = v1alpha1.WorkflowActionConfigure
Expand Down Expand Up @@ -264,6 +270,7 @@ var _ = Describe("Pipeline", func() {
When("ResourceWorkflow=true", func() {
BeforeEach(func() {
factory.ResourceWorkflow = true
factory.CRDPlural = "promiseCrdPlural"
})

DescribeTable("generates the appropriate resources for action", func(action v1alpha1.Action, expectedNumObjects int, expectedConfigMap bool, expectedClusterRoles bool) {
Expand Down Expand Up @@ -649,6 +656,13 @@ var _ = Describe("Pipeline", func() {

DescribeTable("ReaderContainer", func(isResourceWorkflow bool, act v1alpha1.Action, additionalEnvVars []corev1.EnvVar) {
factory.ResourceWorkflow = isResourceWorkflow
if isResourceWorkflow {
factory.ClusterScoped = false
factory.CRDPlural = "promiseCrdPlural"
} else {
factory.ClusterScoped = true
factory.CRDPlural = "promises"
}
factory.WorkflowAction = act

var err error
Expand All @@ -668,19 +682,21 @@ var _ = Describe("Pipeline", func() {
expectedEnvVars := []corev1.EnvVar{
{Name: "OBJECT_NAMESPACE", Value: factory.Namespace},
{Name: "KRATIX_WORKFLOW_TYPE", Value: string(factory.WorkflowType)},
{Name: "CLUSTER_SCOPED", Value: fmt.Sprintf("%t", factory.ClusterScoped)},
{Name: "CRD_PLURAL", Value: factory.CRDPlural},
}

if isResourceWorkflow {
expectedEnvVars = append(expectedEnvVars,
corev1.EnvVar{Name: "OBJECT_KIND", Value: resourceRequest.GroupVersionKind().Kind},
corev1.EnvVar{Name: "OBJECT_GROUP", Value: resourceRequest.GroupVersionKind().Group},
corev1.EnvVar{Name: "OBJECT_NAME", Value: resourceRequest.GetName()},
corev1.EnvVar{Name: "OBJECT_VERSION", Value: resourceRequest.GroupVersionKind().Version},
)
} else {
expectedEnvVars = append(expectedEnvVars,
corev1.EnvVar{Name: "OBJECT_KIND", Value: promise.GroupVersionKind().Kind},
corev1.EnvVar{Name: "OBJECT_GROUP", Value: promise.GroupVersionKind().Group},
corev1.EnvVar{Name: "OBJECT_NAME", Value: promise.GetName()},
corev1.EnvVar{Name: "OBJECT_VERSION", Value: promise.GroupVersionKind().Version},
)
}

Expand Down Expand Up @@ -799,6 +815,7 @@ var _ = Describe("Pipeline", func() {
Describe("StatusWriterContainer", func() {
BeforeEach(func() {
factory.ResourceWorkflow = true
factory.CRDPlural = "promiseCrdPlural"
var err error
resources, err = factory.Resources([]corev1.EnvVar{
{Name: "env1", Value: "value1"},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/google/uuid v1.6.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1
github.com/minio/minio-go/v7 v7.0.68
github.com/onsi/ginkgo/v2 v2.22.1
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
github.com/pkg/errors v0.9.1
go.uber.org/zap v1.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
Expand Down
139 changes: 139 additions & 0 deletions work-creator/pipeline/cmd/reader/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package main

import (
"context"
"fmt"
"log"
"os"
"path/filepath"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

v1alpha1 "github.com/syntasso/kratix/api/v1alpha1"

"sigs.k8s.io/yaml"
)

func main() {
if err := run(); err != nil {
log.Fatalf("Error: %v", err)
}
}

func run() error {
// Parse environment variables
objectGroup := os.Getenv("OBJECT_GROUP")
objectName := os.Getenv("OBJECT_NAME")
objectNamespace := os.Getenv("OBJECT_NAMESPACE")
objectVersion := os.Getenv("OBJECT_VERSION")
crdPlural := os.Getenv("CRD_PLURAL")
healthcheck := os.Getenv("HEALTHCHECK")
outputDir := os.Getenv("OUTPUT_DIR")
clusterScoped := os.Getenv("CLUSTER_SCOPED") == "true"
if clusterScoped {
objectNamespace = "" // promises are cluster scoped
}
if outputDir == "" {
outputDir = "/kratix/input"
}

dynamicClient, err := getK8sClient()
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %v", err)
}

if err := writeObjectToFile(dynamicClient, outputDir, crdPlural, objectGroup, objectVersion, objectName, objectNamespace); err != nil {
return err
}

if healthcheck == "true" {
promiseName := os.Getenv("PROMISE_NAME")
if err := writePromiseToFile(dynamicClient, outputDir, promiseName); err != nil {
return err
}
}

return nil
}

func writeObjectToFile(client dynamic.Interface, outputDir, plural, group, version, name, namespace string) error {
// Create GVR for the object
gvr := schema.GroupVersionResource{
Group: group,
Version: version,
Resource: plural,
}

// Get the object
dynamicResource := client.Resource(gvr).Namespace(namespace)
obj, err := dynamicResource.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get object: %v", err)
}

objYAML, err := yaml.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal object: %v", err)
}

objectFilePath := filepath.Join(outputDir, "object.yaml")
if err := os.WriteFile(objectFilePath, objYAML, 0644); err != nil {
return fmt.Errorf("failed to write object to file: %v", err)
}

log.Printf("Object written to %s. Head is:\n%s", objectFilePath, string(objYAML[:min(len(objYAML), 500)]))
return nil
}

func writePromiseToFile(client dynamic.Interface, outputDir, promiseName string) error {
gvr := schema.GroupVersionResource{
Group: v1alpha1.GroupVersion.Group,
Version: v1alpha1.GroupVersion.Version,
Resource: "promises",
}

dynamicResource := client.Resource(gvr)
obj, err := dynamicResource.Get(context.TODO(), promiseName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get Promise: %v", err)
}

promiseYAML, err := yaml.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal Promise: %v", err)
}

promiseFilePath := filepath.Join(outputDir, "promise.yaml")
if err := os.WriteFile(promiseFilePath, promiseYAML, 0644); err != nil {
return fmt.Errorf("failed to write Promise to file: %v", err)
}

log.Printf("Promise written to %s. Head is:\n%s", promiseFilePath, string(promiseYAML[:min(len(promiseYAML), 500)]))
return nil
}

func getK8sClient() (dynamic.Interface, error) {
// Try to load in-cluster config first
config, err := rest.InClusterConfig()
if err != nil {
// Fall back to kubeconfig
kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
}

return dynamic.NewForConfig(config)
}

func min(a, b int) int {
if a < b {
return a
}
return b
}
14 changes: 0 additions & 14 deletions work-creator/scripts/reader

This file was deleted.

0 comments on commit 585b3ac

Please sign in to comment.