Skip to content

Commit

Permalink
feat: better updates for DataflowFlexTemplateJob in direct controller
Browse files Browse the repository at this point in the history
  • Loading branch information
justinsb committed Sep 11, 2024
1 parent 2c15230 commit 8d1440b
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 634 deletions.
242 changes: 201 additions & 41 deletions pkg/controller/direct/dataflow/dataflowflextemplatejob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
pb "cloud.google.com/go/dataflow/apiv1beta3/dataflowpb"
krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/dataflow/v1beta1"
refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/apis/k8s/v1alpha1"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/monitoring"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
"google.golang.org/protobuf/proto"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -330,30 +333,75 @@ func (a *dataflowFlexTemplateJobAdapter) Create(ctx context.Context, createOp *d
}

job := response.GetJob()
jobID := job.GetId()

if err := a.updateStatus(ctx, createOp, job); err != nil {
return err
}

return nil

// ready := false
// for !ready {
// time.Sleep(2 * time.Second)
// latest, err := a.getJob(ctx, jobID)
// if err != nil {
// // TODO: not right!
// return fmt.Errorf("getting state of job")
// }
// switch latest.CurrentState {
// case pb.JobState_JOB_STATE_RUNNING:
// ready = true
// default:
// log.Info("unexpected job state waiting for job running", "state", latest.CurrentState)
// }
// }

// return setStatus(u, status)
}

