Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Add Function status #399

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- BREAKING: Renamed `function` field of the Function pydantic model to `archive`([#393](https://github.com/Substra/substra/pull/393))
- BREAKING: Renamed ComputeTask status ([#397](https://github.com/Substra/substra/pull/397))
- BREAKING: Renamed ComputeTask status values ([#397](https://github.com/Substra/substra/pull/397))
- `download_logs` uses the new endpoint ([#398](https://github.com/Substra/substra/pull/398))
- BREAKING: Renamed Status to ComputeTaskStatus ([#399](https://github.com/Substra/substra/pull/399))

### Added

- Paths are now resolved on DatasampleSpec objects. Which means that users can pass relative paths ([#392](https://github.com/Substra/substra/pull/392))
- Added FunctionStatus ([#399](https://github.com/Substra/substra/pull/399))

## [0.49.0](https://github.com/Substra/substra/releases/tag/0.49.0) - 2023-10-18

Expand Down
2 changes: 1 addition & 1 deletion references/sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ The ``filters`` argument is a dictionary, with those possible keys:
rank (List[int]): list tasks which are at given ranks.

status (List[str]): list tasks with given status.
The possible values are the values of `substra.models.Status`
The possible values are the values of `substra.models.ComputeTaskStatus`
metadata (dict)
{
"key": str # the key of the metadata to filter on
Expand Down
2 changes: 1 addition & 1 deletion references/sdk_models.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Asset creation specification base class.
- owner: <class 'str'>
- compute_plan_key: <class 'str'>
- metadata: typing.Dict[str, str]
- status: <enum 'Status'>
- status: <enum 'ComputeTaskStatus'>
- worker: <class 'str'>
- rank: typing.Optional[int]
- tag: <class 'str'>
Expand Down
3 changes: 2 additions & 1 deletion substra/sdk/backends/local/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def _add_function(self, key, spec, spec_options=None):
metadata=spec.metadata if spec.metadata else dict(),
inputs=_schemas_list_to_models_list(spec.inputs, models.FunctionInput),
outputs=_schemas_list_to_models_list(spec.outputs, models.FunctionOutput),
status=models.FunctionStatus.ready,
)
return self._db.add(function)

Expand Down Expand Up @@ -429,7 +430,7 @@ def _add_task(self, key, spec, spec_options=None):
outputs=_output_from_spec(spec.outputs),
tag=spec.tag or "",
# TODO: the waiting status should be more granular now
status=models.Status.waiting_for_executor_slot,
status=models.ComputeTaskStatus.waiting_for_executor_slot,
metadata=spec.metadata if spec.metadata else dict(),
)

Expand Down
4 changes: 2 additions & 2 deletions substra/sdk/backends/local/compute/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def schedule_task(self, task: models.Task):
task: Task to execute
"""
with self._context(task.key) as task_dir:
task.status = models.Status.doing
task.status = models.ComputeTaskStatus.doing
task.start_date = datetime.datetime.now()
function = self._db.get_with_files(schemas.Type.Function, task.function.key)
input_multiplicity = {i.identifier: i.multiple for i in function.inputs}
Expand Down Expand Up @@ -377,7 +377,7 @@ def schedule_task(self, task: models.Task):
)

# Set status
task.status = models.Status.done
task.status = models.ComputeTaskStatus.done
task.end_date = datetime.datetime.now()

self._update_cp(compute_plan=compute_plan, update_live_performances=update_live_performances)
2 changes: 1 addition & 1 deletion substra/sdk/backends/local/dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_performances(self, key: str) -> models.Performances:
performances = models.Performances()

for task in list_tasks:
if task.status == models.Status.done:
if task.status == models.ComputeTaskStatus.done:
function = self.get(schemas.Type.Function, task.function.key)
perf_identifiers = [
output.identifier for output in function.outputs if output.kind == schemas.AssetKind.performance
Expand Down
47 changes: 42 additions & 5 deletions substra/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ def list_task(
worker (List[str]): list tasks which ran on listed workers. Remote mode only.\n
rank (List[int]): list tasks which are at given ranks.\n
status (List[str]): list tasks with given status.
The possible values are the values of `substra.models.Status`
The possible values are the values of `substra.models.ComputeTaskStatus`
metadata (dict)
{
"key": str # the key of the metadata to filter on
Expand Down Expand Up @@ -1062,9 +1062,46 @@ def wait_task(
Not raised when `timeout == None`
"""
asset_getter = self.get_task
status_canceled = models.Status.canceled.value
status_failed = models.Status.failed.value
statuses_stopped = (models.Status.done.value, models.Status.canceled.value)
status_canceled = models.ComputeTaskStatus.canceled.value
status_failed = models.ComputeTaskStatus.failed.value
statuses_stopped = (models.ComputeTaskStatus.done.value, models.ComputeTaskStatus.canceled.value)
return self._wait(
key=key,
asset_getter=asset_getter,
polling_period=polling_period,
raise_on_failure=raise_on_failure,
status_canceled=status_canceled,
status_failed=status_failed,
statuses_stopped=statuses_stopped,
timeout=timeout,
)

@logit
def wait_function(
self, key: str, *, timeout: Optional[float] = None, polling_period: float = 1.0, raise_on_failure: bool = True
) -> models.Task:
"""Wait for the build of the given function to finish.

It is considered finished when the status is ready, failed or cancelled.

Args:
key (str): the key of the task to wait for.
timeout (float, optional): maximum time to wait, in seconds. If set to None, will hang until completion.
polling_period (float): time to wait between two checks, in seconds. Defaults to 2.0.
raise_on_failure (bool): whether to raise an exception if the execution fails. Defaults to True.

Returns:
models.Task: the task after completion

Raises:
exceptions.FutureFailureError: The task failed or have been cancelled.
exceptions.FutureTimeoutError: The task took more than the duration set in the timeout to complete.
Not raised when `timeout == None`
"""
asset_getter = self.get_function
status_canceled = models.FunctionStatus.canceled.value
status_failed = models.FunctionStatus.failed.value
statuses_stopped = (models.FunctionStatus.ready.value, models.FunctionStatus.canceled.value)
return self._wait(
key=key,
asset_getter=asset_getter,
Expand Down Expand Up @@ -1095,7 +1132,7 @@ def _wait(
if asset.status in statuses_stopped:
break

if asset.status == models.Status.failed.value and asset.error_type is not None:
if asset.status == models.ComputeTaskStatus.failed.value and asset.error_type is not None:
# when dealing with a failed task, wait for the error_type field of the task to be set
# i.e. wait for the registration of the failure report
break
Expand Down
16 changes: 14 additions & 2 deletions substra/sdk/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MetadataFilterType(str, enum.Enum):
exists = "exists"


class Status(str, enum.Enum):
class ComputeTaskStatus(str, enum.Enum):
"""Status of the task"""

unknown = "STATUS_UNKNOWN"
Expand All @@ -57,6 +57,17 @@ class ComputePlanStatus(str, enum.Enum):
empty = "PLAN_STATUS_EMPTY"


class FunctionStatus(str, enum.Enum):
"""Status of the function"""

unknown = "FUNCTION_STATUS_UNKNOWN"
waiting = "FUNCTION_STATUS_WAITING"
building = "FUNCTION_STATUS_BUILDING"
ready = "FUNCTION_STATUS_READY"
failed = "FUNCTION_STATUS_FAILED"
canceled = "FUNCTION_STATUS_CANCELED"


class TaskErrorType(str, enum.Enum):
"""Types of errors that can occur in a task"""

Expand Down Expand Up @@ -178,6 +189,7 @@ class Function(_Model):
creation_date: datetime
inputs: List[FunctionInput]
outputs: List[FunctionOutput]
status: FunctionStatus

description: _File
archive: _File
Expand Down Expand Up @@ -268,7 +280,7 @@ class Task(_Model):
owner: str
compute_plan_key: str
metadata: Dict[str, str]
status: Status
status: ComputeTaskStatus
worker: str
rank: Optional[int] = None
tag: str
Expand Down
9 changes: 9 additions & 0 deletions tests/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False},
},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -106,6 +107,7 @@
"shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False},
},
"outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -133,6 +135,7 @@
"predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False},
},
"outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
}


Expand Down Expand Up @@ -162,6 +165,7 @@
"opener": {"kind": "ASSET_DATA_MANAGER", "optional": False, "multiple": False},
},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -234,6 +238,7 @@
"metadata": {"foo": "bar"},
"inputs": {"model": {"kind": "ASSET_MODEL", "optional": False, "multiple": True}},
"outputs": {"model": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -308,6 +313,7 @@
"local": {"kind": "ASSET_MODEL", "multiple": False},
"shared": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -420,6 +426,7 @@
"local": {"kind": "ASSET_MODEL", "multiple": False},
"shared": {"kind": "ASSET_MODEL", "multiple": False},
},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -478,6 +485,7 @@
"shared": {"kind": "ASSET_MODEL", "optional": True, "multiple": False},
},
"outputs": {"predictions": {"kind": "ASSET_MODEL", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down Expand Up @@ -536,6 +544,7 @@
"predictions": {"kind": "ASSET_MODEL", "optional": False, "multiple": False},
},
"outputs": {"performance": {"kind": "ASSET_PERFORMANCE", "multiple": False}},
"status": "FUNCTION_STATUS_READY",
},
"owner": "MyOrg1MSP",
"creation_date": "2021-08-24T13:36:07.393646367Z",
Expand Down
14 changes: 7 additions & 7 deletions tests/sdk/test_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from substra.sdk import exceptions
from substra.sdk.models import ComputePlanStatus
from substra.sdk.models import Status
from substra.sdk.models import ComputeTaskStatus
from substra.sdk.models import TaskErrorType

from .. import datastore
Expand All @@ -21,8 +21,8 @@ def _param_name_maker(arg):
@pytest.mark.parametrize(
("asset_dict", "function_name", "status", "expectation"),
[
(datastore.TRAINTASK, "wait_task", Status.done, does_not_raise()),
(datastore.TRAINTASK, "wait_task", Status.canceled, pytest.raises(exceptions.FutureFailureError)),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.done, does_not_raise()),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.canceled, pytest.raises(exceptions.FutureFailureError)),
(datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.done, does_not_raise()),
(
datastore.COMPUTE_PLAN,
Expand All @@ -49,7 +49,7 @@ def test_wait(client, mocker, asset_dict, function_name, status, expectation):

def test_wait_task_failed(client, mocker):
# We need an error type to stop the iteration
item = {**datastore.TRAINTASK, "status": Status.failed, "error_type": TaskErrorType.internal}
item = {**datastore.TRAINTASK, "status": ComputeTaskStatus.failed, "error_type": TaskErrorType.internal}
mock_requests(mocker, "get", item)
with pytest.raises(exceptions.FutureFailureError):
client.wait_task(key=item["key"])
Expand All @@ -58,9 +58,9 @@ def test_wait_task_failed(client, mocker):
@pytest.mark.parametrize(
("asset_dict", "function_name", "status"),
[
(datastore.TRAINTASK, "wait_task", Status.waiting_for_parent_tasks),
(datastore.TRAINTASK, "wait_task", Status.waiting_for_builder_slot),
(datastore.TRAINTASK, "wait_task", Status.waiting_for_executor_slot),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_parent_tasks),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_builder_slot),
(datastore.TRAINTASK, "wait_task", ComputeTaskStatus.waiting_for_executor_slot),
(datastore.COMPUTE_PLAN, "wait_compute_plan", ComputePlanStatus.todo),
],
ids=_param_name_maker,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ def test_check_metadata_search_filter(filters, expected, exception):
),
(
schemas.Type.Task,
{"status": [substra.models.Status.done.value]},
{"status": [substra.models.Status.done.value]},
{"status": [substra.models.ComputeTaskStatus.done.value]},
{"status": [substra.models.ComputeTaskStatus.done.value]},
None,
),
(schemas.Type.Task, {"rank": [1]}, {"rank": ["1"]}, None),
Expand Down
Loading