diff --git a/database/models/core.py b/database/models/core.py
index 778761850..5cdb76538 100644
--- a/database/models/core.py
+++ b/database/models/core.py
@@ -82,6 +82,8 @@ class Owner(CodecovBaseModel):
     stripe_customer_id = Column(types.Text, server_default=FetchedValue())
     stripe_subscription_id = Column(types.Text, server_default=FetchedValue())
     onboarding_completed = Column(types.Boolean, default=False)
+    upload_token_required_for_public_repos = Column(types.Boolean, default=True)
+
     bot_id = Column(
         "bot",
         types.Integer,
diff --git a/helpers/parallel.py b/helpers/parallel.py
new file mode 100644
index 000000000..da5b8df59
--- /dev/null
+++ b/helpers/parallel.py
@@ -0,0 +1,91 @@
+from __future__ import annotations
+
+from enum import Enum
+
+from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
+
+"""
+This encapsulates Parallel Upload Processing logic
+
+Upload Processing can run in essentially 4 modes:
+- Completely serial processing
+- Serial processing, but running "experiment" code (`EXPERIMENT_SERIAL`):
+  - In this mode, the final (`is_final`) `UploadProcessor` task saves a copy
+    of the final report for later verification.
+- Parallel processing, but running "experiment" code (`EXPERIMENT_PARALLEL`):
+  - In this mode, another parallel set of `UploadProcessor` tasks runs *after*
+    the main set of tasks.
+  - These tasks are not persisting any of their results in the database,
+    instead the final `UploadFinisher` task will launch the `ParallelVerification` task.
+- Fully parallel processing (`PARALLEL`):
+  - In this mode, the final `UploadFinisher` task is responsible for merging
+    the final report and persisting it.
+
+An example Task chain might look like this, in "experiment" mode:
+- Upload
+  - UploadProcessor
+  - UploadProcessor
+  - UploadProcessor (`EXPERIMENT_SERIAL` (the final one))
+  - UploadFinisher
+  - UploadProcessor (`EXPERIMENT_PARALLEL`)
+  - UploadProcessor (`EXPERIMENT_PARALLEL`)
+  - UploadProcessor (`EXPERIMENT_PARALLEL`)
+  - UploadFinisher (`EXPERIMENT_PARALLEL`)
+  - ParallelVerification
+
+
+The `PARALLEL` mode looks like this:
+- Upload
+  - UploadProcessor (`PARALLEL`)
+  - UploadProcessor (`PARALLEL`)
+  - UploadProcessor (`PARALLEL`)
+  - UploadFinisher (`PARALLEL`)
+"""
+
+
+class ParallelFeature(Enum):
+    SERIAL = "serial"
+    EXPERIMENT = "experiment"
+    PARALLEL = "parallel"
+
+    @classmethod
+    def load(cls, repoid: int) -> ParallelFeature:
+        feature = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+            identifier=repoid, default="serial"
+        )
+
+        if feature == "experiment" or feature is True:
+            return ParallelFeature.EXPERIMENT
+        if feature == "parallel":
+            return ParallelFeature.PARALLEL
+        return ParallelFeature.SERIAL
+
+
+class ParallelProcessing(Enum):
+    SERIAL = "serial"
+    EXPERIMENT_SERIAL = "experiment-serial"
+    EXPERIMENT_PARALLEL = "experiment-parallel"
+    PARALLEL = "parallel"
+
+    @property
+    def is_parallel(self) -> bool:
+        return (
+            self is ParallelProcessing.EXPERIMENT_PARALLEL
+            or self is ParallelProcessing.PARALLEL
+        )
+
+    @classmethod
+    def from_task_args(
+        cls,
+        in_parallel: bool = False,
+        is_final: bool = False,
+        run_fully_parallel: bool = False,
+        **kwargs,
+    ) -> ParallelProcessing:
+        if run_fully_parallel:
+            return ParallelProcessing.PARALLEL
+        if in_parallel:
+            return ParallelProcessing.EXPERIMENT_PARALLEL
+        if is_final:
+            return ParallelProcessing.EXPERIMENT_SERIAL
+        return ParallelProcessing.SERIAL
diff --git a/requirements.in b/requirements.in
index c2a3c99b5..d649d765c 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,5 +1,5 @@
 https://github.com/codecov/opentelem-python/archive/refs/tags/v0.0.4a1.tar.gz#egg=codecovopentelem
-https://github.com/codecov/shared/archive/36791fe3c18a0dbdf7296ffbdffbf2137fa9fc06.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/88117b96a4b420d88549b8df2649c3eb9c61c2a5.tar.gz#egg=shared
 https://github.com/codecov/test-results-parser/archive/1507de2241601d678e514c08b38426e48bb6d47d.tar.gz#egg=test-results-parser
 https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
 asgiref>=3.7.2
diff --git a/requirements.txt b/requirements.txt
index 5b1417982..623541206 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -357,7 +357,7 @@ sentry-sdk[celery]==2.13.0
     # via
     #   -r requirements.in
     #   shared
-shared @ https://github.com/codecov/shared/archive/36791fe3c18a0dbdf7296ffbdffbf2137fa9fc06.tar.gz
+shared @ https://github.com/codecov/shared/archive/88117b96a4b420d88549b8df2649c3eb9c61c2a5.tar.gz
     # via -r requirements.in
 six==1.16.0
     # via
diff --git a/services/bundle_analysis/report.py b/services/bundle_analysis/report.py
index faea2e2e9..bac4a4052 100644
--- a/services/bundle_analysis/report.py
+++ b/services/bundle_analysis/report.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import tempfile
 from dataclasses import dataclass
@@ -26,6 +27,8 @@ from services.storage import get_storage_client
 from services.timeseries import repository_datasets_query
 
+log = logging.getLogger(__name__)
+
 
 @dataclass
 class ProcessingError:
@@ -291,6 +294,11 @@ def process_upload(
                     "plugin_name": plugin_name,
                 },
             )
+            log.error(
+                "Unable to parse upload for bundle analysis",
+                exc_info=True,
+                extra=dict(repoid=commit.repoid, commit=commit.commitid),
+            )
             return ProcessingResult(
                 upload=upload,
                 commit=commit,
diff --git a/services/report/__init__.py b/services/report/__init__.py
index 03d7213b1..4d9d469bc 100644
--- a/services/report/__init__.py
+++ b/services/report/__init__.py
@@ -39,11 +39,9 @@
     ReportExpiredException,
     RepositoryWithoutValidBotError,
 )
+from helpers.parallel import ParallelFeature
 from helpers.telemetry import MetricContext
-from rollouts import (
-    CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER,
-    PARALLEL_UPLOAD_PROCESSING_BY_REPO,
-)
+from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER
 from services.archive import ArchiveService
 from services.report.parser import get_proper_parser
 from services.report.parser.types import ParsedRawReport
@@ -112,26 +110,6 @@ def initialize_and_save_report(
     ) -> CommitReport:
         raise NotImplementedError()
 
-    def fetch_report_upload(
-        self, commit_report: CommitReport, upload_id: int
-    ) -> Upload:
-        """
-        Fetch Upload by the given upload_id.
-        :raises: Exception if Upload is not found.
-        """
-        db_session = commit_report.get_db_session()
-        upload = db_session.query(Upload).filter_by(id_=int(upload_id)).first()
-        if not upload:
-            raise Exception(
-                f"Failed to find existing upload by ID ({upload_id})",
-                dict(
-                    commit=commit_report.commit_id,
-                    repo=commit_report.commit.repoid,
-                    upload_id=upload_id,
-                ),
-            )
-        return upload
-
     def create_report_upload(
         self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
     ) -> Upload:
@@ -275,11 +253,10 @@ def initialize_and_save_report(
                 # This means there is a report to carryforward
                 self.save_full_report(commit, report, report_code)
 
+                parallel_processing = ParallelFeature.load(commit.repository.repoid)
                 # Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
                 # finisher can build off of it later.
-                if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-                    identifier=commit.repository.repoid
-                ):
+                if parallel_processing is ParallelFeature.EXPERIMENT:
                     self.save_parallel_report_to_archive(commit, report, report_code)
 
         return current_report_row
@@ -810,13 +787,6 @@ def update_upload_with_processing_result(
         session = processing_result.session
         if processing_result.error is None:
-            # this should be enabled for the actual rollout of parallel upload processing.
-            # if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            #     "this should be the repo id"
-            # ):
-            #     upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
-            #     upload_obj.state = "parallel_processed"
-            # else:
             upload.state_id = UploadState.PROCESSED.db_id
             upload.state = "processed"
             upload.order_number = session.id
@@ -1052,6 +1022,9 @@ def delete_uploads_by_sessionid(upload: Upload, session_ids: list[int]):
     """
     This deletes all the `Upload` records corresponding to the given `session_ids`.
     """
+    if not session_ids:
+        return
+
     db_session = upload.get_db_session()
     uploads = (
         db_session.query(Upload.id_)
diff --git a/services/report/tests/unit/test_init.py b/services/report/tests/unit/test_init.py
deleted file mode 100644
index 567d8212b..000000000
--- a/services/report/tests/unit/test_init.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import pytest
-
-from database.models.reports import CommitReport
-from database.tests.factories import CommitFactory
-from services.report import ReportService
-
-
-def test_fetch_fail_raises(dbsession):
-    service = ReportService({})
-    commit = CommitFactory.create(
-        message="",
-        parent_commit_id=None,
-        repository__owner__unencrypted_oauth_token="test7lk5ndmtqzxlx06rip65nac9c7epqopclnoy",
-        repository__owner__username="ThiagoCodecov",
-        repository__yaml={"codecov": {"max_report_age": "764y ago"}},
-    )
-    report = CommitReport(commit_id=commit.id_)
-    dbsession.add(commit)
-    dbsession.add(report)
-    dbsession.flush()
-    with pytest.raises(Exception) as exp:
-        service.fetch_report_upload(report, 100000)
-    assert "Failed to find existing upload by ID (100000)" in str(exp)
diff --git a/tasks/sync_pull.py b/tasks/sync_pull.py
index a8ffeeb10..27ce896f1 100644
--- a/tasks/sync_pull.py
+++ b/tasks/sync_pull.py
@@ -18,7 +18,7 @@
 
 from app import celery_app
 from database.models import Commit, Pull, Repository, Test
-from helpers.exceptions import RepositoryWithoutValidBotError
+from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
 from helpers.metrics import metrics
 from rollouts import SYNC_PULL_USE_MERGE_COMMIT_SHA
@@ -109,6 +109,7 @@ def run_impl_within_lock(
         commit_updates_done = {"merged_count": 0, "soft_deleted_count": 0}
         repository = db_session.query(Repository).filter_by(repoid=repoid).first()
         assert repository
+        extra_info = dict(pullid=pullid, repoid=repoid)
         try:
             installation_name_to_use = get_installation_name_for_owner_for_task(
                 self.name, repository.owner
@@ -119,7 +120,7 @@
         except RepositoryWithoutValidBotError:
             log.warning(
                 "Could not sync pull because there is no valid bot found for that repo",
-                extra=dict(pullid=pullid, repoid=repoid),
+                extra=extra_info,
                 exc_info=True,
             )
             return {
@@ -128,6 +129,24 @@
                 "pull_updated": False,
                 "reason": "no_bot",
             }
+        except NoConfiguredAppsAvailable as err:
+            log.error(
+                "Could not sync pull because there are no configured apps available",
+                extra={
+                    **extra_info,
+                    "suspended_app_count": err.suspended_count,
+                    "rate_limited_count": err.rate_limited_count,
+                },
+            )
+            if err.rate_limited_count > 0:
+                log.info("Apps are rate limited. Retrying in 60s", extra=extra_info)
+                self.retry(max_retries=1, countdown=60)
+            return {
+                "notifier_called": False,
+                "commit_updates_done": {"merged_count": 0, "soft_deleted_count": 0},
+                "pull_updated": False,
+                "reason": "no_configured_apps_available",
+            }
         context = OwnerContext(
             owner_onboarding_date=repository.owner.createstamp,
             owner_plan=repository.owner.plan,
@@ -146,7 +165,7 @@
             if pull is None:
                 log.info(
                     "Not syncing pull since we can't find it in the database nor in the provider",
-                    extra=dict(pullid=pullid, repoid=repoid),
+                    extra=extra_info,
                 )
                 return {
                     "notifier_called": False,
@@ -157,7 +176,7 @@
             if enriched_pull.provider_pull is None:
                 log.info(
                     "Not syncing pull since we can't find it in the provider. There is nothing to sync",
-                    extra=dict(pullid=pullid, repoid=repoid),
+                    extra=extra_info,
                 )
                 return {
                     "notifier_called": False,
diff --git a/tasks/tests/integration/test_upload_e2e.py b/tasks/tests/integration/test_upload_e2e.py
index 386e43483..5b3d4f19f 100644
--- a/tasks/tests/integration/test_upload_e2e.py
+++ b/tasks/tests/integration/test_upload_e2e.py
@@ -161,10 +161,10 @@ def setup_mocks(
 
 @pytest.mark.integration
 @pytest.mark.django_db()
-@pytest.mark.parametrize("do_parallel_processing", [False, True])
+@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
 def test_full_upload(
     dbsession: DbSession,
-    do_parallel_processing: bool,
+    parallel_processing: str,
     mocker,
     mock_repo_provider,
     mock_storage,
@@ -176,7 +176,7 @@ def test_full_upload(
     mocker.patch.object(
         PARALLEL_UPLOAD_PROCESSING_BY_REPO,
         "check_value",
-        return_value=do_parallel_processing,
+        return_value=parallel_processing,
     )
 
     repository = RepositoryFactory.create()
@@ -360,10 +360,10 @@ def test_full_upload(
 
 @pytest.mark.integration
 @pytest.mark.django_db()
-@pytest.mark.parametrize("do_parallel_processing", [False, True])
+@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
 def test_full_carryforward(
     dbsession: DbSession,
-    do_parallel_processing: bool,
+    parallel_processing: str,
     mocker,
     mock_repo_provider,
     mock_storage,
@@ -378,7 +378,7 @@ def test_full_carryforward(
     mocker.patch.object(
         PARALLEL_UPLOAD_PROCESSING_BY_REPO,
         "check_value",
-        return_value=do_parallel_processing,
+        return_value=parallel_processing,
     )
 
     repository = RepositoryFactory.create()
diff --git a/tasks/tests/unit/test_sync_pull.py b/tasks/tests/unit/test_sync_pull.py
index 614cd9065..49b84f153 100644
--- a/tasks/tests/unit/test_sync_pull.py
+++ b/tasks/tests/unit/test_sync_pull.py
@@ -3,6 +3,7 @@
 from pathlib import Path
 
 import pytest
+from celery.exceptions import Retry
 from mock.mock import MagicMock
 from redis.exceptions import LockError
 from shared.reports.types import Change
@@ -10,7 +11,7 @@
 
 from database.tests.factories import CommitFactory, PullFactory, RepositoryFactory
 from database.tests.factories.reports import TestFactory
-from helpers.exceptions import RepositoryWithoutValidBotError
+from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
 from services.repository import EnrichedPull
 from services.yaml import UserYaml
 from tasks.sync_pull import PullSyncTask
@@ -306,6 +307,43 @@ def test_call_pullsync_no_bot(self, dbsession, mock_redis, mocker):
             "reason": "no_bot",
         }
 
+    def test_call_pullsync_no_apps_available_rate_limit(
+        self, dbsession, mock_redis, mocker
+    ):
+        task = PullSyncTask()
+        pull = PullFactory.create(state="open")
+        dbsession.add(pull)
+        dbsession.flush()
+        mocker.patch(
+            "tasks.sync_pull.get_repo_provider_service",
+            side_effect=NoConfiguredAppsAvailable(
+                apps_count=1, rate_limited_count=1, suspended_count=0
+            ),
+        )
+        with pytest.raises(Retry):
+            task.run_impl(dbsession, repoid=pull.repoid, pullid=pull.pullid)
+
+    def test_call_pullsync_no_apps_available_suspended(
+        self, dbsession, mock_redis, mocker
+    ):
+        task = PullSyncTask()
+        pull = PullFactory.create(state="open")
+        dbsession.add(pull)
+        dbsession.flush()
+        mocker.patch(
+            "tasks.sync_pull.get_repo_provider_service",
+            side_effect=NoConfiguredAppsAvailable(
+                apps_count=1, rate_limited_count=0, suspended_count=1
+            ),
+        )
+        res = task.run_impl(dbsession, repoid=pull.repoid, pullid=pull.pullid)
+        assert res == {
+            "commit_updates_done": {"merged_count": 0, "soft_deleted_count": 0},
+            "notifier_called": False,
+            "pull_updated": False,
+            "reason": "no_configured_apps_available",
+        }
+
     def test_call_pullsync_no_permissions_get_compare(
         self, dbsession, mock_redis, mocker, mock_repo_provider, mock_storage
     ):
diff --git a/tasks/tests/unit/test_upload_processing_task.py b/tasks/tests/unit/test_upload_processing_task.py
index 281d23b84..fb0c9aabf 100644
--- a/tasks/tests/unit/test_upload_processing_task.py
+++ b/tasks/tests/unit/test_upload_processing_task.py
@@ -18,6 +18,7 @@
     ReportExpiredException,
     RepositoryWithoutValidBotError,
 )
+from helpers.parallel import ParallelProcessing
 from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID
 from services.archive import ArchiveService
 from services.report import ProcessingError, RawReportInfo, ReportService
@@ -322,7 +323,7 @@ def test_upload_processor_call_with_upload_obj(
         redis_queue = [{"url": url, "upload_pk": upload.id_}]
         mocked_3 = mocker.patch.object(UploadProcessorTask, "app")
         mocked_3.send_task.return_value = True
-        result = UploadProcessorTask().process_impl_within_lock(
+        result = UploadProcessorTask().process_upload(
            db_session=dbsession,
            previous_results={},
            repoid=commit.repoid,
@@ -330,6 +331,7 @@ def test_upload_processor_call_with_upload_obj(
            commit_yaml={"codecov": {"max_report_age": False}},
            arguments_list=redis_queue,
            report_code=None,
+           parallel_processing=ParallelProcessing.SERIAL,
         )
 
         assert result == {
@@ -726,6 +728,7 @@ def test_upload_task_process_individual_report_with_notfound_report(
             report=false_report,
             raw_report_info=RawReportInfo(),
             upload=upload,
+            parallel_processing=ParallelProcessing.SERIAL,
         )
         assert result.error.as_dict() == {
             "code": "file_not_in_storage",
@@ -751,6 +754,7 @@ def test_upload_task_process_individual_report_with_notfound_report_no_retries_y
                 Report(),
                 UploadFactory.create(),
                 RawReportInfo(),
+                parallel_processing=ParallelProcessing.SERIAL,
             )
 
     @pytest.mark.django_db(databases={"default"})
diff --git a/tasks/upload.py b/tasks/upload.py
index 8ddd596d1..9c1a8fdd9 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -32,9 +32,9 @@
 from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
 from helpers.exceptions import RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
+from helpers.parallel import ParallelFeature
 from helpers.reports import delete_archive_setting
 from helpers.save_commit_error import save_commit_error
-from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.archive import ArchiveService
 from services.bundle_analysis.report import BundleAnalysisReportService
 from services.redis import download_archive_from_redis, get_redis_connection
@@ -606,9 +606,24 @@
         ):
             checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)
 
-        do_parallel_processing = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            identifier=commit.repository.repoid
-        ) and not delete_archive_setting(commit_yaml)
+        parallel_feature = ParallelFeature.load(upload_context.repoid)
+        if parallel_feature is ParallelFeature.EXPERIMENT and delete_archive_setting(
+            commit_yaml
+        ):
+            parallel_feature = ParallelFeature.SERIAL
+
+        if parallel_feature is not ParallelFeature.SERIAL:
+            parallel_tasks = self.create_parallel_tasks(
+                commit,
+                commit_yaml,
+                argument_list,
+                commit_report,
+                checkpoints,
+                parallel_feature is ParallelFeature.PARALLEL,
+            )
+
+        if parallel_feature is ParallelFeature.PARALLEL:
+            return parallel_tasks.apply_async()
 
         processing_tasks = [
             upload_processor_task.s(
@@ -623,7 +638,7 @@
             for chunk in itertools.batched(argument_list, CHUNK_SIZE)
         ]
         processing_tasks[0].args = ({},)  # this is the first `previous_results`
-        if do_parallel_processing:
+        if parallel_feature is ParallelFeature.EXPERIMENT:
             processing_tasks[-1].kwargs.update(is_final=True)
 
         processing_tasks.append(
@@ -638,12 +653,24 @@
                 },
             )
         )
-
         serial_tasks = chain(processing_tasks)
-        if not do_parallel_processing:
-            return serial_tasks.apply_async()
 
+        if parallel_feature is ParallelFeature.EXPERIMENT:
+            parallel_shadow_experiment = serial_tasks | parallel_tasks
+            return parallel_shadow_experiment.apply_async()
+
+        return serial_tasks.apply_async()
+
+    @sentry_sdk.trace
+    def create_parallel_tasks(
+        self,
+        commit: Commit,
+        commit_yaml: dict,
+        argument_list: list[dict],
+        commit_report: CommitReport,
+        checkpoints: CheckpointLogger,
+        run_fully_parallel: bool,
+    ):
         parallel_processing_tasks = [
             upload_processor_task.s(
                 repoid=commit.repoid,
@@ -652,11 +679,17 @@
                 arguments_list=[arguments],
                 report_code=commit_report.code,
                 parallel_idx=arguments["upload_pk"],
+                run_fully_parallel=run_fully_parallel,
                 in_parallel=True,
                 is_final=False,
             )
             for arguments in argument_list
         ]
+        if run_fully_parallel:
+            for task in parallel_processing_tasks:
+                # this is the `previous_results`, which celery provides when running
+                # in a chain as part of the experiment, otherwise we have to provide this.
+                task.args = ({},)
 
         finish_parallel_sig = upload_finisher_task.signature(
             kwargs={
@@ -664,14 +697,14 @@
                 "repoid": commit.repoid,
                 "commitid": commit.commitid,
                 "commit_yaml": commit_yaml,
                 "report_code": commit_report.code,
+                "run_fully_parallel": run_fully_parallel,
                 "in_parallel": True,
                 _kwargs_key(UploadFlow): checkpoints.data,
             },
         )
 
         parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
-        parallel_shadow_experiment = serial_tasks | parallel_tasks
-        return parallel_shadow_experiment.apply_async()
+        return parallel_tasks
 
     def _schedule_bundle_analysis_processing_task(
         self,
diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py
index 67ebcc4ab..9f642737d 100644
--- a/tasks/upload_finisher.py
+++ b/tasks/upload_finisher.py
@@ -15,6 +15,7 @@
 )
 from shared.metrics import Histogram
 from shared.reports.editable import EditableReport, EditableReportFile
+from shared.reports.enums import UploadState
 from shared.reports.resources import Report
 from shared.storage.exceptions import FileNotInStorageError
 from shared.yaml import UserYaml
@@ -23,20 +24,25 @@
 from celery_config import notify_error_task_name
 from database.models import Commit, Pull
 from database.models.core import Repository
+from database.models.reports import Upload
 from helpers.checkpoint_logger import _kwargs_key
 from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
 from helpers.checkpoint_logger.flows import UploadFlow
 from helpers.metrics import KiB, MiB
-from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
+from helpers.parallel import ParallelProcessing
 from services.archive import ArchiveService, MinioEndpoints
 from services.comparison import get_or_create_comparison
 from services.redis import get_redis_connection
-from services.report import ReportService
-from services.report.raw_upload_processor import clear_carryforward_sessions
+from services.report import ReportService, delete_uploads_by_sessionid
+from services.report.raw_upload_processor import (
+    SessionAdjustmentResult,
+    clear_carryforward_sessions,
+)
 from services.yaml import read_yaml_field
 from tasks.base import BaseCodecovTask
 from tasks.parallel_verification import parallel_verification_task
 from tasks.upload_clean_labels_index import task_name as clean_labels_index_task_name
+from tasks.upload_processor import UploadProcessorTask
 
 log = logging.getLogger(__name__)
 
@@ -101,7 +107,6 @@ def run_impl(
         repoid,
         commitid,
         commit_yaml,
-        in_parallel=False,
         report_code=None,
         **kwargs,
     ):
@@ -129,10 +134,11 @@
         assert commit, "Commit not found in database."
         repository = commit.repository
 
-        if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(identifier=repository.repoid)
-            and in_parallel
-        ):
+        parallel_processing = ParallelProcessing.from_task_args(**kwargs)
+
+        if parallel_processing.is_parallel:
+            # need to transform processing_results produced by chord to get it into the
+            # same format as the processing_results produced from chain
             processing_results = {
                 "processings_so_far": [
                     task["processings_so_far"][0] for task in processing_results
@@ -149,6 +155,7 @@
                 commit,
                 report_service,
                 processing_results,
+                parallel_processing,
             )
 
             log.info(
@@ -161,28 +168,42 @@
                 ),
             )
 
-            parallel_paths = report_service.save_parallel_report_to_archive(
-                commit, report, report_code
-            )
-            # now that we've built the report and stored it to GCS, we have what we need to
-            # compare the results with the current upload pipeline. We end execution of the
-            # finisher task here so that we don't cause any additional side-effects
-
-            # The verification task that will compare the results of the serial flow and
-            # the parallel flow, and log the result to determine if parallel flow is
-            # working properly.
-            parallel_verification_task.apply_async(
-                kwargs=dict(
-                    repoid=repoid,
-                    commitid=commitid,
-                    commit_yaml=commit_yaml,
-                    report_code=report_code,
-                    parallel_paths=parallel_paths,
-                    processing_results=processing_results,
-                ),
-            )
+            if parallel_processing is ParallelProcessing.PARALLEL:
+                pr = processing_results["processings_so_far"][0]["arguments"].get("pr")
+                processor_task = UploadProcessorTask()
+                processor_task.save_report_results(
+                    db_session,
+                    report_service,
+                    repository,
+                    commit,
+                    report,
+                    pr,
+                    report_code,
+                )
+
+            else:
+                parallel_paths = report_service.save_parallel_report_to_archive(
+                    commit, report, report_code
+                )
+                # now that we've built the report and stored it to GCS, we have what we need to
+                # compare the results with the current upload pipeline. We end execution of the
+                # finisher task here so that we don't cause any additional side-effects
+
+                # The verification task that will compare the results of the serial flow and
+                # the parallel flow, and log the result to determine if parallel flow is
+                # working properly.
+                parallel_verification_task.apply_async(
+                    kwargs=dict(
+                        repoid=repoid,
+                        commitid=commitid,
+                        commit_yaml=commit_yaml,
+                        report_code=report_code,
+                        parallel_paths=parallel_paths,
+                        processing_results=processing_results,
+                    ),
+                )
 
-            return
+            return
 
         lock_name = f"upload_finisher_lock_{repoid}_{commitid}"
         redis_connection = get_redis_connection()
@@ -253,7 +274,7 @@ def finish_reports_processing(
         db_session,
         commit: Commit,
         commit_yaml: UserYaml,
-        processing_results,
+        processing_results: dict,
         report_code,
         checkpoints,
     ):
@@ -370,7 +391,9 @@ def finish_reports_processing(
 
         return {"notifications_called": notifications_called}
 
-    def should_clean_labels_index(self, commit_yaml: UserYaml, processing_results):
+    def should_clean_labels_index(
+        self, commit_yaml: UserYaml, processing_results: dict
+    ):
         """Returns True if any of the successful processings was uploaded using a flag
         that implies labels were uploaded with the report.
         """
@@ -389,7 +412,11 @@ def should_clean_for_processing_result(results):
         return any(map(should_clean_for_processing_result, actual_processing_results))
 
     def should_call_notifications(
-        self, commit: Commit, commit_yaml: UserYaml, processing_results, report_code
+        self,
+        commit: Commit,
+        commit_yaml: UserYaml,
+        processing_results: dict,
+        report_code,
     ) -> ShouldCallNotifyResult:
         extra_dict = {
             "repoid": commit.repoid,
@@ -473,41 +500,52 @@ def merge_incremental_reports(
         commit: Commit,
         report_service: ReportService,
         processing_results: dict,
+        parallel_processing: ParallelProcessing,
     ):
         archive_service = report_service.get_archive_service(repository)
         repoid = repository.repoid
         commitid = commit.id
 
-        fas_path = MinioEndpoints.parallel_upload_experiment.get_path(
-            version="v4",
-            repo_hash=archive_service.get_archive_hash(repository),
-            commitid=commit.commitid,
-            file_name="files_and_sessions",
-        )
-        chunks_path = MinioEndpoints.parallel_upload_experiment.get_path(
-            version="v4",
-            repo_hash=archive_service.get_archive_hash(repository),
-            commitid=commit.commitid,
-            file_name="chunks",
-        )
+        if parallel_processing is ParallelProcessing.PARALLEL:
+            report = report_service.get_existing_report_for_commit(commit)
+            if report is None:
+                log.info(
+                    "No base report found for parallel upload processing, using an empty report",
+                    extra=dict(commit=commitid, repoid=repoid),
+                )
+                report = Report()
 
-        try:
-            files_and_sessions = json.loads(archive_service.read_file(fas_path))
-            chunks = archive_service.read_file(chunks_path).decode(errors="replace")
-            report = report_service.build_report(
-                chunks,
-                files_and_sessions["files"],
-                files_and_sessions["sessions"],
-                None,
+        else:
+            fas_path = MinioEndpoints.parallel_upload_experiment.get_path(
+                version="v4",
+                repo_hash=archive_service.get_archive_hash(repository),
+                commitid=commit.commitid,
+                file_name="files_and_sessions",
             )
-        except (
-            FileNotInStorageError
-        ):  # there were no CFFs, so no report was stored in GCS
-            log.info(
-                "No base report found for parallel upload processing, using an empty report",
-                extra=dict(commit=commitid, repoid=repoid),
+            chunks_path = MinioEndpoints.parallel_upload_experiment.get_path(
+                version="v4",
+                repo_hash=archive_service.get_archive_hash(repository),
+                commitid=commit.commitid,
+                file_name="chunks",
             )
-            report = Report()
+
+            try:
+                files_and_sessions = json.loads(archive_service.read_file(fas_path))
+                chunks = archive_service.read_file(chunks_path).decode(errors="replace")
+                report = report_service.build_report(
+                    chunks,
+                    files_and_sessions["files"],
+                    files_and_sessions["sessions"],
+                    None,
+                )
+            except (
+                FileNotInStorageError
+            ):  # there were no CFFs, so no report was stored in GCS
+                log.info(
+                    "No base report found for parallel upload processing, using an empty report",
+                    extra=dict(commit=commitid, repoid=repoid),
+                )
+                report = Report()
 
         log.info(
             "Downloading %s incremental reports that were processed in parallel",
@@ -564,18 +602,30 @@ def merge_report(cumulative_report: Report, obj):
                     session, use_id_from_session=True
                 )
 
+            session_adjustment = SessionAdjustmentResult([], [])
             if flags := session.flags:
-                clear_carryforward_sessions(
+                session_adjustment = clear_carryforward_sessions(
                     cumulative_report, incremental_report, flags, UserYaml(commit_yaml)
                 )
-                # ReportService.update_upload_with_processing_result should use this result
-                # to update the state of Upload. Once the experiment is finished, Upload.state should
-                # be set to: parallel_processed (instead of processed)
 
             cumulative_report.merge(incremental_report)
 
-            # once the experiment is finished, we should be modifying the Upload here
-            # moving it's state from: parallel_processed -> processed
+            if parallel_processing is ParallelProcessing.PARALLEL:
+                # When we are fully parallel, we need to update the `Upload` in the database
+                # with the final session_id (aka `order_number`) and other statuses
+                db_session = commit.get_db_session()
+                upload = (
+                    db_session.query(Upload)
+                    .filter(Upload.id_ == obj["upload_pk"])
+                    .first()
+                )
+                upload.state_id = UploadState.PROCESSED.db_id
+                upload.state = "processed"
+                upload.order_number = new_sessionid
+                delete_uploads_by_sessionid(
+                    upload, session_adjustment.fully_deleted_sessions
+                )
+                db_session.flush()
 
             return cumulative_report
@@ -596,6 +646,7 @@ def merge_report(cumulative_report: Report, obj):
             ),
         )
         report = functools.reduce(merge_report, unmerged_reports, report)
+        commit.get_db_session().flush()
 
         return report
diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py
index e27cd023f..a7ba23662 100644
--- a/tasks/upload_processor.py
+++ b/tasks/upload_processor.py
@@ -1,5 +1,6 @@
 import logging
 import random
+from typing import Any
 
 import sentry_sdk
 from asgiref.sync import async_to_sync
@@ -17,14 +18,13 @@
 from database.models.core import Pull, Repository
 from helpers.exceptions import RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
-from helpers.metrics import metrics
+from helpers.parallel import ParallelProcessing
 from helpers.parallel_upload_processing import (
     save_final_serial_report_results,
     save_incremental_report_results,
 )
 from helpers.reports import delete_archive_setting
 from helpers.save_commit_error import save_commit_error
-from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.redis import get_redis_connection
 from services.report import ProcessingResult, RawReportInfo, Report, ReportService
 from services.report.parser.types import VersionOneParsedRawReport
@@ -79,22 +79,19 @@ def run_impl(
         commit_yaml,
         arguments_list,
         report_code=None,
-        parallel_idx=None,
-        in_parallel=False,
-        is_final=False,
         **kwargs,
     ):
         repoid = int(repoid)
         log.info(
             "Received upload processor task",
-            extra=dict(repoid=repoid, commit=commitid, in_parallel=in_parallel),
+            extra=dict(
+                repoid=repoid, commit=commitid, in_parallel=kwargs.get("in_parallel")
+            ),
         )
 
-        in_parallel = in_parallel and PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            identifier=repoid
-        )
+        parallel_processing = ParallelProcessing.from_task_args(**kwargs)
 
-        if in_parallel:
+        if parallel_processing.is_parallel:
             log.info(
                 "Using parallel upload processing, skip acquiring upload processing lock",
                 extra=dict(
                 ),
             )
 
-            # This function is named `within_lock` but we gate any concurrency-
-            # unsafe operations with `PARALLEL_UPLOAD_PROCESSING_BY_REPO`.
-            return self.process_impl_within_lock(
+            return self.process_upload(
                 db_session=db_session,
                 previous_results={},
                 repoid=repoid,
                 commitid=commitid,
                 commit_yaml=commit_yaml,
                 arguments_list=arguments_list,
-                parallel_idx=parallel_idx,
                 report_code=report_code,
-                in_parallel=in_parallel,
+                parallel_processing=parallel_processing,
             )
 
         lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid)
@@ -147,7 +141,7 @@
                     ),
                 )
 
-                return self.process_impl_within_lock(
+                return self.process_upload(
                     db_session=db_session,
                     previous_results=previous_results,
                     repoid=repoid,
@@ -155,9 +149,7 @@
                     commit_yaml=commit_yaml,
                     arguments_list=arguments_list,
                     report_code=report_code,
-                    parallel_idx=parallel_idx,
-                    in_parallel=in_parallel,
-                    is_final=is_final,
+                    parallel_processing=parallel_processing,
                 )
         except LockError:
             max_retry = 200 * 3**self.request.retries
@@ -175,18 +167,16 @@
             self.retry(max_retries=MAX_RETRIES, countdown=retry_in)
 
     @sentry_sdk.trace
-    def process_impl_within_lock(
+    def process_upload(
         self,
         db_session,
-        previous_results,
+        previous_results: dict,
         repoid: int,
-        commitid,
+        commitid: str,
         commit_yaml: dict,
-        arguments_list,
+        arguments_list: list[dict],
         report_code,
-        parallel_idx=None,
-        in_parallel=False,
-        is_final=False,
+        parallel_processing: ParallelProcessing,
     ):
         processings_so_far: list[dict] = previous_results.get("processings_so_far", [])
         n_processed = 0
@@ -202,6 +192,8 @@ def process_impl_within_lock(
         pr = None
         report_service = ReportService(UserYaml(commit_yaml))
 
+        in_parallel = parallel_processing.is_parallel
+
         if in_parallel:
             log.info(
                 "Creating empty report to store incremental result",
                 extra=dict(commit=commitid, repoid=repoid),
             )
             report = Report()
         else:
-            with metrics.timer(f"{self.metrics_prefix}.build_original_report"):
-                report = report_service.get_existing_report_for_commit(
-                    commit, report_code=report_code
+            report = report_service.get_existing_report_for_commit(
+                commit, report_code=report_code
+            )
+            if report is None:
+                log.info(
+                    "No existing report for commit",
+                    extra=dict(commit=commit.commitid),
                 )
-                if report is None:
-                    log.info(
-                        "No existing report for commit",
-                        extra=dict(commit=commit.commitid),
-                    )
-                    report = Report()
+                report = Report()
 
         raw_reports: list[RawReportInfo] = []
         try:
@@ -242,24 +233,21 @@
                         in_parallel=in_parallel,
                     ),
                 )
-                individual_info = {"arguments": arguments}
+                individual_info: dict[str, Any] = {"arguments": arguments}
                 try:
-                    with metrics.timer(
-                        f"{self.metrics_prefix}.process_individual_report"
-                    ):
-                        raw_report_info = RawReportInfo()
-                        processing_result = self.process_individual_report(
-                            report_service,
-                            commit,
-                            report,
-                            upload_obj,
-                            raw_report_info,
-                            in_parallel=in_parallel,
-                        )
-                    # NOTE: this is only used because test mocking messes with the return value here.
-                    # in normal flow, the function mutates the argument instead.
-                    if processing_result.report:
-                        report = processing_result.report
+                    raw_report_info = RawReportInfo()
+                    processing_result = self.process_individual_report(
+                        report_service,
+                        commit,
+                        report,
+                        upload_obj,
+                        raw_report_info,
+                        parallel_processing,
+                    )
+                    # NOTE: this is only used because test mocking messes with the return value here.
+                    # in normal flow, the function mutates the argument instead.
+                    if processing_result.report:
+                        report = processing_result.report
                 except (CeleryError, SoftTimeLimitExceeded, SQLAlchemyError):
                     raise
@@ -290,12 +278,15 @@
         parallel_incremental_result = None
         results_dict = {}
         if in_parallel:
+            upload_id = arguments_list[0].get("upload_pk")
             parallel_incremental_result = save_incremental_report_results(
-                report_service, commit, report, parallel_idx, report_code
-            )
-            parallel_incremental_result["upload_pk"] = arguments_list[0].get(
-                "upload_pk"
+                report_service,
+                commit,
+                report,
+                upload_id,
+                report_code,
             )
+            parallel_incremental_result["upload_pk"] = upload_id
 
             log.info(
                 "Saved incremental report results to storage",
@@ -320,7 +311,7 @@
             # ParallelVerification task to compare with later, for the parallel
             # experiment. The report being saved is not necessarily the final
            # report for the commit, as more uploads can still be made.
-            if is_final:
+            if parallel_processing is ParallelProcessing.EXPERIMENT_SERIAL:
                 final_serial_report_url = save_final_serial_report_results(
                     report_service, commit, report, report_code, arguments_list
                 )
@@ -376,10 +367,10 @@ def process_individual_report(
         report: Report,
         upload: Upload,
         raw_report_info: RawReportInfo,
-        in_parallel=False,
+        parallel_processing: ParallelProcessing,
     ) -> ProcessingResult:
         processing_result = report_service.build_report_from_raw_content(
-            report, raw_report_info, upload=upload
+            report, raw_report_info, upload
         )
         if (
             processing_result.error is not None
@@ -401,7 +392,7 @@
 
         # for the parallel experiment, we don't want to modify anything in the
        # database, so we disable it here
-        if not in_parallel:
+        if parallel_processing is not ParallelProcessing.EXPERIMENT_PARALLEL:
             report_service.update_upload_with_processing_result(
                 upload, processing_result
             )
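Reviewer note (not part of the change set): a minimal usage sketch of how the new helpers/parallel.py enums introduced above are meant to be consumed by the tasks. It only uses names defined in this diff; the assertion about skipping the processing lock mirrors the run_impl change in tasks/upload_processor.py.

    from helpers.parallel import ParallelProcessing

    # Task kwargs as sent when scheduling the fully parallel pipeline.
    kwargs = {"run_fully_parallel": True, "in_parallel": True, "is_final": False}

    mode = ParallelProcessing.from_task_args(**kwargs)
    assert mode is ParallelProcessing.PARALLEL
    # Both PARALLEL and EXPERIMENT_PARALLEL are "parallel" modes, which is what
    # UploadProcessorTask.run_impl checks before skipping the processing lock.
    assert mode.is_parallel

    # The experiment's shadow processor tasks only set `in_parallel=True`.
    assert (
        ParallelProcessing.from_task_args(in_parallel=True)
        is ParallelProcessing.EXPERIMENT_PARALLEL
    )

    # The last serial processor in the experiment is marked `is_final=True`.
    assert (
        ParallelProcessing.from_task_args(is_final=True)
        is ParallelProcessing.EXPERIMENT_SERIAL
    )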