feat: better status updates for DataflowFlexTemplateJob #2580
@@ -3,7 +3,7 @@ kind: DataflowFlexTemplateJob
metadata:
annotations:
cnrm.cloud.google.com/management-conflict-prevention-policy: none
cnrm.cloud.google.com/mutable-but-unreadable-fields: '{"spec":{"containerSpecGcsPath":"gs://dataflow-templates/2020-08-31-00_RC00/flex/PubSub_Avro_to_BigQuery","parameters":{"createDisposition":"CREATE_NEVER","inputSubscription":"projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}","outputTableSpec":"${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}","outputTopic":"projects/${projectId}/topics/pubsubtopic1-${uniqueId}","schemaPath":"gs://config-connector-samples/dataflowflextemplate/numbers.avsc"}}}'
cnrm.cloud.google.com/mutable-but-unreadable-fields: '{"spec":{"containerSpecGcsPath":"gs://dataflow-templates/2020-08-31-00_RC00/flex/PubSub_Avro_to_BigQuery","parameters":{"createDisposition":"CREATE_NEVER","inputSubscription":"projects/${projectId}/subscriptions/pubsubsubscription-${uniqueId}","outputTableSpec":"${projectId}:bigquerydataset${uniqueId}.bigquerytable${uniqueId}","outputTopic":"projects/${projectId}/topics/pubsubtopic0-${uniqueId}","schemaPath":"gs://config-connector-samples/dataflowflextemplate/numbers.avsc"}}}'
May I ask why these are mutable-but-unreadable fields?
I'm thinking that since we are migrating to the direct resource, we should drop this annotation and switch to some other mechanism (I don't think the mutable-but-unreadable-fields in general will break users).
Ah sure - this test is actually not using the direct controller. This is me noticing another problem in our tests and fixing it. I will split this change out into its own PR; it's not really relevant here!
We do use the direct controller in batchdataflowflextemplatejob-direct, but that test uses the annotation to opt in to the direct controller.
Ah, gotcha.
Type: v1alpha1.ReadyConditionType,
Status: v1.ConditionFalse,
Reason: k8s.UpToDate,
Message: "Dataflow job is in FAILED state",
non-blocker: Suggest putting the condition.message in a normalizer, so we can build more tools around it (triage guide, re-iterate durations, etc)
Yes, but let's do that when we have something variable in the message.
metadataGeneration := u.GetGeneration()

// We rely on the FlexJob being immutable, so drift at the GCP level should not be possible.
// Instead, we reconcile whenever we see a different spec.generation
(ah, I was confused at the beginning)
typo in comment: metadata.generation
Good catch!
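As a rough illustration of the generation check discussed in this thread, here is a minimal sketch. It assumes the last reconciled generation is recorded somewhere such as `status.observedGeneration`; that field and the `needsReconcile` helper are assumptions for illustration, not code from this PR.

```go
package main

import "fmt"

// needsReconcile reports whether the object's metadata.generation differs from
// the generation we last reconciled. Because the job is treated as immutable,
// this sketch does not look at drift on the GCP side at all.
// observedGeneration is an assumed stand-in for a status.observedGeneration field.
func needsReconcile(metadataGeneration, observedGeneration int64) bool {
	return metadataGeneration != observedGeneration
}

func main() {
	// The spec was edited after the last reconcile, so generation moved from 1 to 2.
	fmt.Println(needsReconcile(2, 1))
	// Nothing changed since the last reconcile.
	fmt.Println(needsReconcile(2, 2))
}
```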
readyCondition = &v1alpha1.Condition{
Type: v1alpha1.ReadyConditionType,
Status: v1.ConditionFalse,
Reason: k8s.UpToDate,
May I ask why this is UpToDate when the job has failed?
Added a comment. It's because (I think) we want to stop reconciliation at this point: we have reconciled the desired state and the outcome is a failed job.
}
return setStatus(u, status)

return nil
}

// Update implements the Adapter interface.
Shall we also check the Job status to decide whether to update or not?
i.e. pb.JobState_JOB_STATE_CANCELLED --> no update and stop reconciling
Added handling for all the terminal states, not just JobState_JOB_STATE_FAILED
The logic specifically here is that we did update the status. Because we updated the status with Updating, we will be requeued.
Added a comment to the Operation UpdateStatus method:
// Operation defines some functionality supported by all operation types.
type Operation interface {
+ // Writes the status and ready condition to the object's status subresource.
+ // If the readyCondition is provided and the Status of that condition is not v1.ConditionTrue,
+ // then the operation will be requeued.
UpdateStatus(ctx context.Context, typedStatus any, readyCondition *v1alpha1.Condition) error
}
Actually, changed my mind and made it explicit - we now call op.RequestRequeue when we want to requeue.
var readyCondition *v1alpha1.Condition

switch job.CurrentState {
case pb.JobState_JOB_STATE_RUNNING:
I think pb.JobState_JOB_STATE_UPDATED also applies to this case, right?
Agreed, added a bunch more terminal but not ready states.
Also made the requeuing explicit - it was just too magic. This means we can set the ready condition however we best see fit. I think the current ready logic is right - on a terminal condition we stop reconciling but ready=false, but IDK!
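To make the "terminal means stop reconciling, but ready=false" idea concrete, here is a minimal sketch of one way the state switch could be laid out. The `JobState`, `Condition`, and `Outcome` types and the `classify` function are local stand-ins invented for illustration, not the real `pb` or `v1alpha1` types, and the exact set of states the PR treats as terminal is an assumption. Treating RUNNING as ready is also an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// JobState is a local stand-in for the Dataflow job state enum (not the real pb type).
type JobState string

const (
	StatePending   JobState = "JOB_STATE_PENDING"
	StateRunning   JobState = "JOB_STATE_RUNNING"
	StateDone      JobState = "JOB_STATE_DONE"
	StateFailed    JobState = "JOB_STATE_FAILED"
	StateCancelled JobState = "JOB_STATE_CANCELLED"
	StateUpdated   JobState = "JOB_STATE_UPDATED"
	StateDrained   JobState = "JOB_STATE_DRAINED"
)

// Condition is a simplified stand-in for v1alpha1.Condition.
type Condition struct {
	Type    string
	Status  string
	Reason  string
	Message string
}

// Outcome pairs the ready condition with an explicit requeue decision,
// so the two are no longer coupled implicitly.
type Outcome struct {
	Ready   Condition
	Requeue bool
}

// classify maps a job state to a ready condition and a requeue decision.
func classify(state JobState) Outcome {
	switch state {
	case StateRunning:
		// Assumption: a running job is the desired steady state.
		return Outcome{Ready: Condition{Type: "Ready", Status: "True", Reason: "UpToDate"}}
	case StateDone, StateFailed, StateCancelled, StateUpdated, StateDrained:
		// Terminal: the desired state was reconciled, so stop, but report not ready.
		name := strings.TrimPrefix(string(state), "JOB_STATE_")
		return Outcome{Ready: Condition{
			Type:    "Ready",
			Status:  "False",
			Reason:  "UpToDate",
			Message: fmt.Sprintf("Dataflow job is in %s state", name),
		}}
	default:
		// Still in flight: not ready yet, and ask for another reconcile.
		return Outcome{
			Ready:   Condition{Type: "Ready", Status: "False", Reason: "Updating"},
			Requeue: true,
		}
	}
}

func main() {
	for _, s := range []JobState{StatePending, StateRunning, StateFailed} {
		o := classify(s)
		fmt.Printf("%s ready=%s reason=%s requeue=%v\n", s, o.Ready.Status, o.Ready.Reason, o.Requeue)
	}
}
```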
return err
}

// TODO: If job fails to update, we probably need to "put back" the old job id
May I ask what "put back the old job ID" means? Does the job ID change while the job is pending, until it is running? If so, it seems that we also need to handle spec.resourceID for the case where a job is pending and a user tries to acquire/manage that job with an unstable job ID.
There's a separate discussion about what the primary key for this job really should be.
Today jobs are technically immutable. We can replace a job, but the new job gets a new job ID. If that new job fails, the old job continues running/resumes. In this code we now keep the new job ID, so that we don't keep launching new (failing) jobs forever. However, it does mean that the job is still running, just with the old job ID.
HasSetReadyCondition bool

// ShouldRequeue tracks whether we need a re-reconciliation
ShouldRequeue bool
Oh, I like this. Though I don't see it used in the dataflowflextemplateJob Update, did I miss anything?
It was used, I just made it much more explicit - the controller must now call op.RequestRequeue()
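The explicit requeue described here might look roughly like the sketch below. The field names `HasSetReadyCondition` and `ShouldRequeue` and the method name `RequestRequeue` come from this thread; everything else (the struct shape, `MarkReadyConditionSet`, and the `update` function) is a hypothetical simplification, not the actual operation type from the PR.

```go
package main

import "fmt"

// Operation is a simplified stand-in for the per-reconcile operation handle.
type Operation struct {
	// HasSetReadyCondition records that the controller wrote a ready condition.
	HasSetReadyCondition bool
	// ShouldRequeue tracks whether we need a re-reconciliation.
	ShouldRequeue bool
}

// RequestRequeue asks for another reconcile, independent of the ready condition.
func (o *Operation) RequestRequeue() { o.ShouldRequeue = true }

// MarkReadyConditionSet is a hypothetical helper standing in for a status write.
func (o *Operation) MarkReadyConditionSet() { o.HasSetReadyCondition = true }

// update is a hypothetical controller step: it always writes status, and it
// requeues only when the job has not yet settled into a steady or terminal state.
func update(op *Operation, jobSettled bool) {
	op.MarkReadyConditionSet()
	if !jobSettled {
		op.RequestRequeue()
	}
}

func main() {
	op := &Operation{}
	update(op, false)
	fmt.Println("requeue:", op.ShouldRequeue)
}
```

With this shape, the ready condition can be set to whatever best describes the job, since it no longer decides whether a requeue happens.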
if status["externalRef"] == nil {
status["externalRef"] = old["externalRef"]
}
Mind also adding a check that returns an error if they are not equal? I'm interested in the externalRef especially for the DataflowFlexTemplateJob, since it looks like the JobID has some special behavior.
This externalRef will change, because it includes the job id :-(
That's one reason why I think we might currently have the wrong primary key for this resource ... trying to track that down internally.
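For reference, the carry-over behavior under discussion can be sketched in isolation as below. The `carryOverExternalRef` helper and the example reference strings are hypothetical; the sketch deliberately does not return an error on mismatch, because (as noted above) the externalRef is expected to change when the job ID changes.

```go
package main

import "fmt"

// carryOverExternalRef keeps the previously recorded externalRef when the new
// status does not set one. A differing value in the new status is left alone.
func carryOverExternalRef(status, old map[string]any) {
	if status == nil {
		return // nothing to write into
	}
	if status["externalRef"] == nil {
		status["externalRef"] = old["externalRef"]
	}
}

func main() {
	// Placeholder values only; the real externalRef format is not shown here.
	old := map[string]any{"externalRef": "example-ref/job-a"}

	missing := map[string]any{}
	carryOverExternalRef(missing, old)
	fmt.Println(missing["externalRef"])

	replaced := map[string]any{"externalRef": "example-ref/job-b"}
	carryOverExternalRef(replaced, old)
	fmt.Println(replaced["externalRef"])
}
```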
/lgtm Thank you for the enhancement for DFTJ!
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: yuwenma
/lgtm
Merged d651072 into GoogleCloudPlatform:master
We update the state of the object before reconciliation has completed.