Skip to content

Commit

Permalink
feat: compress work/workplacements documents (#243)
Browse files Browse the repository at this point in the history
* wip: start compressing work.spec.content before creation in k8s

Signed-off-by: Shane Dowling <[email protected]>

* wip: pull out gzip functions into utils package

* fix: base64 encode compressed content and decompress before writing

* gzip content is binary data which cannot be used in k8s specs
* pull compression functions out to the kratix lib
* begin updating controller tests

Signed-off-by: Abby Bangser <[email protected]>

* fix: update scheduler tests to use compressed content

* whilst the Scheduler tests do not fail if the content is not
  compressed, the tests have been updated to document that the content at
  the point at which the Work is being handled by the scheduler would be
  compressed. As the Scheduler does not call the Work's underlying
  Reconcile function or attempt to query or modify the content, the form
  of the content does not impact the functionality of the Scheduler

* fix: update WorkplacementReconciler test to reflect where the content would be decompressed

* chore: update build-and-load-worker-creator make target to build-and-load-work-creator

* chore: remove unused ReconcileAllDependencyWorks function

* switch to ptr package in place of deprecated pointer package

* chore: ensure compression library is imported correctly

Signed-off-by: Chunyi Lyu <[email protected]>

* chore: commit compression library and remove go.mod replace

Signed-off-by: Chunyi Lyu <[email protected]>

* chore: update references to build-and-load-worker-creator make target

Signed-off-by: Chunyi Lyu <[email protected]>

* chore: use pointer library for Bool function

Signed-off-by: Chunyi Lyu <[email protected]>

* chore: refactor a very long func name

- requestReconcilationOfAllWorksOnDestinationCreateOrUpdate()

Co-authored-by: Sapphire Mason-Brown <[email protected]>

* chore: add crd description for workload context

Co-authored-by: Sapphire Mason-Brown <[email protected]>

---------

Signed-off-by: Shane Dowling <[email protected]>
Signed-off-by: Abby Bangser <[email protected]>
Signed-off-by: Chunyi Lyu <[email protected]>
Co-authored-by: Sapphire Mason-Brown <[email protected]>
  • Loading branch information
ChunyiLyu and SaphMB authored Sep 26, 2024
1 parent 9b0fd77 commit 0314320
Show file tree
Hide file tree
Showing 18 changed files with 265 additions and 142 deletions.
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ jobs:
kind create cluster --image kindest/node:v1.27.3 --name platform --config hack/platform/kind-platform-config.yaml
make install-cert-manager
make build-and-load-kratix
make build-and-load-worker-creator
make build-and-load-work-creator
helm install kratix charts/kratix/ -f hack/platform/helm-values-gitea.yaml
source ./scripts/utils.sh
Expand Down Expand Up @@ -269,7 +269,7 @@ jobs:
kind create cluster --image kindest/node:v1.27.3 --name platform --config hack/platform/kind-platform-config.yaml
make install-cert-manager
make build-and-load-kratix
make build-and-load-worker-creator
make build-and-load-work-creator
helm install kratix charts/kratix/ -f hack/platform/helm-values-minio.yaml
kubectl --context kind-platform apply --filename "hack/platform/minio-install.yaml"
kubectl --context kind-platform wait --for=condition=Ready --timeout=300s -n kratix-platform-system pod -l run=minio
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ kind-load-image: docker-build ## Load locally built image into KinD
build-and-load-kratix: kind-load-image ## Build kratix container image and reloads
kubectl rollout restart deployment -n kratix-platform-system -l control-plane=controller-manager

build-and-load-worker-creator: ## Build worker-creator container image and reloads
build-and-load-work-creator: ## Build work-creator container image and reloads
WC_IMG_VERSION=${WC_IMG_VERSION} WC_IMG_MIRROR=${WC_IMG_MIRROR} make -C work-creator kind-load-image

##@ Build
Expand Down
15 changes: 13 additions & 2 deletions api/v1alpha1/work_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/syntasso/kratix/lib/hash"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/syntasso/kratix/lib/compression"
)

