Move loading and merging intermediate Reports to its own file
This should make the high-level steps of parallel processing a bit more obvious and easier to grasp.
Swatinem committed Oct 8, 2024
1 parent 5d2afac commit b5a13dd
Showing 3 changed files with 179 additions and 144 deletions.
51 changes: 51 additions & 0 deletions services/processing/loading.py
@@ -0,0 +1,51 @@
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

import sentry_sdk
from shared.reports.editable import EditableReport

from services.archive import ArchiveService
from services.processing.state import MERGE_BATCH_SIZE


@dataclass
class IntermediateReport:
upload_id: int
"""
The `Upload` id for which this report was loaded.
"""

report: EditableReport
"""
The loaded Report.
"""


@sentry_sdk.trace
def load_intermediate_reports(
archive_service: ArchiveService,
commitsha: str,
upload_ids: list[int],
) -> list[IntermediateReport]:
def load_report(upload_id: int) -> IntermediateReport:
repo_hash = archive_service.storage_hash
# TODO: migrate these files to a better storage location
prefix = f"v4/repos/{repo_hash}/commits/{commitsha}/parallel/incremental"
chunks_path = f"{prefix}/chunk{upload_id}.txt"
json_path = f"{prefix}/files_and_sessions{upload_id}.txt"

chunks = archive_service.read_file(chunks_path).decode(errors="replace")
report_json = json.loads(archive_service.read_file(json_path))

report = EditableReport.from_chunks(
chunks=chunks,
files=report_json["files"],
sessions=report_json["sessions"],
totals=report_json.get("totals"),
)
return IntermediateReport(upload_id, report)

with ThreadPoolExecutor(max_workers=MERGE_BATCH_SIZE) as pool:
loaded_reports = pool.map(load_report, upload_ids)
return list(loaded_reports)
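
For orientation, here is a minimal usage sketch of the new loader (not part of the commit); the repository object, commit SHA, and upload IDs are illustrative assumptions:

# Hypothetical usage of load_intermediate_reports (illustrative only).
# For each upload id it reads chunk{upload_id}.txt and
# files_and_sessions{upload_id}.txt from storage on a thread pool and
# returns one IntermediateReport per upload.
from services.archive import ArchiveService
from services.processing.loading import load_intermediate_reports

archive_service = ArchiveService(repository)  # assumes an existing `repository` model instance
intermediate_reports = load_intermediate_reports(
    archive_service,
    "1a2b3c4d",        # commit SHA (placeholder)
    [101, 102, 103],   # upload ids (placeholders)
)
for ir in intermediate_reports:
    print(ir.upload_id, list(ir.report.sessions.keys()))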
112 changes: 112 additions & 0 deletions services/processing/merging.py
@@ -0,0 +1,112 @@
from dataclasses import dataclass

import sentry_sdk
from shared.reports.editable import EditableReport, EditableReportFile
from shared.reports.enums import UploadState
from shared.reports.resources import Report
from shared.yaml import UserYaml
from sqlalchemy.orm import Session as DbSession

from database.models.reports import Upload
from services.processing.loading import IntermediateReport
from services.report import delete_uploads_by_sessionid
from services.report.raw_upload_processor import clear_carryforward_sessions


@dataclass
class MergeResult:
session_mapping: dict[int, int]
"""
This is a mapping from the input `upload_id` to the output `session_id`
as it exists in the merged "master Report".
"""

deleted_sessions: set[int]
"""
The Set of carryforwarded `session_id`s that have been removed from the "master Report".
"""


@sentry_sdk.trace
def merge_reports(
commit_yaml: UserYaml,
master_report: Report,
intermediate_reports: list[IntermediateReport],
) -> MergeResult:
session_mapping: dict[int, int] = dict()
deleted_sessions: set[int] = set()

for intermediate_report in intermediate_reports:
report = intermediate_report.report

old_sessionid = next(iter(report.sessions))
new_sessionid = master_report.next_session_number()
change_sessionid(report, old_sessionid, new_sessionid)
session_mapping[intermediate_report.upload_id] = new_sessionid

session = report.sessions[new_sessionid]

_session_id, session = master_report.add_session(
session, use_id_from_session=True
)

if flags := session.flags:
session_adjustment = clear_carryforward_sessions(
master_report, report, flags, commit_yaml
)
deleted_sessions.update(session_adjustment.fully_deleted_sessions)

master_report.merge(report)

return MergeResult(session_mapping, deleted_sessions)


@sentry_sdk.trace
def update_uploads(db_session: DbSession, merge_result: MergeResult):
for upload_id, session_id in merge_result.session_mapping.items():
upload = db_session.query(Upload).filter(Upload.id_ == upload_id).first()
upload.state_id = UploadState.PROCESSED.db_id
upload.state = "processed"
upload.order_number = session_id

if upload:
delete_uploads_by_sessionid(upload, list(merge_result.deleted_sessions))
db_session.flush()


def change_sessionid(report: EditableReport, old_id: int, new_id: int):
"""
Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead.
This patches up all the references to that session across all files and line records.
In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s,
and does the equivalent of `calculate_present_sessions`.
"""
session = report.sessions[new_id] = report.sessions.pop(old_id)
session.id = new_id

report_file: EditableReportFile
for report_file in report._chunks:
if report_file is None:
continue

all_sessions = set()

for idx, _line in enumerate(report_file._lines):
if not _line:
continue

# this turns the line into an actual `ReportLine`
line = report_file._lines[idx] = report_file._line(_line)

for session in line.sessions:
if session.id == old_id:
session.id = new_id
all_sessions.add(session.id)

if line.datapoints:
for point in line.datapoints:
if point.sessionid == old_id:
point.sessionid = new_id

report_file._details["present_sessions"] = all_sessions
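
And a sketch of how the merge step ties together, mirroring the new call sites in upload_finisher.py below; `db_session`, `commit_yaml_dict`, and `intermediate_reports` are assumed to already exist:

# Hypothetical end-to-end sketch (illustrative only): merge the intermediate
# reports into a fresh master report, then persist the session assignments.
from shared.reports.resources import Report
from shared.yaml import UserYaml

from services.processing.merging import merge_reports, update_uploads

master_report = Report()
merge_result = merge_reports(UserYaml(commit_yaml_dict), master_report, intermediate_reports)

# merge_result.session_mapping maps each upload id to the session number it was
# assigned in the merged report, e.g. {101: 0, 102: 1}; update_uploads writes
# that back to the Upload rows (state and order_number) and removes any
# carried-forward sessions that the merge deleted.
update_uploads(db_session, merge_result)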
160 changes: 16 additions & 144 deletions tasks/upload_finisher.py
@@ -1,9 +1,7 @@
import contextlib
import functools
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from enum import Enum

from redis.exceptions import LockError
@@ -16,8 +14,6 @@
upload_finisher_task_name,
)
from shared.metrics import Histogram
from shared.reports.editable import EditableReport, EditableReportFile
from shared.reports.enums import UploadState
from shared.reports.resources import Report
from shared.storage.exceptions import FileNotInStorageError
from shared.yaml import UserYaml
@@ -26,21 +22,18 @@
from celery_config import notify_error_task_name
from database.models import Commit, Pull
from database.models.core import Repository
from database.models.reports import Upload
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.metrics import KiB, MiB
from helpers.parallel import ParallelProcessing
from services.archive import ArchiveService, MinioEndpoints
from services.comparison import get_or_create_comparison
from services.processing.loading import load_intermediate_reports
from services.processing.merging import merge_reports, update_uploads
from services.processing.state import ProcessingState, should_trigger_postprocessing
from services.redis import get_redis_connection
from services.report import ReportService, delete_uploads_by_sessionid
from services.report.raw_upload_processor import (
SessionAdjustmentResult,
clear_carryforward_sessions,
)
from services.report import ReportService
from services.yaml import read_yaml_field
from tasks.base import BaseCodecovTask
from tasks.parallel_verification import parallel_verification_task
@@ -570,104 +563,22 @@ def merge_incremental_reports(
)
report = Report()

log.info(
"Downloading %s incremental reports that were processed in parallel",
len(processing_results["processings_so_far"]),
extra=dict(
repoid=repoid,
commit=commitid,
processing_results=processing_results["processings_so_far"],
parent_task=self.request.parent_id,
),
upload_ids = [
int(upload["upload_pk"])
for upload in processing_results["parallel_incremental_result"]
]
intermediate_reports = load_intermediate_reports(
archive_service, commitid, upload_ids
)
merge_result = merge_reports(
UserYaml(commit_yaml), report, intermediate_reports
)

def download_and_build_incremental_report(partial_report):
chunks = archive_service.read_file(partial_report["chunks_path"]).decode(
errors="replace"
)
files_and_sessions = json.loads(
archive_service.read_file(partial_report["files_and_sessions_path"])
)
report = report_service.build_report(
chunks,
files_and_sessions["files"],
files_and_sessions["sessions"],
None,
report_class=EditableReport,
)
return {
"upload_pk": partial_report["upload_pk"],
"report": report,
}

def merge_report(cumulative_report: Report, obj):
incremental_report: Report = obj["report"]

if len(incremental_report.sessions) != 1:
log.warning(
"Incremental report does not have expected session",
extra=dict(
repoid=repoid,
commit=commitid,
upload_pk=obj["upload_pk"],
),
)

old_sessionid = next(iter(incremental_report.sessions))
new_sessionid = cumulative_report.next_session_number()
change_sessionid(incremental_report, old_sessionid, new_sessionid)

session = incremental_report.sessions[new_sessionid]

_session_id, session = cumulative_report.add_session(
session, use_id_from_session=True
)

session_adjustment = SessionAdjustmentResult([], [])
if flags := session.flags:
session_adjustment = clear_carryforward_sessions(
cumulative_report, incremental_report, flags, UserYaml(commit_yaml)
)

cumulative_report.merge(incremental_report)

if parallel_processing is ParallelProcessing.PARALLEL:
# When we are fully parallel, we need to update the `Upload` in the database
# with the final session_id (aka `order_number`) and other statuses
db_session = commit.get_db_session()
upload = (
db_session.query(Upload)
.filter(Upload.id_ == obj["upload_pk"])
.first()
)
upload.state_id = UploadState.PROCESSED.db_id
upload.state = "processed"
upload.order_number = new_sessionid
delete_uploads_by_sessionid(
upload, session_adjustment.fully_deleted_sessions
)
db_session.flush()

return cumulative_report

with ThreadPoolExecutor(max_workers=10) as pool: # max chosen arbitrarily
unmerged_reports = pool.map(
download_and_build_incremental_report,
processing_results["parallel_incremental_result"],
)
if parallel_processing is ParallelProcessing.PARALLEL:
# When we are fully parallel, we need to update the `Upload` in the database
# with the final session_id (aka `order_number`) and other statuses
update_uploads(commit.get_db_session(), merge_result)

log.info(
"Merging %s incremental reports together",
len(processing_results["processings_so_far"]),
extra=dict(
repoid=repoid,
commit=commitid,
processing_results=processing_results["processings_so_far"],
parent_task=self.request.parent_id,
),
)
report = functools.reduce(merge_report, unmerged_reports, report)
commit.get_db_session().flush()
return report


@@ -683,42 +594,3 @@ def acquire_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Loc
timeout=max(60 * 5, hard_time_limit),
blocking_timeout=5,
)


# TODO: maybe move this to `shared` if it turns out to be a better place for this
def change_sessionid(report: Report, old_id: int, new_id: int):
"""
Modifies the `Report`, changing the session with `old_id` to have `new_id` instead.
This patches up all the references to that session across all files and line records.
In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s,
and does the equivalent of `calculate_present_sessions`.
"""
session = report.sessions[new_id] = report.sessions.pop(old_id)
session.id = new_id

report_file: EditableReportFile
for report_file in report._chunks:
if report_file is None:
continue

all_sessions = set()

for idx, _line in enumerate(report_file._lines):
if not _line:
continue

# this turns the line into an actual `ReportLine`
line = report_file._lines[idx] = report_file._line(_line)

for session in line.sessions:
if session.id == old_id:
session.id = new_id
all_sessions.add(session.id)

if line.datapoints:
for point in line.datapoints:
if point.sessionid == old_id:
point.sessionid = new_id

report_file._details["present_sessions"] = all_sessions
