Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Make `watermarked` a common media column
Browse files Browse the repository at this point in the history
  • Loading branch information
obulat committed Jun 30, 2021
1 parent e46dd39 commit d13fff9
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 10 deletions.
7 changes: 6 additions & 1 deletion src/cc_catalog_airflow/dags/common/storage/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
columns.JSONColumn(
name='tags', required=False
),
columns.BooleanColumn(
name='watermarked', required=False,
),
columns.StringColumn(
name='provider', required=False, size=80, truncate=False
),
Expand Down Expand Up @@ -126,6 +129,7 @@ def add_item(
title: Optional[str] = None,
meta_data: Optional[Union[Dict, str]] = None,
raw_tags: Optional[Union[list, str]] = None,
watermarked: Optional[bool] = False,
duration: Optional[int] = None,
bit_rate: Optional[int] = None,
sample_rate: Optional[int] = None,
Expand All @@ -137,7 +141,7 @@ def add_item(
set_url: Optional[str] = None,
alt_audio_files: Optional[Dict] = None,
source: Optional[str] = None,
ingestion_type: Optional[str] = 'commoncrawl',
ingestion_type: Optional[str] = None,
):
"""
Add information for a single audio to the AudioStore.
Expand Down Expand Up @@ -208,6 +212,7 @@ def add_item(
'title': title,
'meta_data': meta_data,
'raw_tags': raw_tags,
'watermarked': watermarked,
'duration': duration,
'bit_rate': bit_rate,
'sample_rate': sample_rate,
Expand Down
2 changes: 1 addition & 1 deletion src/cc_catalog_airflow/dags/common/storage/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def add_item(
raw_tags=None,
watermarked: Optional[str] = "f",
source: Optional[str] = None,
ingestion_type: Optional[str] = 'commoncrawl',
ingestion_type: Optional[str] = None,
):
"""
Add information for a single image to the ImageStore.
Expand Down
15 changes: 13 additions & 2 deletions src/cc_catalog_airflow/dags/common/storage/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
"pdm",
}

COMMON_CRAWL = 'commoncrawl'
PROVIDER_API = 'provider_api'


