Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add with_overrides to map task #794

Merged
merged 2 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def _raw_execute(self, **kwargs) -> Any:

def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_success_ratio: float = None, **kwargs):
"""
Use a map task for parallelizable tasks that are run across a List of an input type. A map task can be composed of
Use a map task for parallelizable tasks that run across a list of an input type. A map task can be composed of
any individual :py:class:`flytekit.PythonFunctionTask`.

Invoke a map task with arguments using the :py:class:`list` version of the expected input.
Expand All @@ -220,8 +220,8 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc
:language: python
:dedent: 4

At run time, the underlying map task will be run for every value in the input collection. Task-specific attributes
such as :py:class:`flytekit.TaskMetadata` and :py:class:`flytekit.Resources` are applied to individual instances
At run time, the underlying map task will be run for every value in the input collection. Attributes
such as :py:class:`flytekit.TaskMetadata` and ``with_overrides`` are applied to individual instances
of the mapped task.

:param task_function: This argument is implicitly passed and represents the repeatable function
Expand All @@ -230,6 +230,7 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc
all inputs are processed.
:param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete
successfully before terminating this task and marking it successful.

"""
if not isinstance(task_function, PythonFunctionTask):
raise ValueError(
Expand Down
12 changes: 4 additions & 8 deletions tests/flytekit/unit/core/test_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from flytekit import LaunchPlan, Resources, map_task
from flytekit import LaunchPlan, map_task
from flytekit.common.translator import get_serializable
from flytekit.core import context_manager
from flytekit.core.context_manager import Image, ImageConfig
Expand Down Expand Up @@ -33,13 +33,9 @@ def my_mappable_task(a: int) -> str:

@workflow
def my_wf(x: typing.List[int]) -> typing.List[str]:
return map_task(
my_mappable_task,
metadata=TaskMetadata(retries=1),
requests=Resources(cpu="10M"),
concurrency=10,
min_success_ratio=0.75,
)(a=x)
return map_task(my_mappable_task, metadata=TaskMetadata(retries=1), concurrency=10, min_success_ratio=0.75,)(
a=x
).with_overrides(cpu="10M")

# test_map_task_end

Expand Down