Skip to content

Commit

Permalink
Merge pull request #1753 from humanprotocol/zm/m2-develop-merge
Browse files Browse the repository at this point in the history
[CVAT-M2] Prepare for merge into develop
  • Loading branch information
zhiltsov-max authored Mar 28, 2024
2 parents ef1b003 + 3fb0af3 commit 5e49f0d
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 82 deletions.
2 changes: 1 addition & 1 deletion packages/examples/cvat/exchange-oracle/src/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class FeaturesConfig:


class CoreConfig:
default_assignment_time = int(os.environ.get("DEFAULT_ASSIGNMENT_TIME", 300))
default_assignment_time = int(os.environ.get("DEFAULT_ASSIGNMENT_TIME", 1800))


class HumanAppConfig:
Expand Down
32 changes: 19 additions & 13 deletions packages/examples/cvat/exchange-oracle/src/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import AnyUrl, BaseModel, Field, root_validator

from src.core.config import Config
from src.core.types import TaskTypes
from src.utils.enums import BetterEnumMeta

Expand Down Expand Up @@ -133,9 +132,27 @@ class AnnotationInfo(BaseModel):
job_size: int = 10
"Frames per job, validation frames are not included"

max_time: int = Field(default_factory=lambda: Config.core_config.default_assignment_time)
max_time: Optional[int] = None # deprecated, TODO: mark deprecated with pydantic 2.7+
"Maximum time per job (assignment) for an annotator, in seconds"

@root_validator(pre=True)
@classmethod
def _validate_label_type(cls, values: dict[str, Any]) -> dict[str, Any]:
default_label_type = LabelTypes.plain
if values["type"] == TaskTypes.image_skeletons_from_boxes:
default_label_type = LabelTypes.skeleton

# Add default value for labels, if none provided.
# pydantic can't do this for tagged unions
try:
labels = values["labels"]
for label_info in labels:
label_info["type"] = label_info.get("type", default_label_type)
except KeyError:
pass

return values


class ValidationInfo(BaseModel):
min_quality: float = Field(ge=0)
Expand All @@ -158,15 +175,4 @@ class TaskManifest(BaseModel):


def parse_manifest(manifest: Any) -> TaskManifest:
# Add default value for labels, if none provided.
# pydantic can't do this for tagged unions

if isinstance(manifest, dict):
try:
labels = manifest["annotation"]["labels"]
for label_info in labels:
label_info["type"] = label_info.get("type", LabelTypes.plain)
except KeyError:
pass

return TaskManifest.parse_obj(manifest)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from attrs import frozen
from datumaro.util import dump_json, parse_json

DEFAULT_ASSIGNMENT_SIZE_MULTIPLIER = 2 * 3 # tile grid size

SkeletonBboxMapping = Dict[int, int]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,6 @@ def process_outgoing_job_launcher_webhooks():
timestamp=None, # TODO: launcher doesn't support it yet
)

# TODO: remove when field naming is updated in launcher
body["escrowAddress"] = body.pop("escrow_address")
body["chainId"] = body.pop("chain_id")
body["eventType"] = body.pop("event_type")
body["eventData"] = body.pop("event_data")
# ^^^

