From f0b32f35c35ba8843a90c0d16ec6c8fb34703de0 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 9 Oct 2024 10:07:49 +0200 Subject: [PATCH] Emit metrics for tasks run in parallel --- helpers/parallel.py | 13 +++++++++++++ tasks/upload_finisher.py | 1 + tasks/upload_processor.py | 1 + 3 files changed, 15 insertions(+) diff --git a/helpers/parallel.py b/helpers/parallel.py index da5b8df59..28c2ec277 100644 --- a/helpers/parallel.py +++ b/helpers/parallel.py @@ -2,8 +2,17 @@ from enum import Enum +import sentry_sdk +from shared.metrics import Counter + from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO +PARALLEL_TASK_RUNS = Counter( + "worker_parallel_task_runs", + "Number of tasks run using different parallel processing configurations.", + ["task", "parallel_mode"], +) + """ This encapsulates Parallel Upload Processing logic @@ -89,3 +98,7 @@ def from_task_args( if is_final: return ParallelProcessing.EXPERIMENT_SERIAL return ParallelProcessing.SERIAL + + def emit_metrics(self, task: str): + sentry_sdk.set_tag("parallel_mode", self.value) + PARALLEL_TASK_RUNS.labels(task=task, parallel_mode=self.value).inc() diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index a2f37c2d8..6e24f0761 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -137,6 +137,7 @@ def run_impl( repository = commit.repository parallel_processing = ParallelProcessing.from_task_args(**kwargs) + parallel_processing.emit_metrics("upload_finisher") if parallel_processing.is_parallel: # need to transform processing_results produced by chord to get it into the diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py index 4b40e01a6..f9af59f7e 100644 --- a/tasks/upload_processor.py +++ b/tasks/upload_processor.py @@ -193,6 +193,7 @@ def process_upload( report_service = ReportService(UserYaml(commit_yaml)) in_parallel = parallel_processing.is_parallel + parallel_processing.emit_metrics("upload_processor") if in_parallel: log.info(