Skip to content

Commit

Permalink
Merge pull request #26 from airbnb/backfill_start_date
Browse files Browse the repository at this point in the history
Backfill start_date to override the task's start_date
  • Loading branch information
mistercrunch committed Jun 13, 2015
2 parents 62ad66e + e894cb7 commit 2dc75fa
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 1 deletion.
8 changes: 8 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ def run(args):
dag = dag_pickle.pickle
task = dag.get_task(task_id=args.task_id)

task_start_date = None
if args.task_start_date:
task_start_date = dateutil.parser.parse(args.task_start_date)
task.start_date = task_start_date
ti = TaskInstance(task, args.execution_date)

if args.local:
Expand All @@ -109,6 +113,7 @@ def run(args):
mark_success=args.mark_success,
force=args.force,
pickle_id=args.pickle,
task_start_date=task_start_date,
ignore_dependencies=args.ignore_dependencies)
run_job.run()
elif args.raw:
Expand Down Expand Up @@ -394,6 +399,9 @@ def get_parser():
parser_run.add_argument(
"-sd", "--subdir", help=subdir_help,
default=DAGS_FOLDER)
parser_run.add_argument(
"-s", "--task_start_date",
help="Override the tasks's start_date (used internally)",)
parser_run.add_argument(
"-m", "--mark_success", help=mark_success_help, action="store_true")
parser_run.add_argument(
Expand Down
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ def queue_command(self, key, command, priority=1, queue=None):

def queue_task_instance(
self, task_instance, mark_success=False, pickle_id=None,
force=False, ignore_dependencies=False):
force=False, ignore_dependencies=False, task_start_date=None):
command = task_instance.command(
local=True,
mark_success=mark_success,
force=force,
ignore_dependencies=ignore_dependencies,
task_start_date=task_start_date,
pickle_id=pickle_id)
self.queue_command(
task_instance.key,
Expand Down
4 changes: 4 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ def _execute(self):
executor.queue_task_instance(
ti,
mark_success=self.mark_success,
task_start_date=self.bf_start_date,
pickle_id=pickle_id)
ti.state = State.RUNNING
if key not in started:
Expand Down Expand Up @@ -555,12 +556,14 @@ def __init__(
force=False,
mark_success=False,
pickle_id=None,
task_start_date=None,
*args, **kwargs):
self.task_instance = task_instance
self.ignore_dependencies = ignore_dependencies
self.force = force
self.pickle_id = pickle_id
self.mark_success = mark_success
self.task_start_date = task_start_date
super(LocalTaskJob, self).__init__(*args, **kwargs)

def _execute(self):
Expand All @@ -570,6 +573,7 @@ def _execute(self):
force=self.force,
pickle_id=self.pickle_id,
mark_success=self.mark_success,
task_start_date=self.task_start_date,
job_id=self.id,
)
self.process = subprocess.Popen(['bash', '-c', command])
Expand Down
4 changes: 4 additions & 0 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def command(
local=False,
pickle_id=None,
raw=False,
task_start_date=None,
job_id=None):
"""
Returns a command that can be executed anywhere where airflow is
Expand All @@ -424,6 +425,8 @@ def command(
ignore_dependencies = "-i" if ignore_dependencies else ""
force = "--force" if force else ""
local = "--local" if local else ""
task_start_date = \
"-s " + task_start_date.isoformat() if task_start_date else ""
raw = "--raw" if raw else ""
subdir = ""
if not pickle and self.task.dag and self.task.dag.full_filepath:
Expand All @@ -439,6 +442,7 @@ def command(
"{job_id} "
"{raw} "
"{subdir} "
"{task_start_date} "
).format(**locals())

@property
Expand Down

0 comments on commit 2dc75fa

Please sign in to comment.