
Add telemetry pipeline run ends #2377

Merged (13 commits, Feb 1, 2024)
10 changes: 10 additions & 0 deletions docs/book/user-guide/starter-guide/fetching-pipelines.md
@@ -101,6 +101,16 @@ Calling a pipeline executes it and then returns the response of the freshly exec
last_run = training_pipeline()
```

{% hint style="warning" %}
The run that you get back is the state of the run as stored in the ZenML database at the moment of the method call. This means the pipeline run is still initializing and no steps have been run yet. To get the latest state, you can fetch a refreshed version from the client:

```python
from zenml.client import Client

Client().get_pipeline_run(last_run.id)  # to get a refreshed version
```
{% endhint %}

Contributor review comment (on the code above): Maybe we should also not call it `last_run`. This isn't actually the last run, as someone might have created another one in the meantime, but it is exactly the run that was created by calling `training_pipeline()` in the script above.
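For illustration only (not part of this diff), re-fetching the run and checking whether it has moved past initialization might look roughly like the sketch below; it assumes the run response exposes a `status` field and that `last_run` is the object returned by `training_pipeline()` above:

```python
from zenml.client import Client

# Hypothetical usage sketch: fetch a fresh copy of the run created above
# and inspect its current status.
refreshed_run = Client().get_pipeline_run(last_run.id)
print(refreshed_run.status)  # e.g. ExecutionStatus.RUNNING
```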

### Get a run via the client

If you already know the exact run that you want to fetch (e.g., from looking at the dashboard), you can use the [`Client.get_pipeline_run()`](https://sdkdocs.zenml.io/latest/core\_code\_docs/core-client/#zenml.client.Client.get\_pipeline\_run) method to fetch the run directly without having to query the pipeline first:
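The example that follows in the docs is collapsed in this view; a short usage sketch, with a hypothetical run name standing in for the one copied from the dashboard:

```python
from zenml.client import Client

# The run name below is hypothetical; substitute the name or ID shown on
# the dashboard for the run you want to fetch.
pipeline_run = Client().get_pipeline_run(
    "training_pipeline-2024_02_01-10_15_00_123456"
)
```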
10 changes: 5 additions & 5 deletions examples/e2e/steps/training/model_trainer.py
@@ -101,12 +101,12 @@ def model_trainer(
     # keep track of mlflow version for future use
     model_registry = Client().active_stack.model_registry
     if model_registry:
-        versions = model_registry.list_model_versions(name=name)
-        if versions:
+        version = model_registry.get_latest_model_version(
+            name=name, stage=None
+        )
+        if version:
             model_ = get_step_context().model
-            model_.log_metadata(
-                {"model_registry_version": versions[-1].version}
-            )
+            model_.log_metadata({"model_registry_version": version.version})
     ### YOUR CODE ENDS HERE ###

     return model
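For readability, the new lines above condense to roughly the following sketch. It mirrors the hunk rather than adding behavior; it assumes it runs inside a ZenML step (so `get_step_context()` is available), that `name` holds the registered model's name, and that the import paths shown are the usual ones:

```python
from zenml import get_step_context
from zenml.client import Client

model_registry = Client().active_stack.model_registry
if model_registry:
    # Ask the registry for the newest version directly instead of listing
    # every version and taking the last element.
    version = model_registry.get_latest_model_version(name=name, stage=None)
    if version:
        get_step_context().model.log_metadata(
            {"model_registry_version": version.version}
        )
```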
1 change: 1 addition & 0 deletions examples/e2e/utils/promote_in_model_registry.py
@@ -35,6 +35,7 @@ def promote_in_model_registry(
         target_env: stage for promotion
     """
     model_registry = Client().active_stack.model_registry
+    model_registry.configure_mlflow()
     if latest_version != current_version:
         model_registry.update_model_version(
             name=model_name,
1 change: 1 addition & 0 deletions src/zenml/analytics/enums.py
@@ -20,6 +20,7 @@ class AnalyticsEvent(str, Enum):

     # Pipelines
     RUN_PIPELINE = "Pipeline run"
+    RUN_PIPELINE_ENDED = "Pipeline run ended"
     CREATE_PIPELINE = "Pipeline created"
     BUILD_PIPELINE = "Pipeline built"

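A minimal sketch of how the new event value is intended to be emitted; the actual call site added by this PR lives in `sql_zen_store.py` further below, and the metadata values here are placeholders:

```python
from zenml.analytics.enums import AnalyticsEvent
from zenml.analytics.utils import track_handler

# Emit the new event with some metadata attached; placeholder values only.
with track_handler(AnalyticsEvent.RUN_PIPELINE_ENDED) as analytics_handler:
    analytics_handler.metadata = {
        "pipeline_run_id": "<run-uuid>",
        "status": "completed",
    }
```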
11 changes: 8 additions & 3 deletions src/zenml/new/pipelines/pipeline.py
@@ -734,14 +734,12 @@ def _run(
                 deployment=deployment_request
             )

-            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
-                deployment=deployment_model, stack=stack
-            )
             stack.prepare_pipeline_deployment(deployment=deployment_model)

             self.log_pipeline_deployment_metadata(deployment_model)

             run = None
+            run_id = None
             if not schedule:
                 run_request = PipelineRunRequest(
                     name=get_run_name(
@@ -764,7 +762,11 @@
                     status=ExecutionStatus.INITIALIZING,
                 )
                 run = Client().zen_store.create_run(run_request)
+                run_id = run.id

+            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
+                deployment=deployment_model, stack=stack, run_id=run_id
+            )
             # Prevent execution of nested pipelines which might lead to
             # unexpected behavior
             constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
@@ -1081,12 +1083,14 @@ def _get_pipeline_analytics_metadata(
         self,
         deployment: "PipelineDeploymentResponse",
         stack: "Stack",
+        run_id: Optional[UUID] = None,
     ) -> Dict[str, Any]:
         """Returns the pipeline deployment metadata.

         Args:
             deployment: The pipeline deployment to track.
             stack: The stack on which the pipeline will be deployed.
+            run_id: The ID of the pipeline run.

         Returns:
             the metadata about the pipeline deployment
@@ -1113,6 +1117,7 @@ def _get_pipeline_analytics_metadata(
             "schedule": bool(deployment.schedule),
             "custom_materializer": custom_materializer,
             "own_stack": own_stack,
+            "pipeline_run_id": str(run_id) if run_id else None,
         }

     def _compile(
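After this change, the `RUN_PIPELINE` analytics metadata carries the run ID as well, which makes it joinable with the new `RUN_PIPELINE_ENDED` event. Roughly, with illustrative values and only the keys visible in the hunks above (the real dict contains more fields):

```python
metadata = {
    "schedule": False,
    "custom_materializer": False,
    "own_stack": True,
    # str(run_id); stays None for scheduled runs, where no run is created yet
    "pipeline_run_id": "<run-uuid>",
}
```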
21 changes: 19 additions & 2 deletions src/zenml/zen_stores/sql_zen_store.py
@@ -61,7 +61,11 @@
 from sqlmodel.sql.expression import Select, SelectOfScalar

 from zenml.analytics.enums import AnalyticsEvent
-from zenml.analytics.utils import analytics_disabler, track_decorator
+from zenml.analytics.utils import (
+    analytics_disabler,
+    track_decorator,
+    track_handler,
+)
 from zenml.config.global_config import GlobalConfiguration
 from zenml.config.secrets_store_config import SecretsStoreConfiguration
 from zenml.config.server_config import ServerConfiguration
@@ -6688,7 +6692,20 @@ def _update_pipeline_run_status(
             ExecutionStatus.FAILED,
         }:
             run_update.end_time = datetime.utcnow()

+            with track_handler(
+                AnalyticsEvent.RUN_PIPELINE_ENDED
+            ) as analytics_handler:
+                analytics_handler.metadata = {
+                    "pipeline_run_id": pipeline_run_id,
+                    "status": new_status,
+                    "num_steps": num_steps,
+                    "start_time": pipeline_run.start_time.strftime(
+                        "%Y-%m-%dT%H:%M:%S.%fZ"
+                    ),
+                    "end_time": run_update.end_time.strftime(
+                        "%Y-%m-%dT%H:%M:%S.%fZ"
+                    ),
+                }
         pipeline_run.update(run_update)
         session.add(pipeline_run)

Contributor review comment (on the `start_time` line): Just FYI, we don't send the start_time of the pipeline run in the other event, so I assume there won't be any way to match the two. Not sure whether we want to add it to the other analytics event, remove it here, or maybe just send the duration here, if that is interesting.
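Following up on the review comment above, one alternative would be to report the run duration instead of (or alongside) the absolute timestamps. A sketch of that idea, not what the PR merged; the variable names mirror the hunk above:

```python
from datetime import datetime


def run_duration_seconds(start_time: datetime, end_time: datetime) -> float:
    """Elapsed wall-clock time of a pipeline run in seconds."""
    return (end_time - start_time).total_seconds()


# e.g. inside the analytics metadata dict:
# "duration_seconds": run_duration_seconds(
#     pipeline_run.start_time, run_update.end_time
# ),
```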