_, signature = prepare_signed_message(
webhook.escrow_address,
webhook.chain_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,10 @@ def track_task_creation() -> None:
failed: List[cvat_models.DataUpload] = []
for upload in uploads:
status, reason = cvat_api.get_task_upload_status(upload.task_id)
project = upload.task.project
if not status or status == cvat_api.UploadStatus.FAILED:
failed.append(upload)

project = upload.task.project

oracle_db_service.outbox.create_webhook(
session,
escrow_address=project.escrow_address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@ def __init__(self, manifest: TaskManifest, escrow_address: str, chain_id: int):
self._excluded_boxes_info: _MaybeUnset[_ExcludedAnnotationsInfo] = _unset

# Configuration / constants
self.job_size_mult = 6
self.job_size_mult = skeletons_from_boxes_task.DEFAULT_ASSIGNMENT_SIZE_MULTIPLIER
"Job size multiplier"

# TODO: consider WebP if produced files are too big
Expand Down
15 changes: 12 additions & 3 deletions packages/examples/cvat/exchange-oracle/src/services/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from src.core.types import AssignmentStatuses, JobStatuses, PlatformTypes, ProjectStatuses
from src.db import SessionLocal
from src.schemas import exchange as service_api
from src.utils.assignments import compose_assignment_url, parse_manifest
from src.utils.assignments import (
compose_assignment_url,
get_default_assignment_timeout,
parse_manifest,
)
from src.utils.requests import get_or_404
from src.utils.time import utcnow

Expand Down Expand Up @@ -43,7 +47,8 @@ def serialize_task(
title=f"Task {project.escrow_address[:10]}",
description=manifest.annotation.description,
job_bounty=manifest.job_bounty,
job_time_limit=manifest.annotation.max_time,
job_time_limit=manifest.annotation.max_time
or get_default_assignment_timeout(manifest.annotation.type),
job_size=manifest.annotation.job_size + manifest.validation.val_size,
job_type=project.job_type,
platform=PlatformTypes.CVAT,
Expand Down Expand Up @@ -159,7 +164,11 @@ def create_assignment(project_id: int, wallet_address: str) -> Optional[str]:
session,
wallet_address=user.wallet_address,
cvat_job_id=unassigned_job.cvat_id,
expires_at=now + timedelta(seconds=manifest.annotation.max_time),
expires_at=now
+ timedelta(
seconds=manifest.annotation.max_time
or get_default_assignment_timeout(manifest.annotation.type)
),
)

cvat_api.clear_job_annotations(unassigned_job.cvat_id)
Expand Down
10 changes: 10 additions & 0 deletions packages/examples/cvat/exchange-oracle/src/utils/assignments.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from src.core.config import Config
from src.core.manifest import TaskManifest, TaskTypes
from src.core.manifest import parse_manifest as _parse_manifest
from src.core.tasks import skeletons_from_boxes
from src.models.cvat import Project


Expand All @@ -19,3 +20,12 @@ def compose_assignment_url(task_id: int, job_id: int, *, project: Project) -> st
query_params = "?defaultWorkspace=single_shape"

return urljoin(Config.cvat_config.cvat_url, f"/tasks/{task_id}/jobs/{job_id}{query_params}")


def get_default_assignment_timeout(task_type: TaskTypes) -> int:
timeout_seconds = Config.core_config.default_assignment_time

if task_type == TaskTypes.image_skeletons_from_boxes:
timeout_seconds *= skeletons_from_boxes.DEFAULT_ASSIGNMENT_SIZE_MULTIPLIER

return timeout_seconds
3 changes: 3 additions & 0 deletions packages/examples/cvat/exchange-oracle/src/utils/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ def prepare_outgoing_webhook_body(

event = parse_event(OracleWebhookTypes.exchange_oracle, event_type, event_data)
body["event_type"] = event_type

body["event_data"] = event.dict()
if not body["event_data"]:
body.pop("event_data")

return body

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,34 +81,31 @@ def test_track_track_completed_task_creation(self):
data_upload = self.session.query(DataUpload).filter_by(id=upload_id).first()
self.assertIsNone(data_upload)

# TODO:
# Fix "local variable 'project' referenced before assignment" error in src/crons/state_trackers.py and uncomment this test case

# def test_track_track_completed_task_creation_error(self):
# escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67"
# (_, cvat_task, cvat_job) = create_project_task_and_job(self.session, escrow_address, 1)
# upload = DataUpload(
# id=str(uuid.uuid4()),
# task_id=cvat_task.cvat_id,
# )
# self.session.add(upload)
# self.session.commit()

# with (
# patch(
# "src.crons.state_trackers.cvat_api.get_task_upload_status"
# ) as mock_get_task_upload_status,
# patch(
# "src.crons.state_trackers.cvat_api.fetch_task_jobs",
# side_effect=cvat_api.exceptions.ApiException("Error"),
# ),
# ):
# mock_get_task_upload_status.return_value = (cvat_api.UploadStatus.FINISHED, None)

# track_task_creation()

# self.session.commit()

# webhook = self.session.query(Webhook).filter_by(escrow_address=escrow_address).first()
# self.assertIsNotNone(webhook)
# self.assertEqual(webhook.event_type, ExchangeOracleEventType.task_creation_failed)
def test_track_track_completed_task_creation_error(self):
escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67"
(_, cvat_task, cvat_job) = create_project_task_and_job(self.session, escrow_address, 1)
upload = DataUpload(
id=str(uuid.uuid4()),
task_id=cvat_task.cvat_id,
)
self.session.add(upload)
self.session.commit()

with (
patch(
"src.crons.state_trackers.cvat_api.get_task_upload_status"
) as mock_get_task_upload_status,
patch(
"src.crons.state_trackers.cvat_api.fetch_task_jobs",
side_effect=cvat_api.exceptions.ApiException("Error"),
),
):
mock_get_task_upload_status.return_value = (cvat_api.UploadStatus.FINISHED, None)

track_task_creation()

self.session.commit()

webhook = self.session.query(Webhook).filter_by(escrow_address=escrow_address).first()
self.assertIsNotNone(webhook)
self.assertEqual(webhook.event_type, ExchangeOracleEventTypes.task_creation_failed)
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def test_process_incoming_job_launcher_webhooks_escrow_created_type_exceed_max_r
)

self.assertEqual(new_webhook.status, OracleWebhookStatuses.pending.value)
self.assertEqual(new_webhook.event_type, ExchangeOracleEventType.task_creation_failed)
self.assertEqual(new_webhook.event_type, ExchangeOracleEventTypes.task_creation_failed)
self.assertEqual(new_webhook.attempts, 0)

def test_process_incoming_job_launcher_webhooks_escrow_created_type_remove_when_error(
Expand Down
32 changes: 19 additions & 13 deletions packages/examples/cvat/recording-oracle/src/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from pydantic import AnyUrl, BaseModel, Field, root_validator

from src.core.config import Config
from src.core.types import TaskTypes
from src.utils.enums import BetterEnumMeta

Expand Down Expand Up @@ -133,9 +132,27 @@ class AnnotationInfo(BaseModel):
job_size: int = 10
"Frames per job, validation frames are not included"

max_time: Optional[int] = None
max_time: Optional[int] = None # deprecated, TODO: mark deprecated with pydantic 2.7+
"Maximum time per job (assignment) for an annotator, in seconds"

@root_validator(pre=True)
@classmethod
def _validate_label_type(cls, values: dict[str, Any]) -> dict[str, Any]:
default_label_type = LabelTypes.plain
if values["type"] == TaskTypes.image_skeletons_from_boxes:
default_label_type = LabelTypes.skeleton

# Add default value for labels, if none provided.
# pydantic can't do this for tagged unions
try:
labels = values["labels"]
for label_info in labels:
label_info["type"] = label_info.get("type", default_label_type)
except KeyError:
pass

return values


class ValidationInfo(BaseModel):
min_quality: float = Field(ge=0)
Expand All @@ -158,15 +175,4 @@ class TaskManifest(BaseModel):


def parse_manifest(manifest: Any) -> TaskManifest:
# Add default value for labels, if none provided.
# pydantic can't do this for tagged unions

if isinstance(manifest, dict):
try:
labels = manifest["annotation"]["labels"]
for label_info in labels:
label_info["type"] = label_info.get("type", LabelTypes.plain)
except KeyError:
pass

return TaskManifest.parse_obj(manifest)
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ def process_outgoing_reputation_oracle_webhooks():
timestamp=None, # TODO: reputation oracle doesn't support
)

# TODO: remove compatibility code
# FIXME: For a sake of compatibility with the current
# version of Reputation Oracle keep this
# vvv
body["escrowAddress"] = body.pop("escrow_address")
body["chainId"] = body.pop("chain_id")
body["eventType"] = body.pop("event_type")
body.pop("event_data")
# ^^^

_, signature = prepare_signed_message(
webhook.escrow_address,
webhook.chain_id,
Expand Down
3 changes: 3 additions & 0 deletions packages/examples/cvat/recording-oracle/src/utils/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ def prepare_outgoing_webhook_body(

event = parse_event(OracleWebhookTypes.recording_oracle, event_type, event_data)
body["event_type"] = event_type

body["event_data"] = event.dict()
if not body["event_data"]:
body.pop("event_data")

return body

Expand Down

0 comments on commit 5e49f0d

Please sign in to comment.