Skip to content

Commit

Permalink
add with_overrides to map task (#794)
Browse files Browse the repository at this point in the history
* add with_overrides

Signed-off-by: Samhita Alla <[email protected]>

* remove Resources

Signed-off-by: Samhita Alla <[email protected]>
  • Loading branch information
samhita-alla authored Jan 4, 2022
1 parent 9e156bb commit 0e752bc
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 11 deletions.
7 changes: 4 additions & 3 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def _raw_execute(self, **kwargs) -> Any:

def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_success_ratio: float = None, **kwargs):
"""
Use a map task for parallelizable tasks that are run across a List of an input type. A map task can be composed of
Use a map task for parallelizable tasks that run across a list of an input type. A map task can be composed of
any individual :py:class:`flytekit.PythonFunctionTask`.
Invoke a map task with arguments using the :py:class:`list` version of the expected input.
Expand All @@ -220,8 +220,8 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc
:language: python
:dedent: 4
At run time, the underlying map task will be run for every value in the input collection. Task-specific attributes
such as :py:class:`flytekit.TaskMetadata` and :py:class:`flytekit.Resources` are applied to individual instances
At run time, the underlying map task will be run for every value in the input collection. Attributes
such as :py:class:`flytekit.TaskMetadata` and ``with_overrides`` are applied to individual instances
of the mapped task.
:param task_function: This argument is implicitly passed and represents the repeatable function
Expand All @@ -230,6 +230,7 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc
all inputs are processed.
:param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete
successfully before terminating this task and marking it successful.
"""
if not isinstance(task_function, PythonFunctionTask):
raise ValueError(
Expand Down
12 changes: 4 additions & 8 deletions tests/flytekit/unit/core/test_map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from flytekit import LaunchPlan, Resources, map_task
from flytekit import LaunchPlan, map_task
from flytekit.common.translator import get_serializable
from flytekit.core import context_manager
from flytekit.core.context_manager import Image, ImageConfig
Expand Down Expand Up @@ -33,13 +33,9 @@ def my_mappable_task(a: int) -> str:

@workflow
def my_wf(x: typing.List[int]) -> typing.List[str]:
return map_task(
my_mappable_task,
metadata=TaskMetadata(retries=1),
requests=Resources(cpu="10M"),
concurrency=10,
min_success_ratio=0.75,
)(a=x)
return map_task(my_mappable_task, metadata=TaskMetadata(retries=1), concurrency=10, min_success_ratio=0.75,)(
a=x
).with_overrides(cpu="10M")

# test_map_task_end

Expand Down

0 comments on commit 0e752bc

Please sign in to comment.