diff --git a/services/processing/loading.py b/services/processing/loading.py
new file mode 100644
index 000000000..776db7e95
--- /dev/null
+++ b/services/processing/loading.py
@@ -0,0 +1,102 @@
+import json
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+
+import sentry_sdk
+from shared.reports.editable import EditableReport
+
+from services.archive import ArchiveService, MinioEndpoints
+
+
+@dataclass
+class IntermediateReport:
+    upload_id: int
+    """
+    The `Upload` id for which this report was loaded.
+    """
+
+    report: EditableReport
+    """
+    The loaded Report.
+    """
+
+
+@sentry_sdk.trace
+def load_intermediate_reports(
+    archive_service: ArchiveService,
+    commitsha: str,
+    upload_ids: list[int],
+) -> list[IntermediateReport]:
+    @sentry_sdk.trace
+    def load_report(upload_id: int) -> IntermediateReport:
+        repo_hash = archive_service.storage_hash
+        json_path, chunks_path = intermediate_report_paths(
+            repo_hash, commitsha, upload_id
+        )
+
+        chunks = archive_service.read_file(chunks_path).decode(errors="replace")
+        report_json = json.loads(archive_service.read_file(json_path))
+
+        report = EditableReport.from_chunks(
+            chunks=chunks,
+            files=report_json["files"],
+            sessions=report_json["sessions"],
+            totals=report_json.get("totals"),
+        )
+        return IntermediateReport(upload_id, report)
+
+    def instrumented_load_report(upload_id: int) -> IntermediateReport:
+        with sentry_sdk.isolation_scope() as _scope:
+            return load_report(upload_id)
+
+    with ThreadPoolExecutor() as pool:
+        loaded_reports = pool.map(instrumented_load_report, upload_ids)
+        return list(loaded_reports)
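+
+
+# A minimal usage sketch (hypothetical upload ids; `ArchiveService(repository)`
+# as used elsewhere in the worker). `pool.map` preserves input order, so the
+# returned list lines up with `upload_ids`:
+#
+#     archive_service = ArchiveService(repository)
+#     reports = load_intermediate_reports(archive_service, commit.commitid, [101, 102])
+#     assert [ir.upload_id for ir in reports] == [101, 102]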
+ """ + repo_hash = archive_service.storage_hash + + # there are only relevant for the "experiment" mode: + files_to_delete = list(experiment_report_paths(repo_hash, commitsha)) + + for upload_id in upload_ids: + files_to_delete.extend( + intermediate_report_paths(repo_hash, commitsha, upload_id) + ) + + archive_service.delete_files(files_to_delete) + + +def intermediate_report_paths( + repo_hash: str, commitsha: str, upload_id: int +) -> tuple[str, str]: + # TODO: migrate these files to a better storage location + prefix = f"v4/repos/{repo_hash}/commits/{commitsha}/parallel/incremental" + chunks_path = f"{prefix}/chunk{upload_id}.txt" + json_path = f"{prefix}/files_and_sessions{upload_id}.txt" + return json_path, chunks_path + + +def experiment_report_paths(repo_hash: str, commitsha: str) -> tuple[str, str]: + return MinioEndpoints.parallel_upload_experiment.get_path( + version="v4", + repo_hash=repo_hash, + commitid=commitsha, + file_name="files_and_sessions", + ), MinioEndpoints.parallel_upload_experiment.get_path( + version="v4", + repo_hash=repo_hash, + commitid=commitsha, + file_name="chunks", + ) diff --git a/services/processing/merging.py b/services/processing/merging.py new file mode 100644 index 000000000..bba24c26e --- /dev/null +++ b/services/processing/merging.py @@ -0,0 +1,112 @@ +from dataclasses import dataclass + +import sentry_sdk +from shared.reports.editable import EditableReport, EditableReportFile +from shared.reports.enums import UploadState +from shared.reports.resources import Report +from shared.yaml import UserYaml +from sqlalchemy.orm import Session as DbSession + +from database.models.reports import Upload +from services.processing.loading import IntermediateReport +from services.report import delete_uploads_by_sessionid +from services.report.raw_upload_processor import clear_carryforward_sessions + + +@dataclass +class MergeResult: + session_mapping: dict[int, int] + """ + This is a mapping from the input `upload_id` to the output `session_id` + as it exists in the merged "master Report". + """ + + deleted_sessions: set[int] + """ + The Set of carryforwarded `session_id`s that have been removed from the "master Report". 
+ """ + + +@sentry_sdk.trace +def merge_reports( + commit_yaml: UserYaml, + master_report: Report, + intermediate_reports: list[IntermediateReport], +) -> MergeResult: + session_mapping: dict[int, int] = dict() + deleted_sessions: set[int] = set() + + for intermediate_report in intermediate_reports: + report = intermediate_report.report + + old_sessionid = next(iter(report.sessions)) + new_sessionid = master_report.next_session_number() + change_sessionid(report, old_sessionid, new_sessionid) + session_mapping[intermediate_report.upload_id] = new_sessionid + + session = report.sessions[new_sessionid] + + _session_id, session = master_report.add_session( + session, use_id_from_session=True + ) + + if flags := session.flags: + session_adjustment = clear_carryforward_sessions( + master_report, report, flags, commit_yaml + ) + deleted_sessions.update(session_adjustment.fully_deleted_sessions) + + master_report.merge(report) + + return MergeResult(session_mapping, deleted_sessions) + + +@sentry_sdk.trace +def update_uploads(db_session: DbSession, merge_result: MergeResult): + for upload_id, session_id in merge_result.session_mapping.items(): + upload = db_session.query(Upload).filter(Upload.id_ == upload_id).first() + upload.state_id = UploadState.PROCESSED.db_id + upload.state = "processed" + upload.order_number = session_id + + if upload: + delete_uploads_by_sessionid(upload, list(merge_result.deleted_sessions)) + db_session.flush() + + +def change_sessionid(report: EditableReport, old_id: int, new_id: int): + """ + Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead. + This patches up all the references to that session across all files and line records. + + In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s, + and does the equivalent of `calculate_present_sessions`. + """ + session = report.sessions[new_id] = report.sessions.pop(old_id) + session.id = new_id + + report_file: EditableReportFile + for report_file in report._chunks: + if report_file is None: + continue + + all_sessions = set() + + for idx, _line in enumerate(report_file._lines): + if not _line: + continue + + # this turns the line into an actual `ReportLine` + line = report_file._lines[idx] = report_file._line(_line) + + for session in line.sessions: + if session.id == old_id: + session.id = new_id + all_sessions.add(session.id) + + if line.datapoints: + for point in line.datapoints: + if point.sessionid == old_id: + point.sessionid = new_id + + report_file._details["present_sessions"] = all_sessions diff --git a/services/processing/state.py b/services/processing/state.py index 1dfa78a05..3d276a192 100644 --- a/services/processing/state.py +++ b/services/processing/state.py @@ -27,7 +27,7 @@ from services.redis import get_redis_connection -MERGE_BATCH_SIZE = 5 +MERGE_BATCH_SIZE = 10 CLEARED_UPLOADS = Counter( "worker_processing_cleared_uploads", diff --git a/services/tests/test_processing_state.py b/services/tests/test_processing_state.py index 6acdc9d6d..69abe5bb3 100644 --- a/services/tests/test_processing_state.py +++ b/services/tests/test_processing_state.py @@ -46,26 +46,26 @@ def test_concurrent_uploads(): def test_batch_merging_many_uploads(): state = ProcessingState(1234, uuid4().hex) - state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9]) + state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]) - for id in range(1, 9): + for id in range(1, 12): state.mark_upload_as_processed(id) # we have only processed 8 out of 9. 
+
+
+@sentry_sdk.trace
+def update_uploads(db_session: DbSession, merge_result: MergeResult):
+    for upload_id, session_id in merge_result.session_mapping.items():
+        upload = db_session.query(Upload).filter(Upload.id_ == upload_id).first()
+        upload.state_id = UploadState.PROCESSED.db_id
+        upload.state = "processed"
+        upload.order_number = session_id
+
+    if upload:
+        delete_uploads_by_sessionid(upload, list(merge_result.deleted_sessions))
+    db_session.flush()
+
+
+def change_sessionid(report: EditableReport, old_id: int, new_id: int):
+    """
+    Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead.
+    This patches up all the references to that session across all files and line records.
+
+    In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s,
+    and does the equivalent of `calculate_present_sessions`.
+    """
+    session = report.sessions[new_id] = report.sessions.pop(old_id)
+    session.id = new_id
+
+    report_file: EditableReportFile
+    for report_file in report._chunks:
+        if report_file is None:
+            continue
+
+        all_sessions = set()
+
+        for idx, _line in enumerate(report_file._lines):
+            if not _line:
+                continue
+
+            # this turns the line into an actual `ReportLine`
+            line = report_file._lines[idx] = report_file._line(_line)
+
+            for session in line.sessions:
+                if session.id == old_id:
+                    session.id = new_id
+                all_sessions.add(session.id)
+
+            if line.datapoints:
+                for point in line.datapoints:
+                    if point.sessionid == old_id:
+                        point.sessionid = new_id
+
+        report_file._details["present_sessions"] = all_sessions
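+
+
+# Hypothetical before/after for `change_sessionid(report, 0, 5)`: a line whose
+# chunks record looks like `[1, None, [[0, 1]]]` (covered once by session 0)
+# ends up as `[1, None, [[5, 1]]]`, the session object moves from
+# `report.sessions[0]` to `report.sessions[5]`, and each file's
+# "present_sessions" detail is recomputed along the way.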
diff --git a/services/processing/state.py b/services/processing/state.py
index 1dfa78a05..3d276a192 100644
--- a/services/processing/state.py
+++ b/services/processing/state.py
@@ -27,7 +27,7 @@ from services.redis import get_redis_connection
 
-MERGE_BATCH_SIZE = 5
+MERGE_BATCH_SIZE = 10
 
 CLEARED_UPLOADS = Counter(
     "worker_processing_cleared_uploads",
diff --git a/services/tests/test_processing_state.py b/services/tests/test_processing_state.py
index 6acdc9d6d..69abe5bb3 100644
--- a/services/tests/test_processing_state.py
+++ b/services/tests/test_processing_state.py
@@ -46,26 +46,26 @@ def test_concurrent_uploads():
 
 def test_batch_merging_many_uploads():
     state = ProcessingState(1234, uuid4().hex)
-    state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9])
+    state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])
 
-    for id in range(1, 9):
+    for id in range(1, 12):
         state.mark_upload_as_processed(id)
 
-    # we have only processed 8 out of 9. we want to do a batched merge
+    # we have only processed 11 out of 12. we want to do a batched merge
     assert should_perform_merge(state.get_upload_numbers())
 
     merging = state.get_uploads_for_merging()
-    assert len(merging) == 5  # = MERGE_BATCH_SIZE
+    assert len(merging) == 10  # = MERGE_BATCH_SIZE
     state.mark_uploads_as_merged(merging)
 
     # but no notifications yet
     assert not should_trigger_postprocessing(state.get_upload_numbers())
 
-    state.mark_upload_as_processed(9)
+    state.mark_upload_as_processed(12)
 
     # with the last upload being processed, we do another merge, and then trigger notifications
     assert should_perform_merge(state.get_upload_numbers())
 
     merging = state.get_uploads_for_merging()
-    assert len(merging) == 4
+    assert len(merging) == 2
     state.mark_uploads_as_merged(merging)
 
     assert should_trigger_postprocessing(state.get_upload_numbers())
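+
+    # Spelled out (restating the new MERGE_BATCH_SIZE = 10): the 12 uploads
+    # are merged as one full batch of 10 followed by a final batch of 2.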
session", - extra=dict( - repoid=repoid, - commit=commitid, - upload_pk=obj["upload_pk"], - ), - ) - - old_sessionid = next(iter(incremental_report.sessions)) - new_sessionid = cumulative_report.next_session_number() - change_sessionid(incremental_report, old_sessionid, new_sessionid) - - session = incremental_report.sessions[new_sessionid] - - _session_id, session = cumulative_report.add_session( - session, use_id_from_session=True - ) - - session_adjustment = SessionAdjustmentResult([], []) - if flags := session.flags: - session_adjustment = clear_carryforward_sessions( - cumulative_report, incremental_report, flags, UserYaml(commit_yaml) - ) - - cumulative_report.merge(incremental_report) - - if parallel_processing is ParallelProcessing.PARALLEL: - # When we are fully parallel, we need to update the `Upload` in the database - # with the final session_id (aka `order_number`) and other statuses - db_session = commit.get_db_session() - upload = ( - db_session.query(Upload) - .filter(Upload.id_ == obj["upload_pk"]) - .first() - ) - upload.state_id = UploadState.PROCESSED.db_id - upload.state = "processed" - upload.order_number = new_sessionid - delete_uploads_by_sessionid( - upload, session_adjustment.fully_deleted_sessions - ) - db_session.flush() - - return cumulative_report - - with ThreadPoolExecutor(max_workers=10) as pool: # max chosen arbitrarily - unmerged_reports = pool.map( - download_and_build_incremental_report, - processing_results["parallel_incremental_result"], - ) - - log.info( - "Merging %s incremental reports together", - len(processing_results["processings_so_far"]), - extra=dict( - repoid=repoid, - commit=commitid, - processing_results=processing_results["processings_so_far"], - parent_task=self.request.parent_id, - ), - ) - report = functools.reduce(merge_report, unmerged_reports, report) - commit.get_db_session().flush() - - cleanup_partial_reports( - repository, commit, processing_results["parallel_incremental_result"] - ) - - return report - RegisteredUploadTask = celery_app.register_task(UploadFinisherTask()) upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name] @@ -691,74 +530,62 @@ def acquire_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Loc ) -def cleanup_partial_reports( - repository: Repository, commit: Commit, partial_reports: list[dict] -): - """ - Cleans up the files in storage that contain the "partial Report"s - from parallel processing, as well as the copy of the "master Report" used - for the "experiment" mode. - """ - archive_service = ArchiveService(repository) - repo_hash = archive_service.get_archive_hash(repository) - - # there are only relevant for the "experiment" mode: - files_to_delete = [ - MinioEndpoints.parallel_upload_experiment.get_path( - version="v4", - repo_hash=repo_hash, - commitid=commit.commitid, - file_name="files_and_sessions", - ), - MinioEndpoints.parallel_upload_experiment.get_path( - version="v4", - repo_hash=repo_hash, - commitid=commit.commitid, - file_name="chunks", - ), - ] - - for partial_report in partial_reports: - files_to_delete.append(partial_report["chunks_path"]) - files_to_delete.append(partial_report["files_and_sessions_path"]) - - archive_service.delete_files(files_to_delete) - - -# TODO: maybe move this to `shared` if it turns out to be a better place for this -def change_sessionid(report: Report, old_id: int, new_id: int): - """ - Modifies the `Report`, changing the session with `old_id` to have `new_id` instead. 
 
-        all_sessions = set()
 
-        for idx, _line in enumerate(report_file._lines):
-            if not _line:
-                continue
+@sentry_sdk.trace
+def load_master_report(
+    report_service: ReportService,
+    archive_service: ArchiveService,
+    commit: Commit,
+    parallel_processing: ParallelProcessing,
+) -> Report:
+    report: Report | None = None
 
-            # this turns the line into an actual `ReportLine`
-            line = report_file._lines[idx] = report_file._line(_line)
+    if parallel_processing is ParallelProcessing.PARALLEL:
+        report = report_service.get_existing_report_for_commit(commit)
+    else:
+        repo_hash = archive_service.storage_hash
+        json_path, chunks_path = experiment_report_paths(repo_hash, commit.commitid)
 
-            for session in line.sessions:
-                if session.id == old_id:
-                    session.id = new_id
-                all_sessions.add(session.id)
+        try:
+            chunks = archive_service.read_file(chunks_path).decode(errors="replace")
+            report_json = json.loads(archive_service.read_file(json_path))
+
+            report = report_service.build_report(
+                chunks,
+                report_json["files"],
+                report_json["sessions"],
+                report_json.get("totals"),
+            )
+        except FileNotInStorageError:
+            pass
 
-            if line.datapoints:
-                for point in line.datapoints:
-                    if point.sessionid == old_id:
-                        point.sessionid = new_id
+    if report is None:
+        report = Report()
 
-        report_file._details["present_sessions"] = all_sessions
+    return report