From 3d95ec5ecaefa17f616af8a68a394b2c858f6851 Mon Sep 17 00:00:00 2001 From: Dan LaManna Date: Thu, 12 Jan 2023 14:04:22 -0500 Subject: [PATCH] Compute asset summary for draft versions asynchronously --- dandiapi/api/models/version.py | 81 ++++------------------ dandiapi/api/services/metadata/__init__.py | 32 ++++++--- dandiapi/api/services/publish/__init__.py | 35 +++++++++- dandiapi/api/tasks/scheduled.py | 10 ++- dandiapi/api/tests/test_dandiset.py | 4 ++ dandiapi/api/tests/test_version.py | 4 +- 6 files changed, 86 insertions(+), 80 deletions(-) diff --git a/dandiapi/api/models/version.py b/dandiapi/api/models/version.py index bfe3a4528..17585c06f 100644 --- a/dandiapi/api/models/version.py +++ b/dandiapi/api/models/version.py @@ -2,9 +2,7 @@ import datetime import logging -from typing import TYPE_CHECKING -from dandischema.metadata import aggregate_assets_summary from django.conf import settings from django.contrib.postgres.indexes import HashIndex from django.core.validators import RegexValidator @@ -16,9 +14,6 @@ from .dandiset import Dandiset -if TYPE_CHECKING: - from .asset import Asset - logger = logging.getLogger(__name__) @@ -145,32 +140,6 @@ def next_published_version(cls, dandiset: Dandiset) -> str: return version - @property - def publish_version(self): - """ - Generate a published version + metadata without saving it. - - This is useful to validate version metadata without saving it. - """ - # Create the published model - published_version = Version( - dandiset=self.dandiset, - name=self.name, - metadata=self.metadata, - status=Version.Status.VALID, - version=Version.next_published_version(self.dandiset), - ) - - now = datetime.datetime.now(datetime.timezone.utc) - # Recompute the metadata and inject the publishedBy and datePublished fields - published_version.metadata = { - **published_version._populate_metadata(version_with_assets=self), - 'publishedBy': self.published_by(now), - 'datePublished': now.isoformat(), - } - - return published_version - @classmethod def citation(cls, metadata): year = datetime.datetime.now().year @@ -215,41 +184,15 @@ def strip_metadata(cls, metadata): ] return {key: metadata[key] for key in metadata if key not in computed_fields} - def _populate_metadata(self, version_with_assets: Version | None = None): - - # When validating a draft version, we create a published version without saving it, - # calculate it's metadata, and validate that metadata. However, assetsSummary is computed - # based on the assets that belong to the dummy published version, which has not had assets - # copied to it yet. To get around this, version_with_assets is the draft version that - # should be used to look up the assets for the assetsSummary. - if version_with_assets is None: - version_with_assets = self - - # When running _populate_metadata on an unsaved Version, self.assets is not available. - # Only compute the asset-based properties if this Version has an id, which means it's saved. - summary = { - 'numberOfBytes': 0, - 'numberOfFiles': 0, - } - if version_with_assets.id: - try: - assets: models.QuerySet[Asset] = version_with_assets.assets - summary = aggregate_assets_summary( - # There is no limit to how many assets a dandiset can have, so use - # `values_list` and `iterator` here to keep the memory footprint - # of this list low. - assets.values_list('metadata', flat=True).iterator() - ) - except Exception: - # The assets summary aggregation may fail if any asset metadata is invalid. - # If so, just use the placeholder summary. - logger.info('Error calculating assetsSummary', exc_info=True) - - # Import here to avoid dependency cycle + def _populate_metadata(self): from dandiapi.api.manifests import manifest_location metadata = { **self.metadata, + '@context': ( + 'https://raw.githubusercontent.com/dandi/schema/master/releases/' + f'{self.metadata["schemaVersion"]}/context.json' + ), 'manifestLocation': manifest_location(self), 'name': self.name, 'identifier': f'DANDI:{self.dandiset.identifier}', @@ -257,16 +200,20 @@ def _populate_metadata(self, version_with_assets: Version | None = None): 'id': f'DANDI:{self.dandiset.identifier}/{self.version}', 'repository': settings.DANDI_WEB_APP_URL, 'url': f'{settings.DANDI_WEB_APP_URL}/dandiset/{self.dandiset.identifier}/{self.version}', # noqa - 'assetsSummary': summary, 'dateCreated': self.dandiset.created.isoformat(), } + + if 'assetsSummary' not in metadata: + metadata['assetsSummary'] = { + 'schemaKey': 'AssetsSummary', + 'numberOfBytes': 0, + 'numberOfFiles': 0, + } + if self.doi: metadata['doi'] = self.doi metadata['citation'] = self.citation(metadata) - metadata['@context'] = ( - 'https://raw.githubusercontent.com/dandi/schema/master/releases/' - f'{metadata["schemaVersion"]}/context.json' - ) + return metadata def save(self, *args, **kwargs): diff --git a/dandiapi/api/services/metadata/__init__.py b/dandiapi/api/services/metadata/__init__.py index 9db7c17c5..dccea925d 100644 --- a/dandiapi/api/services/metadata/__init__.py +++ b/dandiapi/api/services/metadata/__init__.py @@ -1,6 +1,6 @@ from celery.utils.log import get_task_logger import dandischema.exceptions -from dandischema.metadata import validate +from dandischema.metadata import aggregate_assets_summary, validate from django.db import transaction from django.utils import timezone import jsonschema.exceptions @@ -67,6 +67,21 @@ def validate_asset_metadata(*, asset: Asset) -> None: asset.versions.filter(version='draft').update(modified=timezone.now()) +def version_aggregate_assets_summary(version: Version): + if version.version != 'draft': + raise VersionHasBeenPublished() + + version.metadata['assetsSummary'] = aggregate_assets_summary( + version.assets.values_list('metadata', flat=True).iterator() + ) + + Version.objects.filter(id=version.id, version='draft').update( + modified=timezone.now(), metadata=version.metadata + ) + version.refresh_from_db() + return version + + def validate_version_metadata(*, version: Version) -> None: logger.info('Validating dandiset metadata for version %s', version.id) @@ -75,17 +90,18 @@ def validate_version_metadata(*, version: Version) -> None: raise VersionHasBeenPublished() with transaction.atomic(): + # validating version metadata needs to lock the version to avoid racing with + # other modifications e.g. aggregate_assets_summary. + version = ( + Version.objects.filter(id=version.id, status=Version.Status.PENDING) + .select_for_update() + .first() + ) version.status = Version.Status.VALIDATING version.save() try: - publish_version = version.publish_version - metadata = publish_version.metadata - - # Inject a dummy DOI so the metadata is valid - metadata['doi'] = '10.80507/dandi.123456/0.123456.1234' - - validate(metadata, schema_key='PublishedDandiset', json_validation=True) + validate(version.metadata, schema_key='Dandiset', json_validation=True) except dandischema.exceptions.ValidationError as e: logger.info('Error while validating version %s', version.id) version.status = Version.Status.INVALID diff --git a/dandiapi/api/services/publish/__init__.py b/dandiapi/api/services/publish/__init__.py index 121e91476..76ae0a93f 100644 --- a/dandiapi/api/services/publish/__init__.py +++ b/dandiapi/api/services/publish/__init__.py @@ -1,3 +1,6 @@ +import datetime + +from dandischema.metadata import aggregate_assets_summary, validate from django.contrib.auth.models import User from django.db import transaction from django.db.models import QuerySet @@ -76,6 +79,27 @@ def _lock_dandiset_for_publishing(*, user: User, dandiset: Dandiset) -> None: draft_version.save() +def _build_publishable_version_from_draft(draft_version: Version) -> Version: + publishable_version = Version( + dandiset=draft_version.dandiset, + name=draft_version.name, + metadata=draft_version.metadata, + status=Version.Status.VALID, + version=Version.next_published_version(draft_version.dandiset), + ) + + now = datetime.datetime.now(datetime.timezone.utc) + # inject the publishedBy and datePublished fields + publishable_version.metadata.update( + { + 'publishedBy': draft_version.published_by(now), + 'datePublished': now.isoformat(), + } + ) + + return publishable_version + + def _publish_dandiset(dandiset_id: int) -> None: """ Publish a dandiset. @@ -93,7 +117,7 @@ def _publish_dandiset(dandiset_id: int) -> None: 'before this function.' ) - new_version: Version = old_version.publish_version + new_version: Version = _build_publishable_version_from_draft(old_version) new_version.save() # Bulk create the join table rows to optimize linking assets to new_version @@ -130,7 +154,9 @@ def _publish_dandiset(dandiset_id: int) -> None: for draft_asset in draft_assets.iterator(): publish_asset(asset=draft_asset) - # Save again to recompute metadata, specifically assetsSummary + new_version.metadata['assetsSummary'] = aggregate_assets_summary( + new_version.assets.values_list('metadata', flat=True).iterator() + ) new_version.save() # Add asset paths with new version @@ -141,6 +167,11 @@ def _publish_dandiset(dandiset_id: int) -> None: old_version.status = Version.Status.PUBLISHED old_version.save() + # Inject a dummy DOI so the metadata is valid + new_version.metadata['doi'] = '10.80507/dandi.123456/0.123456.1234' + + validate(new_version.metadata, schema_key='PublishedDandiset', json_validation=True) + # Write updated manifest files and create DOI after # published version has been committed to DB. transaction.on_commit(lambda: write_manifest_files.delay(new_version.id)) diff --git a/dandiapi/api/tasks/scheduled.py b/dandiapi/api/tasks/scheduled.py index 2510dbd66..92acbbbc4 100644 --- a/dandiapi/api/tasks/scheduled.py +++ b/dandiapi/api/tasks/scheduled.py @@ -11,17 +11,22 @@ from celery.utils.log import get_task_logger from django.conf import settings from django.contrib.auth.models import User -from django.db.transaction import atomic from dandiapi.api.mail import send_pending_users_message from dandiapi.api.models import UserMetadata, Version +from dandiapi.api.services.metadata import version_aggregate_assets_summary from dandiapi.api.tasks import validate_version_metadata_task, write_manifest_files logger = get_task_logger(__name__) +@shared_task(soft_time_limit=10) +def aggregate_assets_summary_task(version_id: int): + version = Version.objects.get(id=version_id) + version_aggregate_assets_summary(version) + + @shared_task(soft_time_limit=20) -@atomic def validate_draft_version_metadata(): # Select only the id of draft versions that have status PENDING pending_draft_versions = ( @@ -34,6 +39,7 @@ def validate_draft_version_metadata(): logger.info('Found %s versions to validate', pending_draft_versions_count) for draft_version_id in pending_draft_versions.iterator(): validate_version_metadata_task.delay(draft_version_id) + aggregate_assets_summary_task.delay(draft_version_id) # Revalidation should be triggered every time a version is modified, # so now is a good time to write out the manifests as well. diff --git a/dandiapi/api/tests/test_dandiset.py b/dandiapi/api/tests/test_dandiset.py index e13156177..bb72f0eb9 100644 --- a/dandiapi/api/tests/test_dandiset.py +++ b/dandiapi/api/tests/test_dandiset.py @@ -388,6 +388,7 @@ def test_dandiset_rest_create(api_client, user): } ], 'assetsSummary': { + 'schemaKey': 'AssetsSummary', 'numberOfBytes': 0, 'numberOfFiles': 0, }, @@ -480,6 +481,7 @@ def test_dandiset_rest_create_with_identifier(api_client, admin_user): } ], 'assetsSummary': { + 'schemaKey': 'AssetsSummary', 'numberOfBytes': 0, 'numberOfFiles': 0, }, @@ -585,6 +587,7 @@ def test_dandiset_rest_create_with_contributor(api_client, admin_user): } ], 'assetsSummary': { + 'schemaKey': 'AssetsSummary', 'numberOfBytes': 0, 'numberOfFiles': 0, }, @@ -675,6 +678,7 @@ def test_dandiset_rest_create_embargoed(api_client, user): } ], 'assetsSummary': { + 'schemaKey': 'AssetsSummary', 'numberOfBytes': 0, 'numberOfFiles': 0, }, diff --git a/dandiapi/api/tests/test_version.py b/dandiapi/api/tests/test_version.py index 7ff104ccc..372c9f632 100644 --- a/dandiapi/api/tests/test_version.py +++ b/dandiapi/api/tests/test_version.py @@ -14,6 +14,7 @@ from dandiapi.api import tasks from dandiapi.api.models import Asset, Version +from dandiapi.api.services.publish import _build_publishable_version_from_draft from dandiapi.zarr.tasks import ingest_zarr_archive from .fuzzy import TIMESTAMP_RE, URN_RE, UTC_ISO_TIMESTAMP_RE, VERSION_ID_RE @@ -279,7 +280,7 @@ def test_version_publish_version(draft_version, asset): draft_version.assets.add(asset) draft_version.save() - publish_version = draft_version.publish_version + publish_version = _build_publishable_version_from_draft(draft_version) publish_version.doi = fake_doi publish_version.save() @@ -320,6 +321,7 @@ def test_version_publish_version(draft_version, asset): # The published_version cannot have a properly defined assetsSummary yet, since that would # require having created rows the Asset-to-Version join table, which is a side affect. 'assetsSummary': { + 'schemaKey': 'AssetsSummary', 'numberOfBytes': 0, 'numberOfFiles': 0, },