func (a *dataflowFlexTemplateJobAdapter) updateStatus(ctx context.Context, op directbase.Operation, job *pb.Job) error {
// TODO: Use jobCreateTime for mutation checking?
// jobCreateTime := job.CreateTime()

ready := false
for !ready {
time.Sleep(2 * time.Second)
latest, err := a.getJob(ctx, jobID)
if err != nil {
// TODO: not right!
return fmt.Errorf("getting state of job")
status := &krm.DataflowFlexTemplateJobStatus{
JobID: job.GetId(),
CurrentState: direct.PtrTo(job.CurrentState.String()),
Type: direct.PtrTo(job.Type.String()),
}

var readyCondition *v1alpha1.Condition

switch job.CurrentState {
case pb.JobState_JOB_STATE_RUNNING:
readyCondition = &v1alpha1.Condition{
Type: v1alpha1.ReadyConditionType,
Status: v1.ConditionTrue,
Reason: k8s.UpToDate,
Message: "The resource is up to date",
}
switch latest.CurrentState {
case pb.JobState_JOB_STATE_RUNNING:
ready = true
default:
log.Info("unexpected job state waiting for job running", "state", latest.CurrentState)

case pb.JobState_JOB_STATE_FAILED:
readyCondition = &v1alpha1.Condition{
Type: v1alpha1.ReadyConditionType,
Status: v1.ConditionFalse,
Reason: k8s.UpToDate,
Message: "Dataflow job is in FAILED state",
}

default:
readyCondition = &v1alpha1.Condition{
Type: v1alpha1.ReadyConditionType,
Status: v1.ConditionFalse,
Reason: k8s.Updating,
Message: fmt.Sprintf("Waiting for Dataflow job to be running (state is %v)", job.CurrentState),
}
}

status := &krm.DataflowFlexTemplateJobStatus{
JobID: jobID,
if err := op.UpdateStatus(ctx, status, readyCondition); err != nil {
return fmt.Errorf("updating status: %w", err)
}
return setStatus(u, status)

return nil
}

// Update implements the Adapter interface.
Expand All @@ -363,7 +411,31 @@ func (a *dataflowFlexTemplateJobAdapter) Update(ctx context.Context, updateOp *d
log := klog.FromContext(ctx)
log.V(0).Info("updating object", "u", u)

if true {
observedGeneration, _, err := unstructured.NestedInt64(u.Object, "status", "observedGeneration")
if err != nil {
return fmt.Errorf("reading status.observedGeneration: %w", err)
}
metadataGeneration := u.GetGeneration()

// We rely on the FlexJob being immutable, so drift at the GCP level should not be possible.
// Instead, we reconcile only when metadata.generation differs from status.observedGeneration.
if observedGeneration == metadataGeneration {
log.V(0).Info("object status.observedGeneration matches metadata.generation, skipping reconcile", "status.observedGeneration", observedGeneration, "metadata.generation", metadataGeneration)

j, _ := u.MarshalJSON()
log.V(0).Info("object is", "json", string(j))

// If we are waiting on the existing job, update the status
if a.actual != nil {
if err := a.updateStatus(ctx, updateOp, a.actual); err != nil {
return err
}

// TODO: If job fails to update, we probably need to "put back" the old job id
// But we also want to avoid repeatedly launching a failing job...
// The current behaviour will only try to launch once, which seems reasonable.
}

return nil
}

Expand All @@ -383,35 +455,123 @@ func (a *dataflowFlexTemplateJobAdapter) Update(ctx context.Context, updateOp *d
}

job := response.GetJob()
jobID := job.GetId()

// TODO: Use jobCreateTime for mutation checking?
// jobCreateTime := job.CreateTime()

status := &krm.DataflowFlexTemplateJobStatus{
JobID: jobID,
if err := a.updateStatus(ctx, updateOp, job); err != nil {
return err
}
return setStatus(u, status)

return nil
}

// fullyQualifiedName renders the adapter's project, location and resource ID
// as a single slash-separated resource path.
//
// NOTE(review): the last collection segment is "clusters", which looks
// unexpected for a Dataflow job - confirm against the callers before relying
// on this value as a GCP resource name.
func (a *dataflowFlexTemplateJobAdapter) fullyQualifiedName() string {
	const pathFormat = "projects/%s/locations/%s/clusters/%s"

	return fmt.Sprintf(pathFormat, a.projectID, a.location, a.resourceID)
}

// setStatus replaces u's status with typedStatus, converted to its
// unstructured form, while carrying over three fields from the previous
// status: conditions, observedGeneration and externalRef.
//
// Only keys actually present in the previous status are copied, so an object
// that never had one of them does not gain an explicit nil entry for it.
//
// It returns an error only when typedStatus cannot be converted.
func setStatus(u *unstructured.Unstructured, typedStatus any) error {
	status, err := runtime.DefaultUnstructuredConverter.ToUnstructured(typedStatus)
	if err != nil {
		return fmt.Errorf("error converting status to unstructured: %w", err)
	}

	// Best effort: the lookup error is ignored on purpose. A missing or
	// malformed previous status is treated as "nothing to preserve" and is
	// overwritten below.
	old, _, _ := unstructured.NestedMap(u.Object, "status")
	for _, key := range []string{"conditions", "observedGeneration", "externalRef"} {
		if value, ok := old[key]; ok {
			status[key] = value
		}
	}

	// Assigning into a nil map panics; make sure there is a map to write to.
	if u.Object == nil {
		u.Object = map[string]any{}
	}
	u.Object["status"] = status

	return nil
}
// func setStatus(u *unstructured.Unstructured, typedStatus any) error {
// status, err := runtime.DefaultUnstructuredConverter.ToUnstructured(typedStatus)
// if err != nil {
// return fmt.Errorf("error converting status to unstructured: %w", err)
// }

// old, _, _ := unstructured.NestedMap(u.Object, "status")
// if old != nil {
// status["conditions"] = old["conditions"]
// status["observedGeneration"] = old["observedGeneration"]
// status["externalRef"] = old["externalRef"]
// }

// u.Object["status"] = status

// return nil
// }

// // ShouldReconcileBasedOnEtag checks if we should reconcile based on the GCP etag matching the KRM etag.
// // If the etag in KRM status is the same as the GCP etag, we consider the GCP object not to have changed.
// // We also consider the object to have changes if the KRM object generation != observedGeneration (spec changes),
// // and we also reconcile again if the object is not healthy (based on status.conditions).
// //
// // A few problems with the approach:
// // * We miss changes due to labels or annotations.
// // * If there's a change in the GCP object that isn't reflected in etag, we miss that (seems unlikely)
// // * Because we set spec.resourceID, we do an extra reconciliation after first creation (because we bump generation).
// func getObservedGeneration(u *unstructured.Unstructured) bool {
// obj := &objectWithEtag{}
// observedGeneration, _, _ := unstructured.NestedInt64(u.Object, "status", "observedGeneration")
// if observedGeneration != 0 {
// obj.Status.ObservedGeneration = &observedGeneration
// }

// )
// if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil {
// return nil, fmt.Errorf("")
// }

// if u.GetGeneration() != direct.ValueOf(obj.Status.ObservedGeneration) {
// log.V(2).Info("generation does not match", "generation", u.GetGeneration(), "observedGeneration", direct.ValueOf(obj.Status.ObservedGeneration))
// return true
// }

// // if gcpEtag == "" {
// // log.V(2).Info("etag not set in GCP")
// // return true
// // }

// // objectEtag := direct.ValueOf(obj.Status.ObservedState.Etag)
// // if objectEtag == "" {
// // objectEtag = direct.ValueOf(obj.Status.Etag)
// // }

// // if objectEtag == "" {
// // log.V(2).Info("etag not set in KRM object")
// // return true
// // }

// // if gcpEtag != objectEtag {
// // log.V(2).Info("object status etag does not match gcp updateTime", "objectEtag", objectEtag, "gcpEtag", gcpEtag)
// // return true
// // }

// if obj.Status.Conditions != nil {
// // if there was a previously failing update let's make sure we give
// // the update a chance to heal or keep marking it as failed

// ready := false
// for _, condition := range obj.Status.Conditions {
// if condition.Type == v1alpha1.ReadyConditionType {
// if condition.Status == corev1.ConditionTrue {
// ready = true
// }
// }
// }

// if !ready {
// log.V(2).Info("status.conditions indicates object is not ready yet")
// return true
// }
// }

// log.V(2).Info("object etag matches gcp etag", "objectEtag", objectEtag, "gcpEtag", gcpEtag)
// return false
// }

// // objectWithEtag holds the fields that are relevant to an etag-based change detection.
// type objectWithEtag struct {
// Status objectWithEtagtatus `json:"status"`
// }

// type objectWithEtagtatus struct {
// Conditions []v1alpha1.Condition `json:"conditions,omitempty"`

// // Used if status.observedState.etag is not set
// Etag *string `json:"etag,omitempty"`

// // Compared to the object's generation to detect spec changes
// ObservedGeneration *int64 `json:"observedGeneration,omitempty"`

// ObservedState objectWithEtagObservedState `json:"observedState,omitempty"`
// }

// type objectWithEtagObservedState struct {
// // Checked before status.etag
// Etag *string `json:"etag,omitempty"`
// }
20 changes: 14 additions & 6 deletions pkg/controller/direct/directbase/directbase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst
return true, nil
}
if !k8s.HasAbandonAnnotation(u) {
deleteOp := NewDeleteOperation(u)
deleteOp := NewDeleteOperation(r.Reconciler.Client, u)
if _, err := adapter.Delete(ctx, deleteOp); err != nil {
if !errors.Is(err, k8s.ErrIAMNotFound) && !k8s.IsReferenceNotFoundError(err) {
if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
Expand All @@ -291,29 +291,37 @@ func (r *reconcileContext) doReconcile(ctx context.Context, u *unstructured.Unst
// set the etag to an empty string, since IAMPolicy is the authoritative intent, KCC wants to overwrite the underlying policy regardless
//policy.Spec.Etag = ""

hasSetReadyCondition := false
shouldRequeue := false

if !existsAlready {
createOp := NewCreateOperation(u)
createOp := NewCreateOperation(r.Reconciler.Client, u)
if err := adapter.Create(ctx, createOp); err != nil {
if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(u))
return r.handleUnresolvableDeps(ctx, u, unwrappedErr)
}
return false, r.handleUpdateFailed(ctx, u, fmt.Errorf("error creating: %w", err))
}
hasSetReadyCondition = createOp.HasSetReadyCondition
shouldRequeue = createOp.ShouldRequeue
} else {
updateOp := NewUpdateOperation(u)
updateOp := NewUpdateOperation(r.Reconciler.Client, u)
if err := adapter.Update(ctx, updateOp); err != nil {
if unwrappedErr, ok := lifecyclehandler.CausedByUnresolvableDeps(err); ok {
logger.Info(unwrappedErr.Error(), "resource", k8s.GetNamespacedName(u))
return r.handleUnresolvableDeps(ctx, u, unwrappedErr)
}
return false, r.handleUpdateFailed(ctx, u, fmt.Errorf("error updating: %w", err))
}
hasSetReadyCondition = updateOp.HasSetReadyCondition
shouldRequeue = updateOp.ShouldRequeue
}
if isAPIServerUpdateRequired(u) {
return false, r.handleUpToDate(ctx, u)

if !hasSetReadyCondition && isAPIServerUpdateRequired(u) {
return shouldRequeue, r.handleUpToDate(ctx, u)
}
return false, nil
return shouldRequeue, nil
}

// ensureFinalizers will apply our finalizers to the object if they are not present.
Expand Down
Loading

0 comments on commit 8d1440b

Please sign in to comment.