
Regression : Retry on Failure causes infinite Retries #216

Closed
r39132 opened this issue Aug 3, 2015 · 13 comments
Labels
kind:bug This is clearly a bug

Comments

@r39132
Contributor

r39132 commented Aug 3, 2015

I pulled the latest code from git/master and set it up. I have observed a regression now on 3 separate occasions.

Here are the default args that I am using and passing to my code.

default_args = {
    'owner': 'sanand',
    'depends_on_past': True,
    'pool': 'ep_data_pipeline',
    'start_date': YESTERDAY,
    'email': [import_ep_pipeline_success_email_dl],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=60)
}
dag = DAG('ep_demo', default_args=default_args)

I keep getting this exception every few minutes, and the counter never advances to 2 out of 4. This worked fine until a few days ago, when I pulled the latest code.

Try 1 out of 4
Exception:
Bash command failed
Log: Link
Host: etl-00.tad.agari.com
Log file: /home/deploy/airflow/logs/ep_demo/generate_new_parquet_files_spark_job/2015-08-02T00:00:00.log
Mark success: Link

The failing task is:

# Operator : Read the parquet data and generate aggregate data
optional_ingest_templated_command = """
    {% if params.ENV == "EP_PROD" %}
        echo "Running ingest in Prod Env"
        ssh -i ~/.ssh/id_rsa [email protected] '. /root/.bash_profile; ~/run_ingest.sh'
    {% else %}
        echo "Running ingest in TAD"
        ssh -i ~/.ssh/id_rsa_spark [email protected] '. /root/.bash_profile; ~/run_ingest.sh'
    {% endif %}
"""
generate_new_parquet_files_spark_job = BashOperator(
    task_id='generate_new_parquet_files_spark_job',
    execution_timeout=timedelta(hours=3),
    bash_command=optional_ingest_templated_command,
    params={'ENV': ENV},
    dag=dag)
@mistercrunch
Member

Can you share the content of the log file? I'm confused about where the "out of 4" comes from. Is default_args modified anywhere else?

@r39132
Contributor Author

r39132 commented Aug 4, 2015

You'll notice a looping pattern in the log snippet below.

New run starting @2015-08-03T00:00:13.658479
--------------------------------------------------------------------------------
[2015-08-03 00:00:13,676] {models.py:794} INFO - Queuing into pool ep_data_pipeline
[2015-08-03 00:00:24,380] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 00:00:24,380] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 00:00:24,527] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 00:00:28,140] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 00:00:28,148] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 00:00:28,276] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 00:00:28,296] {models.py:784} INFO -
--------------------------------------------------------------------------------
New run starting @2015-08-03T00:00:28.296290
--------------------------------------------------------------------------------
[2015-08-03 00:00:28,314] {models.py:816} INFO - Executing <Task(BashOperator): generate_new_parquet_files_spark_job> on 2015-08-02 00:00:00
[2015-08-03 00:00:28,330] {bash_operator.py:39} INFO - tmp dir root location:
/tmp
[2015-08-03 00:00:28,330] {bash_operator.py:47} INFO - Temporary script location :/tmp/airflowtmpRDGvek//tmp/airflowtmpRDGvek/generate_new_parquet_files_spark_job_4GmA2
[2015-08-03 00:00:28,330] {bash_operator.py:48} INFO - Running command:

        echo "Running ingest in TAD"
        ssh -i ~/.ssh/id_rsa_spark [email protected] '. /root/.bash_profile; ~/run_ingest.sh'

[2015-08-03 00:00:28,411] {bash_operator.py:56} INFO - Output:
[2015-08-03 00:00:28,422] {bash_operator.py:58} INFO - Running ingest in TAD
[2015-08-03 00:00:28,544] {bash_operator.py:58} INFO - starting:
[2015-08-03 00:00:28,544] {bash_operator.py:58} INFO - submitting:
[2015-08-03 00:00:31,871] {bash_operator.py:58} INFO - ARGS: Namespace(aggreplace=False, avro=True, bucket='agari-collector-consumer', current=None, dataname='dump', db=False, domains=False, end_ts=None, filecycle=True, input='uploads/', loadmodels='', loads3db='', noagg=True, old=False, parquet=True, repgen=False, s3=False, s3db=False, s3rda=False, start_ts=None, update=False)
[2015-08-03 00:00:36,114] {bash_operator.py:58} INFO - PARTITIONS: 32
[2015-08-03 03:00:28,330] {utils.py:456} ERROR - Process timed out
[2015-08-03 03:00:28,375] {models.py:867} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/models.py", line 839, in run
    task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/operators/bash_operator.py", line 57, in execute
    for line in iter(sp.stdout.readline, ''):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/utils.py", line 457, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
AirflowTaskTimeout: Timeout
[2015-08-03 03:00:28,455] {utils.py:386} INFO - Sent an alert email to [u'[email protected]']
[2015-08-03 03:00:29,943] {models.py:903} ERROR - Timeout
[2015-08-03 03:01:37,601] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:37,601] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:37,729] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 03:01:39,687] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:39,687] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:39,771] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 03:01:39,778] {models.py:784} INFO -
--------------------------------------------------------------------------------
Retry run 1 out of 3 starting @2015-08-03T03:01:39.778413
--------------------------------------------------------------------------------
[2015-08-03 03:01:39,785] {models.py:794} INFO - Queuing into pool ep_data_pipeline
[2015-08-03 03:01:54,405] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:54,405] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:54,501] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 03:01:56,607] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:56,607] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 03:01:56,701] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 03:01:56,716] {models.py:784} INFO -
--------------------------------------------------------------------------------
New run starting @2015-08-03T03:01:56.716244
--------------------------------------------------------------------------------
[2015-08-03 03:01:56,740] {models.py:816} INFO - Executing <Task(BashOperator): generate_new_parquet_files_spark_job> on 2015-08-02 00:00:00
[2015-08-03 03:01:56,749] {bash_operator.py:39} INFO - tmp dir root location:
/tmp
[2015-08-03 03:01:56,749] {bash_operator.py:47} INFO - Temporary script location :/tmp/airflowtmpyItGhA//tmp/airflowtmpyItGhA/generate_new_parquet_files_spark_jobtSTKDf
[2015-08-03 03:01:56,749] {bash_operator.py:48} INFO - Running command:

        echo "Running ingest in TAD"
        ssh -i ~/.ssh/id_rsa_spark [email protected] '. /root/.bash_profile; ~/run_ingest.sh'

[2015-08-03 03:01:56,810] {bash_operator.py:56} INFO - Output:
[2015-08-03 03:01:56,818] {bash_operator.py:58} INFO - Running ingest in TAD
[2015-08-03 03:01:57,024] {bash_operator.py:58} INFO - starting:
[2015-08-03 03:01:57,024] {bash_operator.py:58} INFO - submitting:
[2015-08-03 03:02:00,289] {bash_operator.py:58} INFO - ARGS: Namespace(aggreplace=False, avro=True, bucket='agari-collector-consumer', current=None, dataname='dump', db=False, domains=False, end_ts=None, filecycle=True, input='uploads/', loadmodels='', loads3db='', noagg=True, old=False, parquet=True, repgen=False, s3=False, s3db=False, s3rda=False, start_ts=None, update=False)
[2015-08-03 03:02:04,484] {bash_operator.py:58} INFO - PARTITIONS: 32
[2015-08-03 06:01:56,748] {utils.py:456} ERROR - Process timed out
[2015-08-03 06:01:56,749] {models.py:867} ERROR - Timeout
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/models.py", line 839, in run
    task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/operators/bash_operator.py", line 57, in execute
    for line in iter(sp.stdout.readline, ''):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/utils.py", line 457, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
AirflowTaskTimeout: Timeout
[2015-08-03 06:01:56,785] {utils.py:386} INFO - Sent an alert email to [u'[email protected]']
[2015-08-03 06:01:56,819] {models.py:903} ERROR - Timeout
[2015-08-03 06:03:03,146] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:03,146] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:03,258] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 06:03:05,317] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:05,317] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:05,421] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 06:03:05,437] {models.py:784} INFO -
--------------------------------------------------------------------------------
Retry run 1 out of 3 starting @2015-08-03T06:03:05.437404
--------------------------------------------------------------------------------
[2015-08-03 06:03:05,444] {models.py:794} INFO - Queuing into pool ep_data_pipeline
[2015-08-03 06:03:17,801] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:17,801] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:17,873] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 06:03:19,929] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:19,929] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 06:03:20,029] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 06:03:20,038] {models.py:784} INFO -
--------------------------------------------------------------------------------
New run starting @2015-08-03T06:03:20.038304
--------------------------------------------------------------------------------
[2015-08-03 06:03:20,053] {models.py:816} INFO - Executing <Task(BashOperator): generate_new_parquet_files_spark_job> on 2015-08-02 00:00:00
[2015-08-03 06:03:20,073] {bash_operator.py:39} INFO - tmp dir root location:
/tmp
[2015-08-03 06:03:20,073] {bash_operator.py:47} INFO - Temporary script location :/tmp/airflowtmpS0uQyn//tmp/airflowtmpS0uQyn/generate_new_parquet_files_spark_jobZ2gdMc
[2015-08-03 06:03:20,073] {bash_operator.py:48} INFO - Running command:

        echo "Running ingest in TAD"
        ssh -i ~/.ssh/id_rsa_spark [email protected] '. /root/.bash_profile; ~/run_ingest.sh'

[2015-08-03 06:03:20,131] {bash_operator.py:56} INFO - Output:
[2015-08-03 06:03:20,140] {bash_operator.py:58} INFO - Running ingest in TAD
[2015-08-03 06:03:20,249] {bash_operator.py:58} INFO - starting:
[2015-08-03 06:03:20,249] {bash_operator.py:58} INFO - submitting:
[2015-08-03 06:03:23,469] {bash_operator.py:58} INFO - ARGS: Namespace(aggreplace=False, avro=True, bucket='agari-collector-consumer', current=None, dataname='dump', db=False, domains=False, end_ts=None, filecycle=True, input='uploads/', loadmodels='', loads3db='', noagg=True, old=False, parquet=True, repgen=False, s3=False, s3db=False, s3rda=False, start_ts=None, update=False)
[2015-08-03 06:03:27,800] {bash_operator.py:58} INFO - PARTITIONS: 32
[2015-08-03 08:45:17,095] {bash_operator.py:58} INFO - GLOM!
[2015-08-03 08:45:17,095] {bash_operator.py:58} INFO -
[2015-08-03 08:45:17,095] {bash_operator.py:58} INFO - 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
[2015-08-03 08:45:17,095] {bash_operator.py:58} INFO -
[2015-08-03 08:45:17,096] {bash_operator.py:58} INFO -
[2015-08-03 08:45:17,096] {bash_operator.py:58} INFO - Building avro loader
[2015-08-03 08:45:23,968] {bash_operator.py:58} INFO - Traceback (most recent call last):
[2015-08-03 08:45:23,968] {bash_operator.py:58} INFO - File "/root/spark/bin/ep_spark.py", line 300, in <module>
[2015-08-03 08:45:23,969] {bash_operator.py:58} INFO - main(args)
[2015-08-03 08:45:23,969] {bash_operator.py:58} INFO - File "/root/spark/bin/ep_spark.py", line 144, in main
[2015-08-03 08:45:23,969] {bash_operator.py:58} INFO - min_day = min(days)
[2015-08-03 08:45:23,969] {bash_operator.py:58} INFO - ValueError: min() arg is an empty sequence
[2015-08-03 08:45:24,604] {bash_operator.py:61} INFO - Command exited with return code 1
[2015-08-03 08:45:24,604] {models.py:867} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/models.py", line 839, in run
    task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow-1.2.1-py2.7.egg/airflow/operators/bash_operator.py", line 64, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2015-08-03 08:45:24,666] {utils.py:386} INFO - Sent an alert email to [u'[email protected]']
[2015-08-03 08:45:24,707] {models.py:903} ERROR - Bash command failed
[2015-08-03 08:46:30,530] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 08:46:30,536] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 08:46:30,628] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 08:46:32,615] {models.py:89} INFO - Filling up the DagBag from /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 08:46:32,615] {models.py:155} INFO - Importing /home/deploy/airflow/dags/ep_demo_1.py
[2015-08-03 08:46:32,734] {models.py:203} INFO - Loaded DAG <DAG: ep_demo>
[2015-08-03 08:46:32,749] {models.py:784} INFO -
--------------------------------------------------------------------------------
Retry run 1 out of 3 starting @2015-08-03T08:46:32.749423
--------------------------------------------------------------------------------    

@mistercrunch added the kind:bug label Aug 12, 2015
@mistercrunch
Member

Still an issue?

@r39132
Contributor Author

r39132 commented Aug 28, 2015

Hi! We disabled retries until a fix for this is available. Was a fix checked in?

@mistercrunch
Member

Should be fixed. We use retries everywhere at Airbnb. Reopen if it's still an issue.

@r39132
Contributor Author

r39132 commented Nov 30, 2015

Yes, I just turned retries back on and it is working in 1.6.*

@r39132
Contributor Author

r39132 commented Jan 14, 2016

I am seeing the infinite retries once again! This needs to be reopened! I'm on 1.6.1 - I specify a retry limit of 1, but tasks keep retrying and the attempt count never increases toward the max.

@mistercrunch
Member

https://github.com/airbnb/airflow/pull/883/files

@mistercrunch reopened this Jan 17, 2016
@mistercrunch
Member

We've seen this bug. The logical path to it: when a retry gets queued, the try number doesn't get incremented.
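
To make that concrete, here is a minimal sketch of the failure mode (hypothetical code with invented names, not the actual Airflow scheduler or TaskInstance logic): if a failed task instance is put back on the queue without its try number being incremented, every attempt reports "Try 1 out of N" and the retry limit is never reached.

```python
# Hypothetical sketch of the failure mode -- not actual Airflow code.
class RetryingTask:
    def __init__(self, retries):
        self.retries = retries     # configured retry limit
        self.try_number = 1        # the "Try X out of Y" counter

    def handle_failure(self):
        if self.try_number <= self.retries:
            # Buggy path: the task is re-queued for retry...
            self.queue_for_retry()
            # ...but the increment below never happens, so the
            # condition above stays true forever.
            # self.try_number += 1

    def queue_for_retry(self):
        print("re-queued, try_number =", self.try_number)


task = RetryingTask(retries=1)
task.handle_failure()  # prints "re-queued, try_number = 1"
task.handle_failure()  # still try_number = 1 -> retries never stop
```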

@wil5for

wil5for commented Jan 20, 2016

I was looking at this today and confirmed the issue was only appearing when we had a pool defined. Thanks for the fix!

@pradhanpk
Contributor

I am seeing infinite retries with pools using the current bleeding-edge repo (0f28090). You can replicate this issue with the following DAG, which assumes the existence of a pool called "TrivialTasks":

#!/bin/env python                                                                                                                                                                                           
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta


default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2016, 4, 2),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=1)
    }

dag = DAG('fail', default_args=default_args)


def fail_func():
    raise Exception("Failure!")


t1 = PythonOperator(
    python_callable=fail_func,
    task_id='fail',
    dag=dag,
    pool="TrivialTasks")

This may be related to #1225, except that the infinite retry occurs even without using SubDagOperator.

pradhanpk added a commit to pradhanpk/airflow that referenced this issue Apr 4, 2016
`SchedulerJob` contains a `set` of `TaskInstance`s called `queued_tis`. `SchedulerJob.process_events` loops through `queued_tis` and tries to remove completed tasks. However, without customizing `__eq__` and `__hash__`, the following two lines have no effect, never removing elements from `queued_tis`, which leads to infinite retries on failure. This is related to my comment on apache#216. The following code was introduced in the fix to apache#1225.

```
elif ti in self.queued_tis:
    self.queued_tis.remove(ti)
```
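
For illustration, a self-contained sketch of the equality point above (hypothetical class, not Airflow's actual TaskInstance model): a freshly constructed object describing the same task run only matches the entry already in the set once `__eq__` and `__hash__` are defined over the identifying fields, so without them the `in` check is always False and nothing is ever removed from `queued_tis`.

```python
# Minimal illustration of the set-membership problem -- hypothetical class,
# not Airflow's actual TaskInstance model.
class TI:
    def __init__(self, dag_id, task_id, execution_date):
        self.dag_id = dag_id
        self.task_id = task_id
        self.execution_date = execution_date

    # Without these two methods, Python falls back to identity-based
    # hashing/equality, so two TI objects describing the same task run
    # never compare equal and set removal silently does nothing.
    def __eq__(self, other):
        return (self.dag_id, self.task_id, self.execution_date) == (
            other.dag_id, other.task_id, other.execution_date)

    def __hash__(self):
        return hash((self.dag_id, self.task_id, self.execution_date))


queued_tis = {TI('fail', 'fail', '2016-04-02')}
finished = TI('fail', 'fail', '2016-04-02')  # new object, same task run
if finished in queued_tis:                   # True only with __eq__/__hash__
    queued_tis.remove(finished)
assert not queued_tis
```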
@r39132 reopened this Apr 5, 2016
@pradhanpk
Contributor

@r39132 FYI @jlowin told me that he has a fix in the pipeline. More details are in the thread for PR #1299.

@jlowin
Member

jlowin commented Apr 5, 2016

Yes, #1299 should be fixed in #1290; if not, we can reopen.

@jlowin closed this as completed Apr 5, 2016
potiuk referenced this issue in potiuk/airflow Oct 9, 2020
mobuchowski pushed a commit to mobuchowski/airflow that referenced this issue Jan 4, 2022