Move loading and merging intermediate Reports to its own file #767

Merged: 2 commits, Oct 15, 2024
services/processing/loading.py (102 additions, 0 deletions)
@@ -0,0 +1,102 @@
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

import sentry_sdk
from shared.reports.editable import EditableReport

from services.archive import ArchiveService, MinioEndpoints


@dataclass
class IntermediateReport:
    upload_id: int
    """
    The `Upload` id for which this report was loaded.
    """

    report: EditableReport
    """
    The loaded Report.
    """


@sentry_sdk.trace
def load_intermediate_reports(
    archive_service: ArchiveService,
    commitsha: str,
    upload_ids: list[int],
) -> list[IntermediateReport]:
    @sentry_sdk.trace
    def load_report(upload_id: int) -> IntermediateReport:
Review comment (Contributor):

[very nit] I wonder if it would make sense for this to be a method of the IntermediateReport class, maybe a classmethod like load.

I only say this to avoid having one of the two functions nested inside another function. But that is a personal preference.

Reply (Contributor, Author):

It's a good idea in general, though it would require passing the archive_service and commitsha to that method, as the current function captures them from the parent scope.
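
For illustration only, a rough sketch of what that suggestion might look like (not part of this PR; the `load` name comes from the reviewer's comment, and the explicit `archive_service` and `commitsha` parameters replace the variables currently captured from the enclosing scope):

@dataclass
class IntermediateReport:
    upload_id: int
    report: EditableReport

    @classmethod
    def load(
        cls, archive_service: ArchiveService, commitsha: str, upload_id: int
    ) -> "IntermediateReport":
        # hypothetical variant of the nested `load_report` below, with the
        # previously captured variables passed in as explicit arguments
        repo_hash = archive_service.storage_hash
        json_path, chunks_path = intermediate_report_paths(
            repo_hash, commitsha, upload_id
        )
        chunks = archive_service.read_file(chunks_path).decode(errors="replace")
        report_json = json.loads(archive_service.read_file(json_path))
        report = EditableReport.from_chunks(
            chunks=chunks,
            files=report_json["files"],
            sessions=report_json["sessions"],
            totals=report_json.get("totals"),
        )
        return cls(upload_id, report)

The PR keeps the nested-function approach below, which avoids passing those extra parameters.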

        repo_hash = archive_service.storage_hash
        json_path, chunks_path = intermediate_report_paths(
            repo_hash, commitsha, upload_id
        )

        chunks = archive_service.read_file(chunks_path).decode(errors="replace")
        report_json = json.loads(archive_service.read_file(json_path))

        report = EditableReport.from_chunks(
            chunks=chunks,
            files=report_json["files"],
            sessions=report_json["sessions"],
            totals=report_json.get("totals"),
        )
        return IntermediateReport(upload_id, report)

    def instrumented_load_report(upload_id: int) -> IntermediateReport:
        with sentry_sdk.isolation_scope() as _scope:
            return load_report(upload_id)

    with ThreadPoolExecutor() as pool:
        loaded_reports = pool.map(instrumented_load_report, upload_ids)
        return list(loaded_reports)


@sentry_sdk.trace
def cleanup_intermediate_reports(
    archive_service: ArchiveService,
    commitsha: str,
    upload_ids: list[int],
):
    """
    Cleans up the files in storage that contain the "intermediate Report"s
    from parallel processing, as well as the copy of the "master Report" used
    for the "experiment" mode.
    """
    repo_hash = archive_service.storage_hash

    # these are only relevant for the "experiment" mode:
    files_to_delete = list(experiment_report_paths(repo_hash, commitsha))

    for upload_id in upload_ids:
        files_to_delete.extend(
            intermediate_report_paths(repo_hash, commitsha, upload_id)
        )

    archive_service.delete_files(files_to_delete)


def intermediate_report_paths(
    repo_hash: str, commitsha: str, upload_id: int
) -> tuple[str, str]:
    # TODO: migrate these files to a better storage location
    prefix = f"v4/repos/{repo_hash}/commits/{commitsha}/parallel/incremental"
    chunks_path = f"{prefix}/chunk{upload_id}.txt"
    json_path = f"{prefix}/files_and_sessions{upload_id}.txt"
    return json_path, chunks_path


def experiment_report_paths(repo_hash: str, commitsha: str) -> tuple[str, str]:
    return MinioEndpoints.parallel_upload_experiment.get_path(
        version="v4",
        repo_hash=repo_hash,
        commitid=commitsha,
        file_name="files_and_sessions",
    ), MinioEndpoints.parallel_upload_experiment.get_path(
        version="v4",
        repo_hash=repo_hash,
        commitid=commitsha,
        file_name="chunks",
    )
services/processing/merging.py (112 additions, 0 deletions)
@@ -0,0 +1,112 @@
from dataclasses import dataclass

import sentry_sdk
from shared.reports.editable import EditableReport, EditableReportFile
from shared.reports.enums import UploadState
from shared.reports.resources import Report
from shared.yaml import UserYaml
from sqlalchemy.orm import Session as DbSession

from database.models.reports import Upload
from services.processing.loading import IntermediateReport
from services.report import delete_uploads_by_sessionid
from services.report.raw_upload_processor import clear_carryforward_sessions


@dataclass
class MergeResult:
    session_mapping: dict[int, int]
    """
    This is a mapping from the input `upload_id` to the output `session_id`
    as it exists in the merged "master Report".
    """

    deleted_sessions: set[int]
    """
    The set of carryforwarded `session_id`s that have been removed from the "master Report".
    """


@sentry_sdk.trace
def merge_reports(
    commit_yaml: UserYaml,
    master_report: Report,
    intermediate_reports: list[IntermediateReport],
) -> MergeResult:
    session_mapping: dict[int, int] = dict()
    deleted_sessions: set[int] = set()

    for intermediate_report in intermediate_reports:
        report = intermediate_report.report

        old_sessionid = next(iter(report.sessions))
        new_sessionid = master_report.next_session_number()
        change_sessionid(report, old_sessionid, new_sessionid)
        session_mapping[intermediate_report.upload_id] = new_sessionid

        session = report.sessions[new_sessionid]

        _session_id, session = master_report.add_session(
            session, use_id_from_session=True
        )

        if flags := session.flags:
            session_adjustment = clear_carryforward_sessions(
                master_report, report, flags, commit_yaml
            )
            deleted_sessions.update(session_adjustment.fully_deleted_sessions)

        master_report.merge(report)

    return MergeResult(session_mapping, deleted_sessions)


@sentry_sdk.trace
def update_uploads(db_session: DbSession, merge_result: MergeResult):
    for upload_id, session_id in merge_result.session_mapping.items():
        upload = db_session.query(Upload).filter(Upload.id_ == upload_id).first()
        upload.state_id = UploadState.PROCESSED.db_id
        upload.state = "processed"
        upload.order_number = session_id

    if upload:
Review comment (Contributor):

Is this supposed to be inside the for loop? It seems to only affect the last upload; from it being a loop, it looks like there might be more than one upload.

Reply (Contributor, Author):

That is indeed confusing, I admit. The Upload is really only used to get the DB session, as well as to join the Upload.report. It is pretty much querying all the sibling uploads for the same report, so any Upload (for the same report, which is true here) will do.

Reply (Contributor):

Ah, I see. Thanks for the clarification.
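
        # Note: `upload` here is simply whichever row the loop assigned last; it
        # only serves as an entry point to the DB session and the shared
        # `Upload.report`, so any sibling upload of the same report would do.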

        delete_uploads_by_sessionid(upload, list(merge_result.deleted_sessions))
    db_session.flush()


def change_sessionid(report: EditableReport, old_id: int, new_id: int):
"""
Modifies the `EditableReport`, changing the session with `old_id` to have `new_id` instead.
This patches up all the references to that session across all files and line records.

In particular, it changes the id in all the `LineSession`s and `CoverageDatapoint`s,
and does the equivalent of `calculate_present_sessions`.
"""
session = report.sessions[new_id] = report.sessions.pop(old_id)
session.id = new_id

report_file: EditableReportFile
for report_file in report._chunks:
if report_file is None:
continue

[Codecov check warning (codecov/patch): added line #L91 in services/processing/merging.py was not covered by tests]

        all_sessions = set()

        for idx, _line in enumerate(report_file._lines):
            if not _line:
                continue

            # this turns the line into an actual `ReportLine`
            line = report_file._lines[idx] = report_file._line(_line)

            for session in line.sessions:
                if session.id == old_id:
                    session.id = new_id
                all_sessions.add(session.id)

            if line.datapoints:
                for point in line.datapoints:
                    if point.sessionid == old_id:
                        point.sessionid = new_id

[Codecov check warning (codecov/patch): added lines #L108-L110 in services/processing/merging.py were not covered by tests]

        report_file._details["present_sessions"] = all_sessions
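
Taken together, the new helpers are meant to compose roughly as in the following sketch (the actual call site lives elsewhere in the worker and is not part of this diff, so the surrounding variables here are assumptions):

# illustrative only: loading, merging, updating and cleanup in sequence
intermediate_reports = load_intermediate_reports(archive_service, commitsha, upload_ids)
merge_result = merge_reports(commit_yaml, master_report, intermediate_reports)
update_uploads(db_session, merge_result)
cleanup_intermediate_reports(archive_service, commitsha, upload_ids)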
services/processing/state.py (1 addition, 1 deletion)
@@ -27,7 +27,7 @@

from services.redis import get_redis_connection

-MERGE_BATCH_SIZE = 5
+MERGE_BATCH_SIZE = 10

CLEARED_UPLOADS = Counter(
    "worker_processing_cleared_uploads",
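
For context on the updated test below: with 12 uploads and MERGE_BATCH_SIZE = 10, the first batched merge picks up 10 uploads, and the final merge, after the 12th upload is processed, handles the remaining 2.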
services/tests/test_processing_state.py (5 additions, 5 deletions)
@@ -46,26 +46,26 @@ def test_concurrent_uploads():
def test_batch_merging_many_uploads():
    state = ProcessingState(1234, uuid4().hex)

-    state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9])
+    state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12])

-    for id in range(1, 9):
+    for id in range(1, 12):
        state.mark_upload_as_processed(id)

    # we have only processed 8 out of 9. we want to do a batched merge
    assert should_perform_merge(state.get_upload_numbers())
    merging = state.get_uploads_for_merging()
-    assert len(merging) == 5 # = MERGE_BATCH_SIZE
+    assert len(merging) == 10 # = MERGE_BATCH_SIZE
    state.mark_uploads_as_merged(merging)

    # but no notifications yet
    assert not should_trigger_postprocessing(state.get_upload_numbers())

-    state.mark_upload_as_processed(9)
+    state.mark_upload_as_processed(12)

    # with the last upload being processed, we do another merge, and then trigger notifications
    assert should_perform_merge(state.get_upload_numbers())
    merging = state.get_uploads_for_merging()
-    assert len(merging) == 4
+    assert len(merging) == 2
    state.mark_uploads_as_merged(merging)

    assert should_trigger_postprocessing(state.get_upload_numbers())