From 4a5632599802176c429d532002db251fdc3eb4f9 Mon Sep 17 00:00:00 2001 From: Samhita Alla Date: Tue, 4 Jan 2022 10:01:53 +0530 Subject: [PATCH] add `with_overrides` to map task (#794) * add with_overrides Signed-off-by: Samhita Alla * remove Resources Signed-off-by: Samhita Alla Signed-off-by: Eduardo Apolinario --- flytekit/core/map_task.py | 7 ++++--- tests/flytekit/unit/core/test_map_task.py | 12 ++++-------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 60061cbb0e1..42645a47970 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -207,7 +207,7 @@ def _raw_execute(self, **kwargs) -> Any: def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_success_ratio: float = None, **kwargs): """ - Use a map task for parallelizable tasks that are run across a List of an input type. A map task can be composed of + Use a map task for parallelizable tasks that run across a list of an input type. A map task can be composed of any individual :py:class:`flytekit.PythonFunctionTask`. Invoke a map task with arguments using the :py:class:`list` version of the expected input. @@ -220,8 +220,8 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc :language: python :dedent: 4 - At run time, the underlying map task will be run for every value in the input collection. Task-specific attributes - such as :py:class:`flytekit.TaskMetadata` and :py:class:`flytekit.Resources` are applied to individual instances + At run time, the underlying map task will be run for every value in the input collection. Attributes + such as :py:class:`flytekit.TaskMetadata` and ``with_overrides`` are applied to individual instances of the mapped task. :param task_function: This argument is implicitly passed and represents the repeatable function @@ -230,6 +230,7 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc all inputs are processed. :param min_success_ratio: If specified, this determines the minimum fraction of total jobs which can complete successfully before terminating this task and marking it successful. + """ if not isinstance(task_function, PythonFunctionTask): raise ValueError( diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index 95df6698290..31cbceffe9f 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -3,7 +3,7 @@ import pytest -from flytekit import LaunchPlan, Resources, map_task +from flytekit import LaunchPlan, map_task from flytekit.common.translator import get_serializable from flytekit.core import context_manager from flytekit.core.context_manager import Image, ImageConfig @@ -33,13 +33,9 @@ def my_mappable_task(a: int) -> str: @workflow def my_wf(x: typing.List[int]) -> typing.List[str]: - return map_task( - my_mappable_task, - metadata=TaskMetadata(retries=1), - requests=Resources(cpu="10M"), - concurrency=10, - min_success_ratio=0.75, - )(a=x) + return map_task(my_mappable_task, metadata=TaskMetadata(retries=1), concurrency=10, min_success_ratio=0.75,)( + a=x + ).with_overrides(cpu="10M") # test_map_task_end