Skip to content

Commit

Permalink
Apply default memory and disk to Tasks
Browse files Browse the repository at this point in the history
Issue: #1197
Co-authored-by: Georgi Sabev <[email protected]>
  • Loading branch information
gcapizzi and georgethebeatle committed Jun 14, 2022
1 parent ff28d30 commit 1e85e1d
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 32 deletions.
4 changes: 4 additions & 0 deletions api/handlers/task_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var _ = Describe("TaskHandler", func() {
AppGUID: "the-app-guid",
SequenceID: 123456,
CreationTimestamp: time.Date(2022, 6, 14, 13, 22, 34, 0, time.UTC),
MemoryMB: 256,
DiskMB: 128,
}, nil)
})

Expand All @@ -78,6 +80,8 @@ var _ = Describe("TaskHandler", func() {
"sequence_id": 123456,
"created_at": "2022-06-14T13:22:34Z",
"updated_at": "2022-06-14T13:22:34Z",
"memory_in_mb": 256,
"disk_in_mb": 128,
"relationships": {
"app": {
"data": {
Expand Down
4 changes: 4 additions & 0 deletions api/presenter/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type TaskResponse struct {
SequenceID int64 `json:"sequence_id"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
MemoryMB int64 `json:"memory_in_mb"`
DiskMB int64 `json:"disk_in_mb"`
}

type TaskLinks struct {
Expand All @@ -36,6 +38,8 @@ func ForTask(responseTask repositories.TaskRecord, baseURL url.URL) TaskResponse
SequenceID: responseTask.SequenceID,
CreatedAt: creationTimestamp,
UpdatedAt: creationTimestamp,
MemoryMB: responseTask.MemoryMB,
DiskMB: responseTask.DiskMB,
Relationships: Relationships{
"app": Relationship{
Data: &RelationshipData{
Expand Down
27 changes: 20 additions & 7 deletions api/repositories/task_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type TaskRecord struct {
AppGUID string
SequenceID int64
CreationTimestamp time.Time
MemoryMB int64
DiskMB int64
}

type CreateTaskMessage struct {
Expand Down Expand Up @@ -77,9 +79,13 @@ func (r *TaskRepo) CreateTask(ctx context.Context, authInfo authorization.Info,
return TaskRecord{}, apierrors.FromK8sError(err, TaskResourceType)
}

task, err = r.awaitSequenceID(ctx, userClient, task)
task, err = r.waitForStatusCondition(ctx, userClient, task, func(updatedTask *korifiv1alpha1.CFTask) bool {
return updatedTask.Status.SequenceID != 0 &&
updatedTask.Status.MemoryMB != 0 &&
updatedTask.Status.DiskQuotaMB != 0
})
if err != nil {
return TaskRecord{}, fmt.Errorf("failed awaiting task being ready: %w", err)
return TaskRecord{}, fmt.Errorf("failed waiting for status to get populated: %w", err)
}

return TaskRecord{
Expand All @@ -89,10 +95,17 @@ func (r *TaskRepo) CreateTask(ctx context.Context, authInfo authorization.Info,
AppGUID: task.Spec.AppRef.Name,
SequenceID: task.Status.SequenceID,
CreationTimestamp: task.CreationTimestamp.Time,
MemoryMB: task.Status.MemoryMB,
DiskMB: task.Status.DiskQuotaMB,
}, nil
}

func (r *TaskRepo) awaitSequenceID(ctx context.Context, userClient client.WithWatch, task korifiv1alpha1.CFTask) (korifiv1alpha1.CFTask, error) {
func (r *TaskRepo) waitForStatusCondition(
ctx context.Context,
userClient client.WithWatch,
task korifiv1alpha1.CFTask,
condition func(*korifiv1alpha1.CFTask) bool,
) (korifiv1alpha1.CFTask, error) {
watch, err := userClient.Watch(ctx, &korifiv1alpha1.CFTaskList{}, client.InNamespace(task.Namespace), client.MatchingFields{"metadata.name": task.Name})
if err != nil {
return korifiv1alpha1.CFTask{}, apierrors.FromK8sError(err, TaskResourceType)
Expand All @@ -102,16 +115,16 @@ func (r *TaskRepo) awaitSequenceID(ctx context.Context, userClient client.WithWa
for {
select {
case e := <-watch.ResultChan():
task, ok := e.Object.(*korifiv1alpha1.CFTask)
updatedTask, ok := e.Object.(*korifiv1alpha1.CFTask)
if !ok {
continue
}

if task.Status.SequenceID != 0 {
return *task, nil
if condition(updatedTask) {
return *updatedTask, nil
}
case <-time.After(r.timeout):
return korifiv1alpha1.CFTask{}, fmt.Errorf("task did not become ready within timeout period %d ms", r.timeout.Milliseconds())
return korifiv1alpha1.CFTask{}, fmt.Errorf("task status did not get populated within timeout period %d ms", r.timeout.Milliseconds())
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions api/repositories/task_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var _ = Describe("TaskRepository", func() {
BeforeEach(func() {
dummyTaskController = func(cft *korifiv1alpha1.CFTask) error {
cft.Status.SequenceID = 6
cft.Status.MemoryMB = 256
cft.Status.DiskQuotaMB = 128
return k8sClient.Status().Update(ctx, cft)
}
createMessage = repositories.CreateTaskMessage{
Expand Down Expand Up @@ -94,6 +96,8 @@ var _ = Describe("TaskRepository", func() {
Expect(taskRecord.AppGUID).To(Equal(cfApp.Name))
Expect(taskRecord.SequenceID).NotTo(BeZero())
Expect(taskRecord.CreationTimestamp).To(BeTemporally("~", time.Now(), time.Second))
Expect(taskRecord.MemoryMB).To(BeNumerically("==", 256))
Expect(taskRecord.DiskMB).To(BeNumerically("==", 128))
})

When("the task never becomes ready", func() {
Expand Down
4 changes: 3 additions & 1 deletion controllers/api/v1alpha1/cftask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type CFTaskStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file

SequenceID int64 `json:"sequenceId"`
SequenceID int64 `json:"sequenceId"`
MemoryMB int64 `json:"memoryMB"`
DiskQuotaMB int64 `json:"diskQuotaMB"`
}

//+kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,18 @@ spec:
status:
description: CFTaskStatus defines the observed state of CFTask
properties:
diskQuotaMB:
format: int64
type: integer
memoryMB:
format: int64
type: integer
sequenceId:
format: int64
type: integer
required:
- diskQuotaMB
- memoryMB
- sequenceId
type: object
type: object
Expand Down
47 changes: 31 additions & 16 deletions controllers/controllers/workloads/cftask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"

korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
"code.cloudfoundry.org/korifi/controllers/config"
)

//counterfeiter:generate -o fake -fake-name SeqIdGenerator . SeqIdGenerator
Expand All @@ -41,20 +42,29 @@ type SeqIdGenerator interface {

// CFTaskReconciler reconciles a CFTask object
type CFTaskReconciler struct {
k8sClient client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
logger logr.Logger
seqIdGenerator SeqIdGenerator
k8sClient client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
logger logr.Logger
seqIdGenerator SeqIdGenerator
cfProcessDefaults config.CFProcessDefaults
}

func NewCFTaskReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, logger logr.Logger, seqIdGenerator SeqIdGenerator) *CFTaskReconciler {
func NewCFTaskReconciler(
client client.Client,
scheme *runtime.Scheme,
recorder record.EventRecorder,
logger logr.Logger,
seqIdGenerator SeqIdGenerator,
cfProcessDefaults config.CFProcessDefaults,
) *CFTaskReconciler {
return &CFTaskReconciler{
k8sClient: client,
scheme: scheme,
recorder: recorder,
logger: logger,
seqIdGenerator: seqIdGenerator,
k8sClient: client,
scheme: scheme,
recorder: recorder,
logger: logger,
seqIdGenerator: seqIdGenerator,
cfProcessDefaults: cfProcessDefaults,
}
}

Expand All @@ -77,7 +87,7 @@ func (r *CFTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, err
}

err = r.ensureSequenceId(ctx, cfTask)
err = r.updateStatus(ctx, cfTask)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -155,9 +165,11 @@ func (r *CFTaskReconciler) createEiriniTask(ctx context.Context, cfTask *korifiv
},
},
Spec: eiriniv1.TaskSpec{
GUID: cfTask.Name,
Command: cfTask.Spec.Command,
Image: cfDroplet.Status.Droplet.Registry.Image,
GUID: cfTask.Name,
Command: cfTask.Spec.Command,
Image: cfDroplet.Status.Droplet.Registry.Image,
MemoryMB: r.cfProcessDefaults.MemoryMB,
DiskMB: r.cfProcessDefaults.DefaultDiskQuotaMB,
},
}

Expand All @@ -174,7 +186,7 @@ func (r *CFTaskReconciler) createEiriniTask(ctx context.Context, cfTask *korifiv
return nil
}

func (r *CFTaskReconciler) ensureSequenceId(ctx context.Context, cfTask *korifiv1alpha1.CFTask) error {
func (r *CFTaskReconciler) updateStatus(ctx context.Context, cfTask *korifiv1alpha1.CFTask) error {
if cfTask.Status.SequenceID == 0 {
cfTaskCopy := cfTask.DeepCopy()
var err error
Expand All @@ -184,6 +196,9 @@ func (r *CFTaskReconciler) ensureSequenceId(ctx context.Context, cfTask *korifiv
return err
}

cfTaskCopy.Status.MemoryMB = r.cfProcessDefaults.MemoryMB
cfTaskCopy.Status.DiskQuotaMB = r.cfProcessDefaults.DefaultDiskQuotaMB

err = r.k8sClient.Status().Patch(ctx, cfTaskCopy, client.MergeFrom(cfTask))
if err != nil {
r.logger.Info("error-updating-status", "error", err)
Expand Down
28 changes: 24 additions & 4 deletions controllers/controllers/workloads/cftask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

eiriniv1 "code.cloudfoundry.org/eirini-controller/pkg/apis/eirini/v1"
korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
"code.cloudfoundry.org/korifi/controllers/config"
"code.cloudfoundry.org/korifi/controllers/controllers/workloads"
workloadsfake "code.cloudfoundry.org/korifi/controllers/controllers/workloads/fake"
"code.cloudfoundry.org/korifi/controllers/fake"
Expand Down Expand Up @@ -40,7 +41,17 @@ var _ = Describe("CFTask Controller", func() {
seqIdGenerator = new(workloadsfake.SeqIdGenerator)
seqIdGenerator.GenerateReturns(314, nil)
logger := zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))
taskReconciler = *workloads.NewCFTaskReconciler(k8sClient, scheme.Scheme, eventRecorder, logger, seqIdGenerator)
taskReconciler = *workloads.NewCFTaskReconciler(
k8sClient,
scheme.Scheme,
eventRecorder,
logger,
seqIdGenerator,
config.CFProcessDefaults{
MemoryMB: 256,
DefaultDiskQuotaMB: 128,
},
)
})

Describe("task creation", func() {
Expand Down Expand Up @@ -125,7 +136,9 @@ var _ = Describe("CFTask Controller", func() {
Expect(eiriniTask.Namespace).To(Equal("the-task-namespace"))
Expect(eiriniTask.Labels).To(HaveKeyWithValue(korifiv1alpha1.CFTaskGUIDLabelKey, "the-task-guid"))
Expect(eiriniTask.Spec.Command).To(ConsistOf("echo", "hello"))
Expect(eiriniTask.Spec.Image).To(Equal("the-image"))
Expect(eiriniTask.Spec.Command).To(ConsistOf("echo", "hello"))
Expect(eiriniTask.Spec.MemoryMB).To(BeNumerically("==", 256))
Expect(eiriniTask.Spec.DiskMB).To(BeNumerically("==", 128))
})

It("emits a normal event for successful reconciliation", func() {
Expand All @@ -139,12 +152,19 @@ var _ = Describe("CFTask Controller", func() {
Expect(message).To(ContainSubstring("Created eirini task %s"))
})

It("initialises Status.SequenceID", func() {
It("populates the CFTask Status", func() {
Expect(statusWriter.PatchCallCount()).To(Equal(1))
_, object, patch, _ := statusWriter.PatchArgsForCall(0)
patchBytes, patchErr := patch.Data(object)
Expect(patchErr).NotTo(HaveOccurred())
Expect(string(patchBytes)).To(MatchJSON(`{"status":{"sequenceId":314}}`))
Expect(string(patchBytes)).To(MatchJSON(`
{
"status": {
"sequenceId": 314,
"memoryMB": 256,
"diskQuotaMB": 128
}
}`))
})

When("Status.SequenceID has been already set", func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ var _ = Describe("CFTaskReconciler Integration Tests", func() {
Expect(k8sClient.Create(ctx, cfTask)).To(Succeed())
})

It("sets Status.SequenceID in the CFTask", func() {
It("populates the Status of the CFTask", func() {
Eventually(func(g Gomega) {
var task korifiv1alpha1.CFTask
g.Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: ns, Name: cfTask.Name}, &task)).To(Succeed())
g.Expect(task.Status.SequenceID).NotTo(BeZero())
g.Expect(task.Status.MemoryMB).To(Equal(cfProcessDefaults.MemoryMB))
g.Expect(task.Status.DiskQuotaMB).To(Equal(cfProcessDefaults.DefaultDiskQuotaMB))
}).Should(Succeed())
})

Expand Down Expand Up @@ -143,6 +145,8 @@ var _ = Describe("CFTaskReconciler Integration Tests", func() {
Expect(tasks.Items[0].Spec.GUID).To(Equal(cfTask.Name))
Expect(tasks.Items[0].Spec.Command).To(ConsistOf("echo", "hello"))
Expect(tasks.Items[0].Spec.Image).To(Equal("registry.io/my/image"))
Expect(tasks.Items[0].Spec.MemoryMB).To(Equal(cfProcessDefaults.MemoryMB))
Expect(tasks.Items[0].Spec.DiskMB).To(Equal(cfProcessDefaults.DefaultDiskQuotaMB))
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

var (
cancel context.CancelFunc
testEnv *envtest.Environment
k8sClient client.Client
cancel context.CancelFunc
testEnv *envtest.Environment
k8sClient client.Client
cfProcessDefaults config.CFProcessDefaults
)

const (
Expand Down Expand Up @@ -149,12 +150,17 @@ var _ = BeforeSuite(func() {
).SetupWithManager(k8sManager)
Expect(err).NotTo(HaveOccurred())

cfProcessDefaults = config.CFProcessDefaults{
MemoryMB: 256,
DefaultDiskQuotaMB: 128,
}
err = NewCFTaskReconciler(
k8sManager.GetClient(),
k8sManager.GetScheme(),
k8sManager.GetEventRecorderFor("cftask-controller"),
ctrl.Log.WithName("controllers").WithName("CFSpace"),
NewSequenceId(clockwork.NewRealClock()),
cfProcessDefaults,
).SetupWithManager(k8sManager)
Expect(err).NotTo(HaveOccurred())

Expand Down
1 change: 1 addition & 0 deletions controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func main() {
mgr.GetEventRecorderFor("cftask-controller"),
ctrl.Log.WithName("controllers").WithName("CFTask"),
workloadscontrollers.NewSequenceId(clockwork.NewRealClock()),
controllerConfig.CFProcessDefaults,
).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CFTask")
os.Exit(1)
Expand Down

0 comments on commit 1e85e1d

Please sign in to comment.