
Commit 901e799

Merge branch 'main' into 1967-adjust-checks-with-fallbacks

adrian-codecov authored Oct 4, 2024
2 parents f22847c + 47c669a
Showing 14 changed files with 393 additions and 206 deletions.
2 changes: 2 additions & 0 deletions database/models/core.py
@@ -82,6 +82,8 @@ class Owner(CodecovBaseModel):
stripe_customer_id = Column(types.Text, server_default=FetchedValue())
stripe_subscription_id = Column(types.Text, server_default=FetchedValue())
onboarding_completed = Column(types.Boolean, default=False)
upload_token_required_for_public_repos = Column(types.Boolean, default=True)

bot_id = Column(
"bot",
types.Integer,
91 changes: 91 additions & 0 deletions helpers/parallel.py
@@ -0,0 +1,91 @@
from __future__ import annotations

from enum import Enum

from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO

"""
This encapsulates Parallel Upload Processing logic
Upload Processing can run in essentially 4 modes:
- Completely serial processing
- Serial processing, but running "experiment" code (`EXPERIMENT_SERIAL`):
- In this mode, the final (`is_final`) `UploadProcessor` task saves a copy
of the final report for later verification.
- Parallel processing, but running "experiment" code (`EXPERIMENT_PARALLEL`):
- In this mode, another parallel set of `UploadProcessor` tasks runs *after*
the main set of tasks.
- These tasks do not persist any of their results in the database;
instead, the final `UploadFinisher` task will launch the `ParallelVerification` task.
- Fully parallel processing (`PARALLEL`):
- In this mode, the final `UploadFinisher` task is responsible for merging
the final report and persisting it.
An example Task chain in "experiment" mode might look like this:
- Upload
- UploadProcessor
- UploadProcessor
- UploadProcessor (`EXPERIMENT_SERIAL`, the final one)
- UploadFinisher
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadProcessor (`EXPERIMENT_PARALLEL`)
- UploadFinisher (`EXPERIMENT_PARALLEL`)
- ParallelVerification
The `PARALLEL` mode looks like this:
- Upload
- UploadProcessor (`PARALLEL`)
- UploadProcessor (`PARALLEL`)
- UploadProcessor (`PARALLEL`)
- UploadFinisher (`PARALLEL`)
"""


class ParallelFeature(Enum):
SERIAL = "serial"
EXPERIMENT = "experiment"
PARALLEL = "parallel"

@classmethod
def load(cls, repoid: int) -> ParallelFeature:
feature = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=repoid, default="serial"
)

if feature == "experiment" or feature is True:
return ParallelFeature.EXPERIMENT
if feature == "parallel":
return ParallelFeature.PARALLEL
return ParallelFeature.SERIAL


class ParallelProcessing(Enum):
SERIAL = "serial"
EXPERIMENT_SERIAL = "experiment-serial"
EXPERIMENT_PARALLEL = "experiment-parallel"
PARALLEL = "parallel"

@property
def is_parallel(self) -> bool:
return (
self is ParallelProcessing.EXPERIMENT_PARALLEL
or self is ParallelProcessing.PARALLEL
)

@classmethod
def from_task_args(
cls,
in_parallel: bool = False,
is_final: bool = False,
run_fully_parallel: bool = False,
**kwargs,
) -> ParallelProcessing:
if run_fully_parallel:
return ParallelProcessing.PARALLEL
if in_parallel:
return ParallelProcessing.EXPERIMENT_PARALLEL
if is_final:
return ParallelProcessing.EXPERIMENT_SERIAL
return ParallelProcessing.SERIAL
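
For reference, here is a minimal sketch of how a task might consume these two helpers; the task function and its keyword arguments are assumptions for illustration, not part of this diff:

# Hypothetical call site, assuming Celery-style task kwargs.
from helpers.parallel import ParallelFeature, ParallelProcessing

def process_upload_task(repoid: int, **task_kwargs):
    # The rollout flag decides which mode this repository runs in.
    feature = ParallelFeature.load(repoid)
    # The task arguments decide which role this concrete task plays.
    mode = ParallelProcessing.from_task_args(**task_kwargs)
    if mode.is_parallel:
        # EXPERIMENT_PARALLEL and PARALLEL tasks must not persist
        # intermediate results to the database.
        pass
    return feature, mode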
2 changes: 1 addition & 1 deletion requirements.in
@@ -1,5 +1,5 @@
https://github.com/codecov/opentelem-python/archive/refs/tags/v0.0.4a1.tar.gz#egg=codecovopentelem
https://github.com/codecov/shared/archive/36791fe3c18a0dbdf7296ffbdffbf2137fa9fc06.tar.gz#egg=shared
https://github.com/codecov/shared/archive/88117b96a4b420d88549b8df2649c3eb9c61c2a5.tar.gz#egg=shared
https://github.com/codecov/test-results-parser/archive/1507de2241601d678e514c08b38426e48bb6d47d.tar.gz#egg=test-results-parser
https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
asgiref>=3.7.2
2 changes: 1 addition & 1 deletion requirements.txt
@@ -357,7 +357,7 @@ sentry-sdk[celery]==2.13.0
# via
# -r requirements.in
# shared
shared @ https://github.com/codecov/shared/archive/36791fe3c18a0dbdf7296ffbdffbf2137fa9fc06.tar.gz
shared @ https://github.com/codecov/shared/archive/88117b96a4b420d88549b8df2649c3eb9c61c2a5.tar.gz
# via -r requirements.in
six==1.16.0
# via
8 changes: 8 additions & 0 deletions services/bundle_analysis/report.py
@@ -1,3 +1,4 @@
import logging
import os
import tempfile
from dataclasses import dataclass
@@ -26,6 +27,8 @@
from services.storage import get_storage_client
from services.timeseries import repository_datasets_query

log = logging.getLogger(__name__)


@dataclass
class ProcessingError:
@@ -291,6 +294,11 @@ def process_upload(
"plugin_name": plugin_name,
},
)
log.error(
"Unable to parse upload for bundle analysis",
exc_info=True,
extra=dict(repoid=commit.repoid, commit=commit.commitid),
)
return ProcessingResult(
upload=upload,
commit=commit,
41 changes: 7 additions & 34 deletions services/report/__init__.py
@@ -39,11 +39,9 @@
ReportExpiredException,
RepositoryWithoutValidBotError,
)
from helpers.parallel import ParallelFeature
from helpers.telemetry import MetricContext
from rollouts import (
CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER,
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)
from rollouts import CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER
from services.archive import ArchiveService
from services.report.parser import get_proper_parser
from services.report.parser.types import ParsedRawReport
@@ -112,26 +110,6 @@ def initialize_and_save_report(
) -> CommitReport:
raise NotImplementedError()

def fetch_report_upload(
self, commit_report: CommitReport, upload_id: int
) -> Upload:
"""
Fetch Upload by the given upload_id.
:raises: Exception if Upload is not found.
"""
db_session = commit_report.get_db_session()
upload = db_session.query(Upload).filter_by(id_=int(upload_id)).first()
if not upload:
raise Exception(
f"Failed to find existing upload by ID ({upload_id})",
dict(
commit=commit_report.commit_id,
repo=commit_report.commit.repoid,
upload_id=upload_id,
),
)
return upload

def create_report_upload(
self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
) -> Upload:
@@ -275,11 +253,10 @@ def initialize_and_save_report(
# This means there is a report to carryforward
self.save_full_report(commit, report, report_code)

parallel_processing = ParallelFeature.load(commit.repository.repoid)
# Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
# finisher can build off of it later.
if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=commit.repository.repoid
):
if parallel_processing is ParallelFeature.EXPERIMENT:
self.save_parallel_report_to_archive(commit, report, report_code)

return current_report_row
@@ -810,13 +787,6 @@ def update_upload_with_processing_result(
session = processing_result.session

if processing_result.error is None:
# this should be enabled for the actual rollout of parallel upload processing.
# if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
# "this should be the repo id"
# ):
# upload_obj.state_id = UploadState.PARALLEL_PROCESSED.db_id
# upload_obj.state = "parallel_processed"
# else:
upload.state_id = UploadState.PROCESSED.db_id
upload.state = "processed"
upload.order_number = session.id
@@ -1052,6 +1022,9 @@ def delete_uploads_by_sessionid(upload: Upload, session_ids: list[int]):
"""
This deletes all the `Upload` records corresponding to the given `session_ids`.
"""
if not session_ids:
return

db_session = upload.get_db_session()
uploads = (
db_session.query(Upload.id_)
23 changes: 0 additions & 23 deletions services/report/tests/unit/test_init.py

This file was deleted.

27 changes: 23 additions & 4 deletions tasks/sync_pull.py
@@ -18,7 +18,7 @@

from app import celery_app
from database.models import Commit, Pull, Repository, Test
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from helpers.metrics import metrics
from rollouts import SYNC_PULL_USE_MERGE_COMMIT_SHA
@@ -109,6 +109,7 @@ def run_impl_within_lock(
commit_updates_done = {"merged_count": 0, "soft_deleted_count": 0}
repository = db_session.query(Repository).filter_by(repoid=repoid).first()
assert repository
extra_info = dict(pullid=pullid, repoid=repoid)
try:
installation_name_to_use = get_installation_name_for_owner_for_task(
self.name, repository.owner
@@ -119,7 +120,7 @@
except RepositoryWithoutValidBotError:
log.warning(
"Could not sync pull because there is no valid bot found for that repo",
extra=dict(pullid=pullid, repoid=repoid),
extra=extra_info,
exc_info=True,
)
return {
Expand All @@ -128,6 +129,24 @@ def run_impl_within_lock(
"pull_updated": False,
"reason": "no_bot",
}
except NoConfiguredAppsAvailable as err:
log.error(
"Could not sync pull because there are no configured apps available",
extra={
**extra_info,
"suspended_app_count": err.suspended_count,
"rate_limited_count": err.rate_limited_count,
},
)
if err.rate_limited_count > 0:
log.info("Apps are rate limited. Retrying in 60s", extra=extra_info)
self.retry(max_retries=1, countdown=60)
return {
"notifier_called": False,
"commit_updates_done": {"merged_count": 0, "soft_deleted_count": 0},
"pull_updated": False,
"reason": "no_configured_apps_available",
}
context = OwnerContext(
owner_onboarding_date=repository.owner.createstamp,
owner_plan=repository.owner.plan,
@@ -146,7 +165,7 @@
if pull is None:
log.info(
"Not syncing pull since we can't find it in the database nor in the provider",
extra=dict(pullid=pullid, repoid=repoid),
extra=extra_info,
)
return {
"notifier_called": False,
@@ -157,7 +176,7 @@
if enriched_pull.provider_pull is None:
log.info(
"Not syncing pull since we can't find it in the provider. There is nothing to sync",
extra=dict(pullid=pullid, repoid=repoid),
extra=extra_info,
)
return {
"notifier_called": False,
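For context, the handler above reads `err.suspended_count` and `err.rate_limited_count` off the exception. A plausible shape for `NoConfiguredAppsAvailable` in `helpers/exceptions.py`, inferred from those attribute accesses and not confirmed by this diff, would be:

# Assumed sketch; the real class in helpers/exceptions.py may differ.
class NoConfiguredAppsAvailable(Exception):
    def __init__(self, rate_limited_count: int = 0, suspended_count: int = 0):
        self.rate_limited_count = rate_limited_count
        self.suspended_count = suspended_count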
12 changes: 6 additions & 6 deletions tasks/tests/integration/test_upload_e2e.py
@@ -161,10 +161,10 @@ def setup_mocks(

@pytest.mark.integration
@pytest.mark.django_db()
@pytest.mark.parametrize("do_parallel_processing", [False, True])
@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
def test_full_upload(
dbsession: DbSession,
do_parallel_processing: bool,
parallel_processing: str,
mocker,
mock_repo_provider,
mock_storage,
Expand All @@ -176,7 +176,7 @@ def test_full_upload(
mocker.patch.object(
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",
return_value=do_parallel_processing,
return_value=parallel_processing,
)

repository = RepositoryFactory.create()
@@ -360,10 +360,10 @@ def test_full_upload(

@pytest.mark.integration
@pytest.mark.django_db()
@pytest.mark.parametrize("do_parallel_processing", [False, True])
@pytest.mark.parametrize("parallel_processing", ["serial", "experiment", "parallel"])
def test_full_carryforward(
dbsession: DbSession,
do_parallel_processing: bool,
parallel_processing: str,
mocker,
mock_repo_provider,
mock_storage,
Expand All @@ -378,7 +378,7 @@ def test_full_carryforward(
mocker.patch.object(
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
"check_value",
return_value=do_parallel_processing,
return_value=parallel_processing,
)

repository = RepositoryFactory.create()
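With this patch, `PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value` is mocked to return the parametrized string, which `ParallelFeature.load` then maps onto the enum. A small sanity check illustrating the mapping (assumed for illustration, not part of this diff):

from helpers.parallel import ParallelFeature

# With check_value mocked to return "experiment":
assert ParallelFeature.load(repoid=1234) is ParallelFeature.EXPERIMENT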