From b5a13dd47eed2738a918bbb793ad439eb1d81d53 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Tue, 8 Oct 2024 10:33:04 +0200 Subject: [PATCH] Move loading and merging intermediate Reports to its own file This should make the high level steps of parallel processing a bit more obvious and easier to grasp. --- services/processing/loading.py | 51 +++++++++++ services/processing/merging.py | 112 +++++++++++++++++++++++ tasks/upload_finisher.py | 160 ++++----------------------------- 3 files changed, 179 insertions(+), 144 deletions(-) create mode 100644 services/processing/loading.py create mode 100644 services/processing/merging.py diff --git a/services/processing/loading.py b/services/processing/loading.py new file mode 100644 index 000000000..c9dd6ea77 --- /dev/null +++ b/services/processing/loading.py @@ -0,0 +1,51 @@ +import json +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass + +import sentry_sdk +from shared.reports.editable import EditableReport + +from services.archive import ArchiveService +from services.processing.state import MERGE_BATCH_SIZE + + +@dataclass +class IntermediateReport: + upload_id: int + """ + The `Upload` id for which this report was loaded. + """ + + report: EditableReport + """ + The loaded Report. + """ + + +@sentry_sdk.trace +def load_intermediate_reports( + archive_service: ArchiveService, + commitsha: str, + upload_ids: list[int], +) -> list[IntermediateReport]: + def load_report(upload_id: int) -> IntermediateReport: + repo_hash = archive_service.storage_hash + # TODO: migrate these files to a better storage location + prefix = f"v4/repos/{repo_hash}/commits/{commitsha}/parallel/incremental" + chunks_path = f"{prefix}/chunk{upload_id}.txt" + json_path = f"{prefix}/files_and_sessions{upload_id}.txt" + + chunks = archive_service.read_file(chunks_path).decode(errors="replace") + report_json = json.loads(archive_service.read_file(json_path)) + + report = EditableReport.from_chunks( + chunks=chunks, + files=report_json["files"], + sessions=report_json["sessions"], + totals=report_json.get("totals"), + ) + return IntermediateReport(upload_id, report) + + with ThreadPoolExecutor(max_workers=MERGE_BATCH_SIZE) as pool: + loaded_reports = pool.map(load_report, upload_ids) + return list(loaded_reports) diff --git a/services/processing/merging.py b/services/processing/merging.py new file mode 100644 index 000000000..bba24c26e --- /dev/null +++ b/services/processing/merging.py @@ -0,0 +1,112 @@ +from dataclasses import dataclass + +import sentry_sdk +from shared.reports.editable import EditableReport, EditableReportFile +from shared.reports.enums import UploadState +from shared.reports.resources import Report +from shared.yaml import UserYaml +from sqlalchemy.orm import Session as DbSession + +from database.models.reports import Upload +from services.processing.loading import IntermediateReport +from services.report import delete_uploads_by_sessionid +from services.report.raw_upload_processor import clear_carryforward_sessions + + +@dataclass +class MergeResult: + session_mapping: dict[int, int] + """ + This is a mapping from the input `upload_id` to the output `session_id` + as it exists in the merged "master Report". + """ + + deleted_sessions: set[int] + """ + The Set of carryforwarded `session_id`s that have been removed from the "master Report". 
+ """ + + +@sentry_sdk.trace +def merge_reports( + commit_yaml: UserYaml, + master_report: Report, + intermediate_reports: list[IntermediateReport], +) -> MergeResult: + session_mapping: dict[int, int] = dict() + deleted_sessions: set[int] = set() + + for intermediate_report in intermediate_reports: + report = intermediate_report.report + + old_sessionid = next(iter(report.sessions)) + new_sessionid = master_report.next_session_number() + change_sessionid(report, old_sessionid, new_sessionid) + session_mapping[intermediate_report.upload_id] = new_sessionid + + session = report.sessions[new_sessionid] + + _session_id, session = master_report.add_session( + session, use_id_from_session=True + ) + + if flags := session.flags: + session_adjustment = clear_carryforward_sessions( + master_report, report, flags, commit_yaml + ) + deleted_sessions.update(session_adjustment.fully_deleted_sessions) + + master_report.merge(report) + + return MergeResult(session_mapping, deleted_sessions) + + +@sentry_sdk.trace +def update_uploads(db_session: DbSession, merge_result: MergeResult): + for upload_id, session_id in merge_result.session_mapping.items(): + upload = db_session.query(Upload).filter(Upload.id_ == upload_id).first() + upload.state_id = UploadState.PROCESSED.db_id + upload.state = "processed" + upload.order_number = session_id + + if upload: + delete_uploads_by_sessionid(upload, list(merge_result.deleted_sessions)) + db_session.flush() + + +def change_sessionid(report: EditableReport, old_id: int, new_id: int): + """ + Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead. + This patches up all the references to that session across all files and line records. + + In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s, + and does the equivalent of `calculate_present_sessions`. 
+ """ + session = report.sessions[new_id] = report.sessions.pop(old_id) + session.id = new_id + + report_file: EditableReportFile + for report_file in report._chunks: + if report_file is None: + continue + + all_sessions = set() + + for idx, _line in enumerate(report_file._lines): + if not _line: + continue + + # this turns the line into an actual `ReportLine` + line = report_file._lines[idx] = report_file._line(_line) + + for session in line.sessions: + if session.id == old_id: + session.id = new_id + all_sessions.add(session.id) + + if line.datapoints: + for point in line.datapoints: + if point.sessionid == old_id: + point.sessionid = new_id + + report_file._details["present_sessions"] = all_sessions diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index b53b2cfb6..e377955bc 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -1,9 +1,7 @@ import contextlib -import functools import json import logging import re -from concurrent.futures import ThreadPoolExecutor from enum import Enum from redis.exceptions import LockError @@ -16,8 +14,6 @@ upload_finisher_task_name, ) from shared.metrics import Histogram -from shared.reports.editable import EditableReport, EditableReportFile -from shared.reports.enums import UploadState from shared.reports.resources import Report from shared.storage.exceptions import FileNotInStorageError from shared.yaml import UserYaml @@ -26,7 +22,6 @@ from celery_config import notify_error_task_name from database.models import Commit, Pull from database.models.core import Repository -from database.models.reports import Upload from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.checkpoint_logger.flows import UploadFlow @@ -34,13 +29,11 @@ from helpers.parallel import ParallelProcessing from services.archive import ArchiveService, MinioEndpoints from services.comparison import get_or_create_comparison +from services.processing.loading import load_intermediate_reports +from services.processing.merging import merge_reports, update_uploads from services.processing.state import ProcessingState, should_trigger_postprocessing from services.redis import get_redis_connection -from services.report import ReportService, delete_uploads_by_sessionid -from services.report.raw_upload_processor import ( - SessionAdjustmentResult, - clear_carryforward_sessions, -) +from services.report import ReportService from services.yaml import read_yaml_field from tasks.base import BaseCodecovTask from tasks.parallel_verification import parallel_verification_task @@ -570,104 +563,22 @@ def merge_incremental_reports( ) report = Report() - log.info( - "Downloading %s incremental reports that were processed in parallel", - len(processing_results["processings_so_far"]), - extra=dict( - repoid=repoid, - commit=commitid, - processing_results=processing_results["processings_so_far"], - parent_task=self.request.parent_id, - ), + upload_ids = [ + int(upload["upload_pk"]) + for upload in processing_results["parallel_incremental_result"] + ] + intermediate_reports = load_intermediate_reports( + archive_service, commitid, upload_ids + ) + merge_result = merge_reports( + UserYaml(commit_yaml), report, intermediate_reports ) - def download_and_build_incremental_report(partial_report): - chunks = archive_service.read_file(partial_report["chunks_path"]).decode( - errors="replace" - ) - files_and_sessions = json.loads( - archive_service.read_file(partial_report["files_and_sessions_path"]) - ) 
- report = report_service.build_report( - chunks, - files_and_sessions["files"], - files_and_sessions["sessions"], - None, - report_class=EditableReport, - ) - return { - "upload_pk": partial_report["upload_pk"], - "report": report, - } - - def merge_report(cumulative_report: Report, obj): - incremental_report: Report = obj["report"] - - if len(incremental_report.sessions) != 1: - log.warning( - "Incremental report does not have expected session", - extra=dict( - repoid=repoid, - commit=commitid, - upload_pk=obj["upload_pk"], - ), - ) - - old_sessionid = next(iter(incremental_report.sessions)) - new_sessionid = cumulative_report.next_session_number() - change_sessionid(incremental_report, old_sessionid, new_sessionid) - - session = incremental_report.sessions[new_sessionid] - - _session_id, session = cumulative_report.add_session( - session, use_id_from_session=True - ) - - session_adjustment = SessionAdjustmentResult([], []) - if flags := session.flags: - session_adjustment = clear_carryforward_sessions( - cumulative_report, incremental_report, flags, UserYaml(commit_yaml) - ) - - cumulative_report.merge(incremental_report) - - if parallel_processing is ParallelProcessing.PARALLEL: - # When we are fully parallel, we need to update the `Upload` in the database - # with the final session_id (aka `order_number`) and other statuses - db_session = commit.get_db_session() - upload = ( - db_session.query(Upload) - .filter(Upload.id_ == obj["upload_pk"]) - .first() - ) - upload.state_id = UploadState.PROCESSED.db_id - upload.state = "processed" - upload.order_number = new_sessionid - delete_uploads_by_sessionid( - upload, session_adjustment.fully_deleted_sessions - ) - db_session.flush() - - return cumulative_report - - with ThreadPoolExecutor(max_workers=10) as pool: # max chosen arbitrarily - unmerged_reports = pool.map( - download_and_build_incremental_report, - processing_results["parallel_incremental_result"], - ) + if parallel_processing is ParallelProcessing.PARALLEL: + # When we are fully parallel, we need to update the `Upload` in the database + # with the final session_id (aka `order_number`) and other statuses + update_uploads(commit.get_db_session(), merge_result) - log.info( - "Merging %s incremental reports together", - len(processing_results["processings_so_far"]), - extra=dict( - repoid=repoid, - commit=commitid, - processing_results=processing_results["processings_so_far"], - parent_task=self.request.parent_id, - ), - ) - report = functools.reduce(merge_report, unmerged_reports, report) - commit.get_db_session().flush() return report @@ -683,42 +594,3 @@ def acquire_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Loc timeout=max(60 * 5, hard_time_limit), blocking_timeout=5, ) - - -# TODO: maybe move this to `shared` if it turns out to be a better place for this -def change_sessionid(report: Report, old_id: int, new_id: int): - """ - Modifies the `Report`, changing the session with `old_id` to have `new_id` instead. - This patches up all the references to that session across all files and line records. - - In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s, - and does the equivalent of `calculate_present_sessions`. 
- """ - session = report.sessions[new_id] = report.sessions.pop(old_id) - session.id = new_id - - report_file: EditableReportFile - for report_file in report._chunks: - if report_file is None: - continue - - all_sessions = set() - - for idx, _line in enumerate(report_file._lines): - if not _line: - continue - - # this turns the line into an actual `ReportLine` - line = report_file._lines[idx] = report_file._line(_line) - - for session in line.sessions: - if session.id == old_id: - session.id = new_id - all_sessions.add(session.id) - - if line.datapoints: - for point in line.datapoints: - if point.sessionid == old_id: - point.sessionid = new_id - - report_file._details["present_sessions"] = all_sessions
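
Note (illustrative, not part of the patch): after this change, the body of `merge_incremental_reports` in tasks/upload_finisher.py reduces to composing the two new helpers plus the existing Upload bookkeeping. The sketch below restates that flow in isolation, assuming the same objects the task already has in scope; the `fully_parallel` flag stands in for the `parallel_processing is ParallelProcessing.PARALLEL` check, and task plumbing and logging are omitted.

    from shared.reports.resources import Report
    from shared.yaml import UserYaml

    from services.processing.loading import load_intermediate_reports
    from services.processing.merging import merge_reports, update_uploads


    def merge_incremental(archive_service, db_session, commit_yaml: dict,
                          commitsha: str, upload_ids: list[int],
                          fully_parallel: bool) -> Report:
        master_report = Report()

        # Load the per-upload intermediate reports from storage, fanned out
        # across at most MERGE_BATCH_SIZE worker threads
        # (see services/processing/loading.py).
        intermediate_reports = load_intermediate_reports(
            archive_service, commitsha, upload_ids
        )

        # Merge them into the master report, remapping each report's session id
        # via change_sessionid and clearing carried-forward sessions that share
        # flags with the incoming session.
        merge_result = merge_reports(
            UserYaml(commit_yaml), master_report, intermediate_reports
        )

        # Only when fully parallel: persist the final session ids (order_number)
        # and "processed" state back onto the Upload rows, and delete the
        # carried-forward sessions that were removed during the merge.
        if fully_parallel:
            update_uploads(db_session, merge_result)

        return master_report

One incidental behavior change visible in the diff: the download fan-out was previously capped at an arbitrary max_workers=10, whereas the new loader bounds its ThreadPoolExecutor by MERGE_BATCH_SIZE from services.processing.state.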