Skip to content

Commit

Permalink
[bugfix] Make ScheduleDefinition.with_job pass tags correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Dec 4, 2024
1 parent ba57dc5 commit 86bbb1f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def with_updated_job(self, new_job: ExecutableDefinition) -> "ScheduleDefinition
"""Returns a copy of this schedule with the job replaced.
Args:
job (ExecutableDefinition): The job that should execute when this
new_job (ExecutableDefinition): The job that should execute when this
schedule runs.
"""
return ScheduleDefinition.dagster_internal_init(
Expand All @@ -557,9 +557,14 @@ def with_updated_job(self, new_job: ExecutableDefinition) -> "ScheduleDefinition
default_status=self.default_status,
environment_vars=self._environment_vars,
required_resource_keys=self._raw_required_resource_keys,
run_config=None, # run_config, tags, should_execute encapsulated in execution_fn
# run_config, run_config_fn, tags_fn, should_execute are not copied because the schedule constructor
# incorporates them into the execution_fn defined in the constructor. Since we are
# copying the execution_fn, we don't need to copy these, and it would actually be an
# error to do so (since you can't pass an execution_fn and any of these values
# simultaneously).
run_config=None,
run_config_fn=None,
tags=None,
tags=self.tags,
tags_fn=None,
metadata=self.metadata,
should_execute=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
from datetime import datetime

import pytest
from dagster import DagsterInvalidDefinitionError, ScheduleDefinition, build_schedule_context, graph
from dagster._core.definitions.decorators.op_decorator import op
from dagster import (
DagsterInvalidDefinitionError,
ScheduleDefinition,
build_schedule_context,
graph,
job,
op,
)
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.run_config import RunConfig
from dagster._core.definitions.run_request import RunRequest

Expand Down Expand Up @@ -165,3 +172,28 @@ def test_tag_transfer_to_run_request():
assert (
"foo" not in tags_and_exec_fn_schedule.evaluate_tick(context_with_time).run_requests[0].tags
)


def test_with_updated_job():
@op
def my_op():
pass

@job
def my_job_1():
my_op()

@job
def my_job_2():
my_op()

my_schedule_1 = ScheduleDefinition(
job=my_job_1, cron_schedule="@daily", tags={"foo": "bar"}, metadata={"baz": "qux"}
)

my_schedule_2 = my_schedule_1.with_updated_job(my_job_2)

assert my_schedule_2.job.name == "my_job_2"
assert my_schedule_2.cron_schedule == "@daily"
assert my_schedule_2.tags == {"foo": "bar"}
assert my_schedule_2.metadata == {"baz": MetadataValue.text("qux")}

0 comments on commit 86bbb1f

Please sign in to comment.