class MediaStore(metaclass=abc.ABCMeta):
"""
Expand Down Expand Up @@ -137,8 +140,8 @@ def clean_media_metadata(self, **media_data) -> Optional[dict]:
and for common metadata we:
- remove `license_url` and `raw_license_url`,
- validate `license_` and `license_version`,
- enrich `metadata` and `tags`,
- remove `raw_tags` are removed,
- enrich `metadata`,
- replace `raw_tags` with enriched `tags`,
- validate `source`,
- add `provider`,
- add `filesize` (with value of None)
Expand All @@ -153,6 +156,14 @@ def clean_media_metadata(self, **media_data) -> Optional[dict]:
media_data.get('source'),
self._PROVIDER
)
# Fill in a missing `ingestion_type` from `source`: 'commoncrawl' when the
# source is Common Crawl, otherwise 'provider_api'. This mirrors
# `_add_ingestion_type` in util/loader/ingestion_column.py.
if media_data.get('ingestion_type') is None:
if media_data['source'] == 'commoncrawl':
media_data['ingestion_type'] = 'commoncrawl'
else:
media_data['ingestion_type'] = 'provider_api'

media_data['tags'] = self._enrich_tags(
media_data.pop('raw_tags', None)
)
Expand Down
6 changes: 6 additions & 0 deletions src/cc_catalog_airflow/dags/common/storage/test_audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
'creator_url': 'https://creatorurl.com',
'title': 'agreatpicture',
'meta_data': {},
'watermarked': None,
'raw_tags': {},
'bit_rate': None,
'sample_rate': None,
Expand Down Expand Up @@ -221,6 +222,7 @@ def default_audio_args(
title='agreatsong',
meta_data={"description": "cat song"},
tags={"name": "tag1", "provider": "testing"},
watermarked=None,
duration=100,
bit_rate=None,
sample_rate=None,
Expand Down Expand Up @@ -262,6 +264,7 @@ def test_create_tsv_row_creates_alt_audio_files(
'agreatsong',
'{"description": "cat song"}',
'{"name": "tag1", "provider": "testing"}',
'\\N',
'testing_provider',
'testing_source',
'provider_api',
Expand Down Expand Up @@ -308,6 +311,7 @@ def test_create_tsv_row_creates_audio_set(
'agreatsong',
'{"description": "cat song"}',
'{"name": "tag1", "provider": "testing"}',
'\\N',
'testing_provider',
'testing_source',
'provider_api',
Expand Down Expand Up @@ -437,6 +441,7 @@ def mock_validate_url(url_string):
'title': 'agreatsong',
'meta_data': {'description': 'a song about cat'},
'tags': [{'name': 'tag1', 'provider': 'testing'}],
'watermarked': None,
'bit_rate': 16000,
'sample_rate': 44100,
'category': 'music',
Expand Down Expand Up @@ -471,6 +476,7 @@ def mock_validate_url(url_string):
'agreatsong',
'{"description": "a song about cat"}',
'[{"name": "tag1", "provider": "testing"}]',
'\\N',
'testing_provider',
'testing_source',
'provider_api',
Expand Down
5 changes: 3 additions & 2 deletions src/cc_catalog_airflow/dags/util/loader/ingestion_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from common.storage.audio import AUDIO_TSV_COLUMNS
from common.storage.image import IMAGE_TSV_COLUMNS
from common.storage import media

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,8 +52,8 @@ def check_and_fix_tsv_file(tsv_file_name):


def _add_ingestion_type(tsv_file_name, source):
COMMON_CRAWL = 'commoncrawl'
PROVIDER_API = 'provider_api'
COMMON_CRAWL = media.COMMON_CRAWL
PROVIDER_API = media.PROVIDER_API
ingestion_type = source if source == COMMON_CRAWL else PROVIDER_API
logger.debug(f'Found source: {source}')
logger.info(
Expand Down
8 changes: 4 additions & 4 deletions src/cc_catalog_airflow/dags/util/loader/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def _merge_jsonb_arrays(column: str) -> str:
col.REMOVED: FALSE,
col.META_DATA: col.META_DATA,
col.TAGS: col.TAGS,
col.WATERMARKED: col.WATERMARKED,
}
if media_type == 'audio':
column_inserts.update({
Expand All @@ -302,7 +303,6 @@ def _merge_jsonb_arrays(column: str) -> str:
column_inserts.update({
col.WIDTH: col.WIDTH,
col.HEIGHT: col.HEIGHT,
col.WATERMARKED: col.WATERMARKED
})
if media_type == 'audio':
media_specific_upsert_query = (
Expand All @@ -318,9 +318,7 @@ def _merge_jsonb_arrays(column: str) -> str:
else:
media_specific_upsert_query = (
f'''{_newest_non_null(col.WIDTH)},
{_newest_non_null(col.HEIGHT)},
{_newest_non_null(col.WATERMARKED)}
'''
{_newest_non_null(col.HEIGHT)}'''
)
upsert_query = dedent(
f'''
Expand All @@ -345,6 +343,7 @@ def _merge_jsonb_arrays(column: str) -> str:
{_newest_non_null(col.TITLE)},
{_merge_jsonb_objects(col.META_DATA)},
{_merge_jsonb_arrays(col.TAGS)},
{_newest_non_null(col.WATERMARKED)},
{media_specific_upsert_query}
'''
)
Expand Down Expand Up @@ -376,6 +375,7 @@ def overwrite_records_in_db_table(
col.TITLE,
col.META_DATA,
col.TAGS,
col.WATERMARKED,
col.DURATION,
col.BIT_RATE,
col.SAMPLE_RATE,
Expand Down

0 comments on commit d13fff9

Please sign in to comment.