diff --git a/src/cc_catalog_airflow/dags/common/storage/audio.py b/src/cc_catalog_airflow/dags/common/storage/audio.py index b505ac9ff..3d5754484 100644 --- a/src/cc_catalog_airflow/dags/common/storage/audio.py +++ b/src/cc_catalog_airflow/dags/common/storage/audio.py @@ -47,6 +47,9 @@ columns.JSONColumn( name='tags', required=False ), + columns.BooleanColumn( + name='watermarked', required=False, + ), columns.StringColumn( name='provider', required=False, size=80, truncate=False ), @@ -126,6 +129,7 @@ def add_item( title: Optional[str] = None, meta_data: Optional[Union[Dict, str]] = None, raw_tags: Optional[Union[list, str]] = None, + watermarked: Optional[bool] = False, duration: Optional[int] = None, bit_rate: Optional[int] = None, sample_rate: Optional[int] = None, @@ -137,7 +141,7 @@ def add_item( set_url: Optional[str] = None, alt_audio_files: Optional[Dict] = None, source: Optional[str] = None, - ingestion_type: Optional[str] = 'commoncrawl', + ingestion_type: Optional[str] = None, ): """ Add information for a single audio to the AudioStore. @@ -208,6 +212,7 @@ def add_item( 'title': title, 'meta_data': meta_data, 'raw_tags': raw_tags, + 'watermarked': watermarked, 'duration': duration, 'bit_rate': bit_rate, 'sample_rate': sample_rate, diff --git a/src/cc_catalog_airflow/dags/common/storage/image.py b/src/cc_catalog_airflow/dags/common/storage/image.py index 066fb5e1d..8f5e388b8 100644 --- a/src/cc_catalog_airflow/dags/common/storage/image.py +++ b/src/cc_catalog_airflow/dags/common/storage/image.py @@ -116,7 +116,7 @@ def add_item( raw_tags=None, watermarked: Optional[str] = "f", source: Optional[str] = None, - ingestion_type: Optional[str] = 'commoncrawl', + ingestion_type: Optional[str] = None, ): """ Add information for a single image to the ImageStore. 
diff --git a/src/cc_catalog_airflow/dags/common/storage/media.py b/src/cc_catalog_airflow/dags/common/storage/media.py index 020ff5d4c..c9e9855bc 100644 --- a/src/cc_catalog_airflow/dags/common/storage/media.py +++ b/src/cc_catalog_airflow/dags/common/storage/media.py @@ -30,6 +30,9 @@ "pdm", } +COMMON_CRAWL = 'commoncrawl' +PROVIDER_API = 'provider_api' + class MediaStore(metaclass=abc.ABCMeta): """ @@ -137,8 +140,8 @@ def clean_media_metadata(self, **media_data) -> Optional[dict]: and for common metadata we: - remove `license_url` and `raw_license_url`, - validate `license_` and `license_version`, - - enrich `metadata` and `tags`, - - remove `raw_tags` are removed, + - enrich `metadata`, + - replace `raw_tags` with enriched `tags`, - validate `source`, - add `provider`, - add `filesize` (with value of None) @@ -153,6 +156,14 @@ def clean_media_metadata(self, **media_data) -> Optional[dict]: media_data.get('source'), self._PROVIDER ) + # Add ingestion_type column value based on `source`. 
+ # Mirrors `_add_ingestion_type` in util/loader/ingestion_column.py + if media_data.get('ingestion_type') is None: + if media_data['source'] == COMMON_CRAWL: + media_data['ingestion_type'] = COMMON_CRAWL + else: + media_data['ingestion_type'] = PROVIDER_API + media_data['tags'] = self._enrich_tags( media_data.pop('raw_tags', None) ) diff --git a/src/cc_catalog_airflow/dags/common/storage/test_audio.py b/src/cc_catalog_airflow/dags/common/storage/test_audio.py index 690111867..9cbfdb965 100644 --- a/src/cc_catalog_airflow/dags/common/storage/test_audio.py +++ b/src/cc_catalog_airflow/dags/common/storage/test_audio.py @@ -33,6 +33,7 @@ 'creator_url': 'https://creatorurl.com', 'title': 'agreatpicture', 'meta_data': {}, + 'watermarked': None, 'raw_tags': {}, 'bit_rate': None, 'sample_rate': None, @@ -221,6 +222,7 @@ def default_audio_args( title='agreatsong', meta_data={"description": "cat song"}, tags={"name": "tag1", "provider": "testing"}, + watermarked=None, duration=100, bit_rate=None, sample_rate=None, @@ -262,6 +264,7 @@ def test_create_tsv_row_creates_alt_audio_files( 'agreatsong', '{"description": "cat song"}', '{"name": "tag1", "provider": "testing"}', + '\\N', 'testing_provider', 'testing_source', 'provider_api', @@ -308,6 +311,7 @@ def test_create_tsv_row_creates_audio_set( 'agreatsong', '{"description": "cat song"}', '{"name": "tag1", "provider": "testing"}', + '\\N', 'testing_provider', 'testing_source', 'provider_api', @@ -437,6 +441,7 @@ def mock_validate_url(url_string): 'title': 'agreatsong', 'meta_data': {'description': 'a song about cat'}, 'tags': [{'name': 'tag1', 'provider': 'testing'}], + 'watermarked': None, 'bit_rate': 16000, 'sample_rate': 44100, 'category': 'music', @@ -471,6 +476,7 @@ def mock_validate_url(url_string): 'agreatsong', '{"description": "a song about cat"}', '[{"name": "tag1", "provider": "testing"}]', + '\\N', 'testing_provider', 'testing_source', 'provider_api', diff --git a/src/cc_catalog_airflow/dags/util/loader/ingestion_column.py 
b/src/cc_catalog_airflow/dags/util/loader/ingestion_column.py index 58454dac4..aa2908522 100644 --- a/src/cc_catalog_airflow/dags/util/loader/ingestion_column.py +++ b/src/cc_catalog_airflow/dags/util/loader/ingestion_column.py @@ -8,6 +8,7 @@ from common.storage.audio import AUDIO_TSV_COLUMNS from common.storage.image import IMAGE_TSV_COLUMNS +from common.storage import media logger = logging.getLogger(__name__) @@ -51,8 +52,8 @@ def check_and_fix_tsv_file(tsv_file_name): def _add_ingestion_type(tsv_file_name, source): - COMMON_CRAWL = 'commoncrawl' - PROVIDER_API = 'provider_api' + COMMON_CRAWL = media.COMMON_CRAWL + PROVIDER_API = media.PROVIDER_API ingestion_type = source if source == COMMON_CRAWL else PROVIDER_API logger.debug(f'Found source: {source}') logger.info( diff --git a/src/cc_catalog_airflow/dags/util/loader/sql.py b/src/cc_catalog_airflow/dags/util/loader/sql.py index ef5505963..ecce875be 100644 --- a/src/cc_catalog_airflow/dags/util/loader/sql.py +++ b/src/cc_catalog_airflow/dags/util/loader/sql.py @@ -287,6 +287,7 @@ def _merge_jsonb_arrays(column: str) -> str: col.REMOVED: FALSE, col.META_DATA: col.META_DATA, col.TAGS: col.TAGS, + col.WATERMARKED: col.WATERMARKED, } if media_type == 'audio': column_inserts.update({ @@ -302,7 +303,6 @@ def _merge_jsonb_arrays(column: str) -> str: column_inserts.update({ col.WIDTH: col.WIDTH, col.HEIGHT: col.HEIGHT, - col.WATERMARKED: col.WATERMARKED }) if media_type == 'audio': media_specific_upsert_query = ( @@ -318,9 +318,7 @@ def _merge_jsonb_arrays(column: str) -> str: else: media_specific_upsert_query = ( f'''{_newest_non_null(col.WIDTH)}, - {_newest_non_null(col.HEIGHT)}, - {_newest_non_null(col.WATERMARKED)} - ''' + {_newest_non_null(col.HEIGHT)}''' ) upsert_query = dedent( f''' @@ -345,6 +343,7 @@ def _merge_jsonb_arrays(column: str) -> str: {_newest_non_null(col.TITLE)}, {_merge_jsonb_objects(col.META_DATA)}, {_merge_jsonb_arrays(col.TAGS)}, + {_newest_non_null(col.WATERMARKED)}, 
{media_specific_upsert_query} ''' ) @@ -376,6 +375,7 @@ def overwrite_records_in_db_table( col.TITLE, col.META_DATA, col.TAGS, + col.WATERMARKED, col.DURATION, col.BIT_RATE, col.SAMPLE_RATE,