
CloudBatchSubmitJobOperator incorrectly reports success when defer mode is off #43744

Closed
adamszustak opened this issue Nov 6, 2024 · 10 comments · Fixed by #44425
Labels
area:providers · good first issue · kind:bug · provider:google

Comments


adamszustak commented Nov 6, 2024

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==5.6.2
apache-airflow-providers-celery==3.6.0
apache-airflow-providers-cncf-kubernetes==8.0.1
apache-airflow-providers-common-sql==1.11.1
apache-airflow-providers-dbt-cloud==3.7.0
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.16.0
apache-airflow-providers-hashicorp==3.6.4
apache-airflow-providers-http==4.10.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-mysql==5.5.4
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sqlite==3.7.1
apache-airflow-providers-ssh==3.10.1

Apache Airflow version

airflow 2.7.3

Operating System

composer-2.6.5-airflow-2.7.3

Deployment

Google Cloud Composer

Deployment details

No response

What happened

When defer mode is off, CloudBatchSubmitJobOperator always reports success, regardless of the job's actual execution status.
When defer mode is on, it correctly reports success or failure based on the job's exit code.

What you think should happen instead

The operator should always report the correct success or failure status based on the job's execution, regardless of whether defer mode is on or off.

How to reproduce

Create a CloudBatchSubmitJobOperator whose job always fails (e.g. a script that runs exit 1) and run it in sync (non-deferrable) mode.

([
            {
                "taskSpec": {
                    "runnables": [
                        {
                            "script": {
                                "text": "echo This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks.; exit 1"
                            }
                        }
                    ],
                    "computeResource": {"cpuMilli": 2000, "memoryMib": 16},
                    "maxRetryCount": 2,
                    "maxRunDuration": "3600s",
                },
                "taskCount": 1,
                "parallelism": 3,
            }
        ],
        deferrable=False,
)
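
For reference, a fuller sketch of how this snippet might be wired into a minimal DAG. The project ID, region, job name, and DAG id are placeholders, and the job dict assumes the operator accepts the JSON-style Job representation (a google.cloud.batch_v1.Job object should work as well):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.cloud_batch import CloudBatchSubmitJobOperator

# Placeholder values -- replace with your own project and region.
PROJECT_ID = "my-gcp-project"
REGION = "us-central1"

with DAG(
    dag_id="cloud_batch_failing_job_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
) as dag:
    submit_failing_job = CloudBatchSubmitJobOperator(
        task_id="submit_failing_job",
        project_id=PROJECT_ID,
        region=REGION,
        job_name="always-failing-job",
        # The job dict wraps the task-group snippet above in a "taskGroups" key.
        job={
            "taskGroups": [
                {
                    "taskSpec": {
                        "runnables": [
                            {"script": {"text": "echo failing on purpose; exit 1"}}
                        ],
                        "computeResource": {"cpuMilli": 2000, "memoryMib": 16},
                        "maxRetryCount": 2,
                        "maxRunDuration": "3600s",
                    },
                    "taskCount": 1,
                    "parallelism": 3,
                }
            ]
        },
        deferrable=False,  # sync mode: the task is marked SUCCESS even though the job fails
    )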

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


adamszustak added the area:providers, kind:bug, and needs-triage labels on Nov 6, 2024

boring-cyborg bot commented Nov 6, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

dosubot bot added the provider:google label on Nov 6, 2024

potiuk commented Nov 6, 2024

cc: @VladaZakharova

eladkal added the good first issue label and removed the needs-triage label on Nov 10, 2024
SuccessMoses (Contributor) commented:

I would like to work on this.


potiuk commented Nov 11, 2024

assigned you

SuccessMoses (Contributor) commented:

@adamszustak I created a simple DAG, and CloudBatchSubmitJobOperator always succeeds regardless of whether deferrable is True or False. It only fails if a timeout occurs during wait_for_job.

Note that I used unittest.mock.patch to mock the client rather than an actual Google Cloud connection.

from unittest.mock import patch
from airflow.providers.google.cloud.operators.cloud_batch import CloudBatchSubmitJobOperator
from airflow import DAG
from google.cloud.batch_v1 import JobStatus, Job

# Define your DAG
default_args = {
    'owner': 'airflow',
}

dag = DAG(
    'cloud_batch_submit_example',
    default_args=default_args,
)

# Job definition
CLOUD_BATCH_HOOK_PATH = "airflow.providers.google.cloud.operators.cloud_batch.CloudBatchHook"
TASK_ID = "test"
PROJECT_ID = "testproject"
REGION = "us-central1"
JOB_NAME = "test"
JOB = Job()
JOB.name = JOB_NAME


submit_job_task = CloudBatchSubmitJobOperator(
    task_id=TASK_ID, project_id=PROJECT_ID, region=REGION, job_name=JOB_NAME, job=JOB,
    dag=dag, deferrable=False, timeout_seconds=0
)

submit_job_task

if __name__ == "__main__":
    with (
        patch('airflow.providers.google.cloud.hooks.cloud_batch.CloudBatchHook.get_conn') as mock,
        patch('google.cloud.batch_v1.Job.to_dict') as mock_to_dict
    ):
        mock.return_value.get_job.return_value.status.state = JobStatus.State.FAILED  # make the job report a FAILED state
        dag.test()

log:

[2024-11-24T17:40:38.338+0000] {dag.py:2474} INFO - dagrun id: cloud_batch_submit_example
[2024-11-24T17:40:38.357+0000] {dag.py:2493} INFO - created dagrun <DagRun cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. externally triggered: False>
[2024-11-24T17:40:38.385+0000] {dag.py:2433} INFO - [DAG TEST] starting task_id=test map_index=-1
[2024-11-24T17:40:38.385+0000] {dag.py:2436} INFO - [DAG TEST] running task <TaskInstance: cloud_batch_submit_example.test manual__2024-11-24T17:40:37.987067+00:00 [scheduled]>
[2024-11-24 17:40:39,926] {taskinstance.py:2923} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
[2024-11-24T17:40:39.926+0000] {taskinstance.py:2923} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='cloud_batch_submit_example' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_LOGICAL_DATE='2024-11-24T17:40:37.987067+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-11-24T17:40:37.987067+00:00'
Task instance is in running state
 Previous state of the Task instance: queued
Current task name:test state:scheduled start_date:None
Dag name:cloud_batch_submit_example and current dag run status:running
[2024-11-24T17:40:39.928+0000] {taskinstance.py:723} INFO - ::endgroup::
[2024-11-24T17:40:39.949+0000] {connection.py:250} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-11-24T17:40:39.949+0000] {base.py:66} INFO - Retrieving connection 'google_cloud_default'
[2024-11-24T17:40:39.966+0000] {taskinstance.py:346} INFO - ::group::Post task execution logs
[2024-11-24T17:40:39.966+0000] {taskinstance.py:358} INFO - Marking task as SUCCESS. dag_id=cloud_batch_submit_example, task_id=test, run_id=manual__2024-11-24T17:40:37.987067+00:00, logical_date=20241124T174037, start_date=, end_date=20241124T174039
Task instance in success state
 Previous state of the Task instance: running
Dag name:cloud_batch_submit_example queued_at:None
Task hostname:51c25e8d352b operator:CloudBatchSubmitJobOperator
[2024-11-24T17:40:39.976+0000] {dag.py:2447} INFO - [DAG TEST] end task task_id=test map_index=-1
[2024-11-24T17:40:39.981+0000] {dagrun.py:935} INFO - Marking run <DagRun cloud_batch_submit_example @ 2024-11-24 17:40:37.987067+00:00: manual__2024-11-24T17:40:37.987067+00:00, state:running, queued_at: None. externally triggered: False> successful
Dag run in success state
Dag run start:2024-11-24 17:40:37.987067+00:00 end:2024-11-24 17:40:39.981712+00:00
[2024-11-24T17:40:39.983+0000] {dagrun.py:987} INFO - DagRun Finished: dag_id=cloud_batch_submit_example, logical_date=2024-11-24 17:40:37.987067+00:00, run_id=manual__2024-11-24T17:40:37.987067+00:00, run_start_date=2024-11-24 17:40:37.987067+00:00, run_end_date=2024-11-24 17:40:39.981712+00:00, run_duration=1.994645, state=success, external_trigger=False, run_type=manual, data_interval_start=2024-11-24 17:40:37.987067+00:00, data_interval_end=2024-11-24 17:40:37.987067+00:00, dag_version_name=None

SuccessMoses (Contributor) commented:

@eladkal


eladkal commented Nov 25, 2024

cc @VladaZakharova

adamszustak (Author) commented:

@SuccessMoses What's the point of this test? Have you tried to create a failing job?

VladaZakharova (Contributor) commented:

I will take a look, if @SuccessMoses hasn't started the work yet :)


SuccessMoses commented Nov 26, 2024

@adamszustak I made modifications to CloudBatchHook so the task fails when the Cloud Batch job fails. I will submit a PR.
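
For context, a minimal sketch of the kind of check being described (not the actual change from the linked PR; the helper name is hypothetical) would raise once the submitted job reaches a FAILED terminal state:

from airflow.exceptions import AirflowException
from google.cloud.batch_v1 import Job, JobStatus


def raise_if_batch_job_failed(job: Job) -> None:
    """Hypothetical helper: fail the Airflow task when the finished Batch job did not succeed."""
    if job.status.state == JobStatus.State.FAILED:
        raise AirflowException(f"Cloud Batch job {job.name} finished in FAILED state")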