const (
Expand Down Expand Up @@ -83,13 +85,18 @@ func NewPromiseDependenciesWork(promise *Promise, name string) (*Work, error) {
return nil, err
}

workContent, err := compression.CompressContent(yamlBytes)
if err != nil {
return nil, err
}

work.Spec.WorkloadGroups = []WorkloadGroup{
{
ID: hash.ComputeHash(DefaultWorkloadGroupDirectory),
Directory: DefaultWorkloadGroupDirectory,
Workloads: []Workload{
{
Content: string(yamlBytes),
Content: string(workContent),
Filepath: fmt.Sprintf("static/%s-dependencies.yaml", promise.GetName()),
},
},
Expand Down Expand Up @@ -120,6 +127,9 @@ func (w *Work) IsDependency() bool {
// be scheduled to a Destination
type WorkloadGroup struct {
// +optional
// List of Workloads scheduled to target Destination;
// Each Workload details name of the filepath on Destination,
// and the compressed content of the workload.
Workloads []Workload `json:"workloads,omitempty"`
Directory string `json:"directory,omitempty"`
ID string `json:"id,omitempty"`
Expand All @@ -135,7 +145,8 @@ type WorkloadGroupScheduling struct {
type Workload struct {
// +optional
Filepath string `json:"filepath,omitempty"`
Content string `json:"content,omitempty"`
// Content of the workload, which is base64 encoded and compressed with gzip.
Content string `json:"content,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
9 changes: 6 additions & 3 deletions api/v1alpha1/workplacement_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ import (

// WorkPlacementSpec defines the desired state of WorkPlacement
type WorkPlacementSpec struct {
TargetDestinationName string `json:"targetDestinationName,omitempty"`
Workloads []Workload `json:"workloads,omitempty"`
PromiseName string `json:"promiseName,omitempty"`
TargetDestinationName string `json:"targetDestinationName,omitempty"`
// List of Workloads scheduled to target Destination;
// Each Workload details name of the filepath on Destination,
// and the compressed content of the workload.
Workloads []Workload `json:"workloads,omitempty"`
PromiseName string `json:"promiseName,omitempty"`
// +optional
ResourceName string `json:"resourceName,omitempty"`
ID string `json:"id,omitempty"`
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/platform.kratix.io_workplacements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,17 @@ spec:
targetDestinationName:
type: string
workloads:
description: |-
List of Workloads scheduled to target Destination;
Each Workload details name of the filepath on Destination,
and the compressed content of the workload.
items:
description: Workload represents the manifest workload to be deployed
on destination
properties:
content:
description: Content of the workload, which is base64 encoded
and compressed with gzip.
type: string
filepath:
type: string
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/platform.kratix.io_works.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ spec:
id:
type: string
workloads:
description: |-
List of Workloads scheduled to target Destination;
Each Workload details name of the filepath on Destination,
and the compressed content of the workload.
items:
description: Workload represents the manifest workload to
be deployed on destination
properties:
content:
description: Content of the workload, which is base64
encoded and compressed with gzip.
type: string
filepath:
type: string
Expand Down
16 changes: 12 additions & 4 deletions controllers/promise_controller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controllers_test

import (
"bytes"
"context"
"encoding/json"
"os"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/syntasso/kratix/api/v1alpha1"
"github.com/syntasso/kratix/controllers"
"github.com/syntasso/kratix/controllers/controllersfakes"
"github.com/syntasso/kratix/lib/compression"
"github.com/syntasso/kratix/lib/workflow"
)

Expand Down Expand Up @@ -178,8 +180,8 @@ var _ = Describe("PromiseController", func() {

By("creating a Work resource for the dependencies", func() {
work := getWork("kratix-platform-system", promise.GetName(), "", "")
Expect(work.Spec.WorkloadGroups[0].Workloads[0].Content).To(ContainSubstring("kind: Deployment"))
Expect(work.Spec.WorkloadGroups[0].Workloads[0].Content).To(ContainSubstring("kind: ClusterRoleBinding"))
Expect(inCompressedContents(work.Spec.WorkloadGroups[0].Workloads[0].Content, []byte("kind: Deployment"))).To(BeTrue())
Expect(inCompressedContents(work.Spec.WorkloadGroups[0].Workloads[0].Content, []byte("kind: ClusterRoleBinding"))).To(BeTrue())

})

Expand Down Expand Up @@ -504,7 +506,7 @@ var _ = Describe("PromiseController", func() {
Expect(fakeK8sClient.List(ctx, works)).To(Succeed())
Expect(works.Items).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].Workloads).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].Workloads[0].Content).To(ContainSubstring("redisoperator"))
Expect(inCompressedContents(works.Items[0].Spec.WorkloadGroups[0].Workloads[0].Content, []byte("redisoperator"))).To(BeTrue())
Expect(works.Items[0].Spec.WorkloadGroups[0].DestinationSelectors).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].DestinationSelectors[0]).To(Equal(
v1alpha1.WorkloadGroupScheduling{
Expand Down Expand Up @@ -771,7 +773,7 @@ var _ = Describe("PromiseController", func() {
Expect(fakeK8sClient.List(ctx, works)).To(Succeed())
Expect(works.Items).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].Workloads).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].Workloads[0].Content).To(ContainSubstring("postgresoperator"))
Expect(inCompressedContents(works.Items[0].Spec.WorkloadGroups[0].Workloads[0].Content, []byte("postgresoperator"))).To(BeTrue())
Expect(works.Items[0].Spec.WorkloadGroups[0].DestinationSelectors).To(HaveLen(1))
Expect(works.Items[0].Spec.WorkloadGroups[0].DestinationSelectors[0]).To(Equal(
v1alpha1.WorkloadGroupScheduling{
Expand Down Expand Up @@ -935,3 +937,9 @@ func createPromise(promisePath string) *v1alpha1.Promise {
Expect(fakeK8sClient.Get(ctx, promiseName, promise)).To(Succeed())
return promise
}

// inCompressedContents decompresses compressedContent (a gzipped,
// base64-encoded Workload payload) and reports whether the given byte
// sequence appears anywhere in the decompressed output. Decompression
// failures fail the test immediately via Gomega.
func inCompressedContents(compressedContent string, content []byte) bool {
	raw, decompressErr := compression.DecompressContent([]byte(compressedContent))
	Expect(decompressErr).ToNot(HaveOccurred())
	return bytes.Contains(raw, content)
}
21 changes: 0 additions & 21 deletions controllers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,6 @@ type Scheduler struct {
Log logr.Logger
}

// ReconcileAllDependencyWorks lists every Work in the cluster and
// reconciles those originating from a Promise dependency, scheduling each
// such Work's WorkloadGroups to the appropriate Destinations. Resource
// Works are skipped. A failure to reconcile an individual Work is logged
// but does not stop the loop; only a failure to list Works is returned.
func (s *Scheduler) ReconcileAllDependencyWorks() error {
	allWorks := v1alpha1.WorkList{}
	if err := s.Client.List(context.Background(), &allWorks, &client.ListOptions{}); err != nil {
		return err
	}

	for _, work := range allWorks.Items {
		if !work.IsDependency() {
			// Resource Works are handled elsewhere; only dependency Works here.
			continue
		}
		if _, err := s.ReconcileWork(&work); err != nil {
			s.Log.Error(err, "Failed reconciling Work: ")
		}
	}

	return nil
}

// Reconciles all WorkloadGroups in a Work by scheduling them to Destinations via
// Workplacements.
func (s *Scheduler) ReconcileWork(work *v1alpha1.Work) ([]string, error) {
Expand Down
85 changes: 19 additions & 66 deletions controllers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
. "github.com/onsi/gomega"
"github.com/syntasso/kratix/api/v1alpha1"
. "github.com/syntasso/kratix/api/v1alpha1"
"github.com/syntasso/kratix/lib/compression"
"github.com/syntasso/kratix/lib/hash"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -25,6 +25,7 @@ var _ = Describe("Controllers/Scheduler", func() {
var devDestination, devDestination2, pciDestination, prodDestination, strictDestination Destination
var workPlacements WorkPlacementList
var scheduler *Scheduler
var fakeCompressedContent []byte

BeforeEach(func() {
// create a set of destinations to be used throughout the tests
Expand All @@ -41,71 +42,16 @@ var _ = Describe("Controllers/Scheduler", func() {
Expect(fakeK8sClient.Create(context.Background(), &prodDestination)).To(Succeed())
Expect(fakeK8sClient.Create(context.Background(), &strictDestination)).To(Succeed())

var err error
fakeCompressedContent, err = compression.CompressContent([]byte(string("fake: content")))
Expect(err).ToNot(HaveOccurred())

scheduler = &Scheduler{
Client: fakeK8sClient,
Log: ctrl.Log.WithName("controllers").WithName("Scheduler"),
}
})

Describe("#ReconcileAllDependencyWorks", func() {
var devDestination3 Destination
var dependencyWorkForDev Work

BeforeEach(func() {
// dependency Work which is scheduled to all Destinations
newWork("work-name", false)

// dependency Work scheduled only to devDestination
dependencyWorkForDev = newWork("dev-work-name", false, schedulingFor(devDestination))

// dependency Work scheduled only to prodDestination
newWork("prod-work-name", false, schedulingFor(prodDestination))

// resource Work scheduled only to devDestination
newWork("rr-work-name", true, schedulingFor(devDestination))
})

When("a new Destination is added which matches a Work's scheduling", func() {
BeforeEach(func() {
// register new destination which matches both the dependency and resource schedulings
devDestination3 = newDestination("dev-3", schedulingFor(devDestination).MatchLabels)
Expect(fakeK8sClient.Create(context.Background(), &devDestination3)).To(Succeed())

scheduler.ReconcileAllDependencyWorks()
})

It("schedules dependency Works with matching labels to the new Destination", func() {
// get all the WorkPlacements for the dependency Work
lo := &client.ListOptions{}
selector, err := labels.Parse(labels.FormatLabels(map[string]string{"kratix.io/work": "dev-work-name"}))
Expect(err).NotTo(HaveOccurred())
lo.LabelSelector = selector
Expect(fakeK8sClient.List(context.Background(), &workPlacements, lo)).To(Succeed())

// check that the dependency work is scheduled to all destinations, including the new one (dev-3)
Expect(workPlacements.Items).To(HaveLen(3))

Expect(workPlacements.Items[0].Name).To(HavePrefix("dev-work-name.dev-1"))
Expect(workPlacements.Items[0].Namespace).To(Equal(SystemNamespace))
Expect(workPlacements.Items[0].Spec.TargetDestinationName).To(Equal(devDestination.Name))
Expect(workPlacements.Items[0].Spec.Workloads).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].Workloads))
Expect(workPlacements.Items[0].Spec.ID).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].ID))

Expect(workPlacements.Items[1].Name).To(HavePrefix("dev-work-name.dev-2"))
Expect(workPlacements.Items[1].Namespace).To(Equal(SystemNamespace))
Expect(workPlacements.Items[1].Spec.TargetDestinationName).To(Equal(devDestination2.Name))
Expect(workPlacements.Items[1].Spec.Workloads).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].Workloads))
Expect(workPlacements.Items[1].Spec.ID).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].ID))

Expect(workPlacements.Items[2].Name).To(HavePrefix("dev-work-name.dev-3"))
Expect(workPlacements.Items[2].Namespace).To(Equal(SystemNamespace))
Expect(workPlacements.Items[2].Spec.TargetDestinationName).To(Equal(devDestination3.Name))
Expect(workPlacements.Items[2].Spec.Workloads).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].Workloads))
Expect(workPlacements.Items[2].Spec.ID).To(Equal(dependencyWorkForDev.Spec.WorkloadGroups[0].ID))
})
})
})

Describe("#ReconcileWork", func() {
Describe("Scheduling Resources", func() {
var resourceWork, resourceWorkWithMultipleGroups Work
Expand Down Expand Up @@ -209,11 +155,12 @@ var _ = Describe("Controllers/Scheduler", func() {
Expect(workPlacement.Spec.Workloads).To(HaveLen(1))

previousResourceVersion, err := strconv.Atoi(workPlacement.ResourceVersion)
Expect(err).ToNot(HaveOccurred())

// update the Work's WorkloadGroup with an extra Workload
Expect(fakeK8sClient.Get(context.Background(), client.ObjectKeyFromObject(&resourceWork), &resourceWork))
resourceWork.Spec.WorkloadGroups[0].Workloads = append(resourceWork.Spec.WorkloadGroups[0].Workloads, Workload{
Content: "fake: content",
Content: string(fakeCompressedContent),
})

_, err = scheduler.ReconcileWork(&resourceWork)
Expand All @@ -226,10 +173,11 @@ var _ = Describe("Controllers/Scheduler", func() {
workPlacement = workPlacements.Items[0]
Expect(workPlacement.Spec.Workloads).To(HaveLen(2))
Expect(workPlacement.Spec.Workloads).To(ContainElement(Workload{
Content: "fake: content",
Content: string(fakeCompressedContent),
}))

newResourceVersion, err := strconv.Atoi(workPlacement.ResourceVersion)
Expect(err).ToNot(HaveOccurred())
Expect(newResourceVersion).To(BeNumerically(">", previousResourceVersion))
})
})
Expand Down Expand Up @@ -281,7 +229,7 @@ var _ = Describe("Controllers/Scheduler", func() {
ID: hash.ComputeHash("foo"),
Workloads: []Workload{
{
Content: "fake: content",
Content: string(fakeCompressedContent),
},
},
DestinationSelectors: []WorkloadGroupScheduling{schedulingFor(devDestination)},
Expand Down Expand Up @@ -332,7 +280,7 @@ var _ = Describe("Controllers/Scheduler", func() {
ID: hash.ComputeHash("foo"),
Workloads: []Workload{
{
Content: "fake: content",
Content: string(fakeCompressedContent),
},
},
},
Expand Down Expand Up @@ -941,6 +889,11 @@ func newWorkWithTwoWorkloadGroups(name string, isResource bool, promiseSchedulin
namespace = SystemNamespace
}

newFakeCompressedContent, err := compression.CompressContent([]byte(string("key: value")))
Expect(err).ToNot(HaveOccurred())
additionalFakeCompressedContent, err := compression.CompressContent([]byte(string("foo: bar")))
Expect(err).ToNot(HaveOccurred())

w := &Work{
ObjectMeta: v1.ObjectMeta{
Name: name,
Expand All @@ -951,15 +904,15 @@ func newWorkWithTwoWorkloadGroups(name string, isResource bool, promiseSchedulin
WorkloadGroups: []WorkloadGroup{
{
Workloads: []Workload{
{Content: "key: value"},
{Content: string(newFakeCompressedContent)},
},
Directory: ".",
ID: hash.ComputeHash("."),
DestinationSelectors: []WorkloadGroupScheduling{promiseScheduling},
},
{
Workloads: []Workload{
{Content: "foo: bar"},
{Content: string(additionalFakeCompressedContent)},
},
DestinationSelectors: []WorkloadGroupScheduling{directoryOverrideScheduling},
Directory: "foo",
Expand Down
4 changes: 2 additions & 2 deletions controllers/work_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ func (r *WorkReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&v1alpha1.WorkPlacement{}).
Watches(
&v1alpha1.Destination{},
handler.EnqueueRequestsFromMapFunc(r.requestReconilationOfAllWorksOnDestinationCreateOrUpdate),
handler.EnqueueRequestsFromMapFunc(r.requestReconcilationOfWorksOnDestination),
).
Complete(r)
}

func (r *WorkReconciler) requestReconilationOfAllWorksOnDestinationCreateOrUpdate(ctx context.Context, obj client.Object) []reconcile.Request {
func (r *WorkReconciler) requestReconcilationOfWorksOnDestination(ctx context.Context, obj client.Object) []reconcile.Request {
dest := obj.(*v1alpha1.Destination)

if dest.GetDeletionTimestamp() != nil {
Expand Down
Loading

0 comments on commit 0314320

Please sign in to comment.