Add create_repository_using_definitions_args #11141

Merged

Changes from 6 commits
128 changes: 93 additions & 35 deletions python_modules/dagster/dagster/_core/definitions/definitions_class.py
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Type, Union

import dagster._check as check
-from dagster._annotations import public
+from dagster._annotations import experimental, public
from dagster._core.definitions.events import CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.logger_definition import LoggerDefinition
@@ -29,6 +29,90 @@
from dagster._core.storage.asset_value_loader import AssetValueLoader


@public
@experimental
def create_repository_using_definitions_args(
    name: str,
    assets: Optional[
        Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
    ] = None,
    schedules: Optional[Iterable[ScheduleDefinition]] = None,
    sensors: Optional[Iterable[SensorDefinition]] = None,
    jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None,
    resources: Optional[Mapping[str, Any]] = None,
    executor: Optional[ExecutorDefinition] = None,
    loggers: Optional[Mapping[str, LoggerDefinition]] = None,
) -> Union[RepositoryDefinition, PendingRepositoryDefinition]:
    """For users who, for the time being, want to continue using multiple named repositories in
    a single code location, this function can be used. Its behavior (e.g. applying resources to
    all assets) is identical to :py:class:`Definitions`, but it returns a named repository."""
    return _create_repository_using_definitions_args(
        name=name,
        assets=assets,
        schedules=schedules,
        sensors=sensors,
        jobs=jobs,
        resources=resources,
        executor=executor,
        loggers=loggers,
    )


def _create_repository_using_definitions_args(
    name: str,
    assets: Optional[
        Iterable[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]
    ] = None,
    schedules: Optional[Iterable[ScheduleDefinition]] = None,
    sensors: Optional[Iterable[SensorDefinition]] = None,
    jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None,
    resources: Optional[Mapping[str, Any]] = None,
    executor: Optional[ExecutorDefinition] = None,
    loggers: Optional[Mapping[str, LoggerDefinition]] = None,
):
    if assets:
        check.iterable_param(
            assets, "assets", (AssetsDefinition, SourceAsset, CacheableAssetsDefinition)
        )

    if schedules:
        check.iterable_param(schedules, "schedules", ScheduleDefinition)

    if sensors:
        check.iterable_param(sensors, "sensors", SensorDefinition)

    if jobs:
        check.iterable_param(jobs, "jobs", (JobDefinition, UnresolvedAssetJobDefinition))

    if resources:
        check.mapping_param(resources, "resources", key_type=str)

    if executor:
        check.inst_param(executor, "executor", ExecutorDefinition)

    if loggers:
        check.mapping_param(loggers, "loggers", key_type=str, value_type=LoggerDefinition)

    resource_defs = coerce_resources_to_defs(resources or {})

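    # Build a repository whose contents are exactly the supplied definitions,
    # binding the given resources to every asset.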
    @repository(
        name=name,
        default_executor_def=executor,
        default_logger_defs=loggers,
    )
    def created_repo():
        return [
            *with_resources(assets or [], resource_defs),
            *(schedules or []),
            *(sensors or []),
            *(jobs or []),
        ]

    return created_repo


class Definitions:
"""Example usage:

@@ -75,48 +159,22 @@ def __init__(
        executor: Optional[ExecutorDefinition] = None,
        loggers: Optional[Mapping[str, LoggerDefinition]] = None,
    ):
-
-        if assets:
-            check.iterable_param(
-                assets, "assets", (AssetsDefinition, SourceAsset, CacheableAssetsDefinition)
-            )
-
-        if schedules:
-            check.iterable_param(schedules, "schedules", ScheduleDefinition)
-
-        if sensors:
-            check.iterable_param(sensors, "sensors", SensorDefinition)
-
-        if jobs:
-            check.iterable_param(jobs, "jobs", (JobDefinition, UnresolvedAssetJobDefinition))
-
-        if resources:
-            check.mapping_param(resources, "resources", key_type=str)
-
-        if executor:
-            check.inst_param(executor, "executor", ExecutorDefinition)
-            experimental_arg_warning("executor", "Definitions.__init__")
-
-        if loggers:
-            check.mapping_param(loggers, "loggers", key_type=str, value_type=LoggerDefinition)
-            experimental_arg_warning("loggers", "Definitions.__init__")
-
-        resource_defs = coerce_resources_to_defs(resources or {})
-
-        @repository(
+        self._created_pending_or_normal_repo = _create_repository_using_definitions_args(
            name=SINGLETON_REPOSITORY_NAME,
-            default_executor_def=executor,
-            default_logger_defs=loggers,
+            assets=assets,
+            schedules=schedules,
+            sensors=sensors,
+            jobs=jobs,
+            resources=resources,
+            executor=executor,
+            loggers=loggers,
        )
-        def created_repo():
-            return [
-                *with_resources(assets or [], resource_defs),
-                *(schedules or []),
-                *(sensors or []),
-                *(jobs or []),
-            ]
-
-        self._created_pending_or_normal_repo = created_repo

    @public
    def get_job_def(self, name: str) -> JobDefinition:
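A minimal usage sketch of the new public helper; the asset and repository names below are illustrative, not from the PR:

from dagster import asset
from dagster._core.definitions.definitions_class import (
    create_repository_using_definitions_args,
)


@asset
def my_asset():
    return 1


# Unlike Definitions(...), which builds the singleton repository, this
# returns a RepositoryDefinition carrying the requested name.
repo = create_repository_using_definitions_args(name="my_repo", assets=[my_asset])
assert repo.name == "my_repo"

The second changed file, a test module, exercises the same surface: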
@@ -20,13 +20,15 @@
    CacheableAssetsDefinition,
)
from dagster._core.definitions.decorators.job_decorator import job
+from dagster._core.definitions.definitions_class import create_repository_using_definitions_args
from dagster._core.definitions.executor_definition import executor
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.logger_definition import logger
from dagster._core.definitions.repository_definition import (
    PendingRepositoryDefinition,
    RepositoryDefinition,
)
+from dagster._core.definitions.sensor_definition import SensorDefinition
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._core.storage.mem_io_manager import InMemoryIOManager
from dagster._core.test_utils import instance_for_test
@@ -293,3 +295,108 @@ def test_bad_logger_value():
    with pytest.raises(CheckError):
        # ignore type to catch runtime error
        Definitions(loggers={"not_a_logger": "not_a_logger"})  # type: ignore


def test_kitchen_sink_on_create_helper_and_definitions():
    @asset(required_resource_keys={"a_resource_key"})
    def an_asset():
        pass

    @asset
    def another_asset():
        pass

    another_asset_job = define_asset_job(name="another_asset_job", selection="another_asset")

    @op
    def an_op():
        pass

    @job
    def a_job():
        an_op()

    @job
    def sensor_target():
        an_op()

    @job
    def schedule_target():
        an_op()

    a_schedule = ScheduleDefinition(name="a_schedule", job=schedule_target, cron_schedule="@daily")

    @sensor(job=sensor_target)
    def a_sensor(_):
        raise Exception("not called")

    @executor
    def an_executor(_):
        raise Exception("not executed")

    @logger
    def a_logger(_):
        raise Exception("not executed")

    repo = create_repository_using_definitions_args(
        name="foobar",
        assets=[an_asset, another_asset],
        jobs=[a_job, another_asset_job],
        schedules=[a_schedule],
        sensors=[a_sensor],
        resources={"a_resource_key": "the resource"},
        executor=an_executor,
        loggers={"logger_key": a_logger},
    )

    assert isinstance(repo, RepositoryDefinition)

    assert repo.name == "foobar"
    assert isinstance(repo.get_job("a_job"), JobDefinition)
    assert repo.get_job("a_job").executor_def is an_executor
    assert repo.get_job("a_job").loggers == {"logger_key": a_logger}
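    # "__ASSET_JOB" is the name Dagster gives the implicit job that materializes assets.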
    assert isinstance(repo.get_job("__ASSET_JOB"), JobDefinition)
    assert repo.get_job("__ASSET_JOB").executor_def is an_executor
    assert repo.get_job("__ASSET_JOB").loggers == {"logger_key": a_logger}
    assert isinstance(repo.get_job("another_asset_job"), JobDefinition)
    assert repo.get_job("another_asset_job").executor_def is an_executor
    assert repo.get_job("another_asset_job").loggers == {"logger_key": a_logger}
    assert isinstance(repo.get_job("sensor_target"), JobDefinition)
    assert repo.get_job("sensor_target").executor_def is an_executor
    assert repo.get_job("sensor_target").loggers == {"logger_key": a_logger}
    assert isinstance(repo.get_job("schedule_target"), JobDefinition)
    assert repo.get_job("schedule_target").executor_def is an_executor
    assert repo.get_job("schedule_target").loggers == {"logger_key": a_logger}

    assert isinstance(repo.get_schedule_def("a_schedule"), ScheduleDefinition)
    assert isinstance(repo.get_sensor_def("a_sensor"), SensorDefinition)

    # exercise the same kitchen sink through Definitions
    defs = Definitions(
        assets=[an_asset, another_asset],
        jobs=[a_job, another_asset_job],
        schedules=[a_schedule],
        sensors=[a_sensor],
        resources={"a_resource_key": "the resource"},
        executor=an_executor,
        loggers={"logger_key": a_logger},
    )

    assert isinstance(defs.get_job_def("a_job"), JobDefinition)
    assert defs.get_job_def("a_job").executor_def is an_executor
    assert defs.get_job_def("a_job").loggers == {"logger_key": a_logger}
    assert isinstance(defs.get_job_def("__ASSET_JOB"), JobDefinition)
    assert defs.get_job_def("__ASSET_JOB").executor_def is an_executor
    assert defs.get_job_def("__ASSET_JOB").loggers == {"logger_key": a_logger}
    assert isinstance(defs.get_job_def("another_asset_job"), JobDefinition)
    assert defs.get_job_def("another_asset_job").executor_def is an_executor
    assert defs.get_job_def("another_asset_job").loggers == {"logger_key": a_logger}
    assert isinstance(defs.get_job_def("sensor_target"), JobDefinition)
    assert defs.get_job_def("sensor_target").executor_def is an_executor
    assert defs.get_job_def("sensor_target").loggers == {"logger_key": a_logger}
    assert isinstance(defs.get_job_def("schedule_target"), JobDefinition)
    assert defs.get_job_def("schedule_target").executor_def is an_executor
    assert defs.get_job_def("schedule_target").loggers == {"logger_key": a_logger}

    assert isinstance(defs.get_schedule_def("a_schedule"), ScheduleDefinition)
    assert isinstance(defs.get_sensor_def("a_sensor"), SensorDefinition)
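
Since the docstring frames the helper as a bridge for code locations that still host multiple named repositories, here is a sketch of that pattern; the repository and asset names are assumptions, not from the PR:

from dagster import asset
from dagster._core.definitions.definitions_class import (
    create_repository_using_definitions_args,
)


@asset
def team_a_asset():
    return "a"


@asset
def team_b_asset():
    return "b"


# Two named repositories side by side in one code location: the pattern the
# docstring describes, which the singleton Definitions cannot express.
team_a_repo = create_repository_using_definitions_args(name="team_a", assets=[team_a_asset])
team_b_repo = create_repository_using_definitions_args(name="team_b", assets=[team_b_asset])
assert team_a_repo.name == "team_a" and team_b_repo.name == "team_b"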