Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] Make ScheduleDefinition.with_job pass tags correctly #26042

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def with_updated_job(self, new_job: ExecutableDefinition) -> "ScheduleDefinition
"""Returns a copy of this schedule with the job replaced.
Args:
job (ExecutableDefinition): The job that should execute when this
new_job (ExecutableDefinition): The job that should execute when this
schedule runs.
"""
return ScheduleDefinition.dagster_internal_init(
Expand All @@ -557,9 +557,14 @@ def with_updated_job(self, new_job: ExecutableDefinition) -> "ScheduleDefinition
default_status=self.default_status,
environment_vars=self._environment_vars,
required_resource_keys=self._raw_required_resource_keys,
run_config=None, # run_config, tags, should_execute encapsulated in execution_fn
# run_config, run_config_fn, tags_fn, should_execute are not copied because the schedule constructor
# incorporates them into the execution_fn defined in the constructor. Since we are
# copying the execution_fn, we don't need to copy these, and it would actually be an
# error to do so (since you can't pass an execution_fn and any of these values
# simultaneously).
run_config=None,
run_config_fn=None,
tags=None,
tags=self.tags,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but while you're here, i noticed that the docstring at line 546 uses the wrong param name

tags_fn=None,
metadata=self.metadata,
should_execute=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
from datetime import datetime

import pytest
from dagster import DagsterInvalidDefinitionError, ScheduleDefinition, build_schedule_context, graph
from dagster._core.definitions.decorators.op_decorator import op
from dagster import (
DagsterInvalidDefinitionError,
ScheduleDefinition,
build_schedule_context,
graph,
job,
op,
)
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.run_config import RunConfig
from dagster._core.definitions.run_request import RunRequest

Expand Down Expand Up @@ -165,3 +172,28 @@ def test_tag_transfer_to_run_request():
assert (
"foo" not in tags_and_exec_fn_schedule.evaluate_tick(context_with_time).run_requests[0].tags
)


def test_with_updated_job():
@op
def my_op():
pass

@job
def my_job_1():
my_op()

@job
def my_job_2():
my_op()

my_schedule_1 = ScheduleDefinition(
job=my_job_1, cron_schedule="@daily", tags={"foo": "bar"}, metadata={"baz": "qux"}
)

my_schedule_2 = my_schedule_1.with_updated_job(my_job_2)

assert my_schedule_2.job.name == "my_job_2"
assert my_schedule_2.cron_schedule == "@daily"
assert my_schedule_2.tags == {"foo": "bar"}
assert my_schedule_2.metadata == {"baz": MetadataValue.text("qux")}