Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Modify audio columns #130

Merged
merged 8 commits into from
Jul 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/cc_catalog_airflow/dags/common/storage/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,18 @@
columns.StringColumn(
name='category', required=False, size=80, truncate=False,
),
columns.JSONColumn(
name='genre', required=False,
columns.ArrayColumn(
dhruvkb marked this conversation as resolved.
Show resolved Hide resolved
name='genres', required=False, base_column=columns.StringColumn(
name='genre', required=False, size=80, truncate=False
)
),
columns.JSONColumn(
# set name, set thumbnail, position of audio in set, set url
name='audio_set', required=False,
),
columns.JSONColumn(
# Alternative files: url, filesize, bit_rate, sample_rate
name='alt_audio_files', required=False
name='alt_files', required=False
),
]

Expand Down Expand Up @@ -133,12 +135,12 @@ def add_item(
bit_rate: Optional[int] = None,
sample_rate: Optional[int] = None,
category: Optional[str] = None,
genre: Optional[str] = None,
genres: Optional[Union[list, str]] = None,
audio_set: Optional[str] = None,
set_position: Optional[int] = None,
set_thumbnail: Optional[str] = None,
set_url: Optional[str] = None,
alt_audio_files: Optional[Dict] = None,
alt_files: Optional[Dict] = None,
source: Optional[str] = None,
ingestion_type: Optional[str] = None,
):
Expand Down Expand Up @@ -187,13 +189,13 @@ def add_item(
bit_rate: Audio bit rate as int.
sample_rate: Audio sample rate as int.
category: 'music', 'sound' or 'podcast'.
genre: List of genres
genres: List of genres
audio_set: The name of the set (album, pack) the audio
is part of
set_position: Position of the audio in the audio_set
set_thumbnail: URL of the audio_set thumbnail
set_url: URL of the audio_set
alt_audio_files: A dictionary with information about alternative
alt_files: A dictionary with information about alternative
files for the audio (different formats/ quality).
Dict with the following keys: url, filesize,
bit_rate, sample_rate
Expand Down Expand Up @@ -230,9 +232,9 @@ def add_item(
'bit_rate': bit_rate,
'sample_rate': sample_rate,
'category': category,
'genre': genre,
'genres': genres,
'audio_set': audio_set_data,
'alt_audio_files': alt_audio_files,
'alt_files': alt_files,
'source': source,
'ingestion_type': ingestion_type,
}
Expand Down
41 changes: 41 additions & 0 deletions src/cc_catalog_airflow/dags/common/storage/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,44 @@ def prepare_string(self, value):
self.SIZE,
False
)


class ArrayColumn(Column):
    """
    Represents a PostgreSQL column of type Array, which should hold elements
    of the given BASE_COLUMN type.

    name:         name of the corresponding column in the DB
    required:     whether the column should be considered required by the
                  instantiating script.  (Not necessarily mapping to
                  `not null` columns in the PostgreSQL table)
    base_column:  type of the elements in the array, another column

    """
    def __init__(self, name: str, required: bool, base_column: Column):
        # The base column is stored before delegating to the parent
        # initializer, mirroring the original statement order.
        self.BASE_COLUMN = base_column
        super().__init__(name, required)

    @staticmethod
    def _needs_quoting(element) -> bool:
        """
        Tell whether a single, already prepared element must be wrapped in
        double quotes to survive as ONE element of a PostgreSQL array
        literal.

        Unquoted elements are split on commas, confused by curly braces,
        quotes and backslashes, lose surrounding whitespace, and the bare
        word NULL is read as a SQL NULL rather than a string.
        """
        if not isinstance(element, str):
            return False
        return (
            any(char in element for char in ',{}"\\')
            or element != element.strip()
            or element.lower() == 'null'
        )

    def prepare_string(self, value):
        """
        Returns a string representation of an array in the PostgreSQL format:
        `{<item 1>, <item 2>...}`.

        Apply changes and validations of the corresponding base column type.

        value:  `None`, a list of items, or a single item.

        Returns `None` when `value` is `None`, or when a single item is
        reduced to an empty/`None` value by the base column.  A list is
        rendered with every item passed through the base column and
        serialized with `json.dumps`, so string items come out double
        quoted and `None` items come out as `null`.  A single item is
        rendered as a one-element array; it is left unquoted unless quoting
        is needed to keep it a single element.
        """
        if value is None:
            return None

        if not isinstance(value, list):
            element = self.BASE_COLUMN.prepare_string(value)
            if not element:
                return None
            if self._needs_quoting(element):
                # Same quoting as the list branch below, so that 'a, b'
                # given alone or inside a list yields the same element.
                element = json.dumps(element, ensure_ascii=False)
            return "{" + element + "}"

        prepared = [
            None if item is None else self.BASE_COLUMN.prepare_string(item)
            for item in value
        ]
        # json.dumps always yields at least "[]" for a list, so only the
        # surrounding square brackets need swapping for curly braces.
        serialized = json.dumps(prepared, ensure_ascii=False)
        return "{" + serialized[1:-1] + "}"
26 changes: 12 additions & 14 deletions src/cc_catalog_airflow/dags/common/storage/test_audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@
'creator_url': 'https://creatorurl.com',
'title': 'agreatsong',
'meta_data': {},
'watermarked': None,
'raw_tags': {},
'watermarked': None,
'bit_rate': None,
'sample_rate': None,
'category': None,
'genre': [],
'genres': [],
'audio_set': {},
'alt_audio_files': [],
'alt_files': [],
'source': 'testing_source',
'ingestion_type': 'provider_api',

}


Expand Down Expand Up @@ -232,28 +230,28 @@ def default_audio_args(
bit_rate=None,
sample_rate=None,
category='music',
genre=['rock', 'pop'],
alt_audio_files=None,
genres=['rock', 'pop'],
alt_files=None,
provider='testing_provider',
source='testing_source',
ingestion_type='provider_api',
)


def test_create_tsv_row_creates_alt_audio_files(
def test_create_tsv_row_creates_alt_files(
default_audio_args,
get_good,
setup_env,
):
audio_store = audio.AudioStore()
audio_args = default_audio_args.copy()
alt_audio_files = [{
alt_files = [{
'url': 'http://alternative.com/audio.mp3',
'filesize': 123,
'bit_rate': 41000,
'sample_rate': '16000'
}]
audio_args['alt_audio_files'] = alt_audio_files
audio_args['alt_files'] = alt_files
test_audio = audio.Audio(**audio_args)
actual_row = audio_store._create_tsv_row(test_audio)
expected_row = '\t'.join([
Expand All @@ -277,7 +275,7 @@ def test_create_tsv_row_creates_alt_audio_files(
'\\N',
'\\N',
'music',
'["rock", "pop"]',
'{"rock", "pop"}',
'\\N',
'[{"url": '
'"http://alternative.com/audio.mp3", "filesize": "123", "bit_rate": "41000", '
Expand Down Expand Up @@ -324,7 +322,7 @@ def test_create_tsv_row_creates_audio_set(
'\\N',
'\\N',
'music',
'["rock", "pop"]',
'{"rock", "pop"}',
'{"audio_set": "test_audio_set", "set_url": "test.com", '
'"set_position": "1", "set_thumbnail": "thumbnail.jpg"}',
'\\N',
Expand Down Expand Up @@ -450,14 +448,14 @@ def mock_validate_url(url_string):
'bit_rate': 16000,
'sample_rate': 44100,
'category': 'music',
'genre': ['pop', 'rock'],
'genres': ['pop', 'rock'],
'audio_set': {
'audio_set': 'album',
'set_position': 1,
'set_url': 'https://album.com/',
'set_thumbnail': 'https://album.com/thumbnail.jpg'
},
'alt_audio_files': None,
'alt_files': None,
'provider': 'testing_provider',
'source': 'testing_source',
'ingestion_type': 'provider_api',
Expand Down Expand Up @@ -489,7 +487,7 @@ def mock_validate_url(url_string):
'16000',
'44100',
'music',
'["pop", "rock"]',
'{"pop", "rock"}',
'{"audio_set": "album", "set_position": "1", "set_url": "https://album.com/", '
'"set_thumbnail": "https://album.com/thumbnail.jpg"}',
'\\N',
Expand Down
41 changes: 37 additions & 4 deletions src/cc_catalog_airflow/dags/common/storage/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,31 +179,31 @@ def test_BooleanColumn_prepare_string_casts_falselike():
assert all([bc.prepare_string(v) == 'f' for v in falselike_values])


def test_JSONColumn_prepare_string_nones_empty_list(monkeypatch):
def test_JSONColumn_prepare_string_nones_empty_list():
jc = columns.JSONColumn('test', False)
L = []
actual_json = jc.prepare_string(L)
expect_json = None
assert actual_json == expect_json


def test_JSONColumn_prepare_string_nones_empty_dict(monkeypatch):
def test_JSONColumn_prepare_string_nones_empty_dict():
jc = columns.JSONColumn('test', False)
D = {}
actual_json = jc.prepare_string(D)
expect_json = None
assert actual_json == expect_json


def test_JSONColumn_prepare_string_returns_json_string(monkeypatch):
def test_JSONColumn_prepare_string_returns_json_string():
jc = columns.JSONColumn('test', False)
D = {'test': 'dict'}
actual_json = jc.prepare_string(D)
expect_json = '{"test": "dict"}'
assert actual_json == expect_json


def test_JSONColumn_prepare_string_returns_unicode_json_string(monkeypatch):
def test_JSONColumn_prepare_string_returns_unicode_json_string():
jc = columns.JSONColumn('test', False)
D = {'test': u'A unicode \u018e string \xf1'}
actual_json = jc.prepare_string(D)
Expand Down Expand Up @@ -333,3 +333,36 @@ def mock_sanitize_string(some_string):
actual_str = uc.prepare_string('test string')
expect_str = None
assert actual_str == expect_str


def test_ArrayColumn_of_StringColumn_prepare_string_returns_pg_array():
    """A list of strings is expected as a double-quoted `{...}` literal."""
    element_column = columns.StringColumn(
        name='test', size=80, required=False, truncate=False
    )
    array_column = columns.ArrayColumn('test', False, element_column)

    rendered = array_column.prepare_string(['item1', 'item2'])

    assert rendered == '{"item1", "item2"}'


def test_ArrayColumn_prepare_string_returns_pg_array_from_single_string():
    """A bare string is expected to come back as a one-element literal."""
    element_column = columns.StringColumn(
        name='test', size=80, required=False, truncate=False
    )
    array_column = columns.ArrayColumn('test', False, element_column)

    rendered = array_column.prepare_string('abcdef')

    assert rendered == '{abcdef}'


def test_ArrayColumn_of_IntegerColumn_prepare_string_returns_pg_array():
    """Numeric items are expected as quoted integer strings in the literal."""
    element_column = columns.IntegerColumn(name='test', required=False)
    array_column = columns.ArrayColumn('test', False, element_column)

    rendered = array_column.prepare_string([1.1, 22, 3, 456])

    assert rendered == '{"1", "22", "3", "456"}'
4 changes: 2 additions & 2 deletions src/cc_catalog_airflow/dags/util/loader/column_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@
BIT_RATE = 'bit_rate'
SAMPLE_RATE = 'sample_rate'
CATEGORY = 'category'
GENRE = 'genre'
GENRES = 'genres'
AUDIO_SET = 'audio_set'
ALT_AUDIO_FILES = 'alt_audio_files'
ALT_FILES = 'alt_files'
27 changes: 19 additions & 8 deletions src/cc_catalog_airflow/dags/util/loader/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ def create_loading_table(
{col.BIT_RATE} integer,
{col.SAMPLE_RATE} integer,
{col.CATEGORY} character varying(100),
{col.GENRE} jsonb,
{col.GENRES} character varying(80)[],
{col.AUDIO_SET} jsonb,
{col.ALT_AUDIO_FILES} jsonb
{col.ALT_FILES} jsonb
);
'''
)
Expand Down Expand Up @@ -261,6 +261,17 @@ def _merge_jsonb_arrays(column: str) -> str:
EXCLUDED.{column},
old.{column}
)'''

def _merge_array(column: str) -> str:
return f'''{column} = COALESCE(
(
SELECT array_agg(DISTINCT x)
FROM unnest(old.{column} || EXCLUDED.{column}) t(x)
),
EXCLUDED.{column},
old.{column}
)'''

if db_table is None:
db_table = AUDIO_TABLE_NAME \
if media_type == 'audio' else IMAGE_TABLE_NAME
Expand Down Expand Up @@ -296,9 +307,9 @@ def _merge_jsonb_arrays(column: str) -> str:
col.BIT_RATE: col.BIT_RATE,
col.SAMPLE_RATE: col.SAMPLE_RATE,
col.CATEGORY: col.CATEGORY,
col.GENRE: col.GENRE,
col.GENRES: col.GENRES,
col.AUDIO_SET: col.AUDIO_SET,
col.ALT_AUDIO_FILES: col.ALT_AUDIO_FILES,
col.ALT_FILES: col.ALT_FILES,
})
else:
column_inserts.update({
Expand All @@ -311,9 +322,9 @@ def _merge_jsonb_arrays(column: str) -> str:
{_newest_non_null(col.BIT_RATE)},
{_newest_non_null(col.SAMPLE_RATE)},
{_newest_non_null(col.CATEGORY)},
{_merge_jsonb_arrays(col.GENRE)},
{_merge_array(col.GENRES)},
{_merge_jsonb_objects(col.AUDIO_SET)},
{_merge_jsonb_objects(col.ALT_AUDIO_FILES)}
{_merge_jsonb_objects(col.ALT_FILES)}
'''
)
else:
Expand Down Expand Up @@ -381,9 +392,9 @@ def overwrite_records_in_db_table(
col.BIT_RATE,
col.SAMPLE_RATE,
col.CATEGORY,
col.GENRE,
col.GENRES,
col.AUDIO_SET,
col.ALT_AUDIO_FILES,
col.ALT_FILES,
]
else:
columns_to_update = [
Expand Down
3 changes: 1 addition & 2 deletions src/cc_catalog_airflow/dags/util/popularity/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ def create_media_view(
db_view_provider_fid_idx = AUDIO_VIEW_PROVIDER_FID_IDX
standardized_popularity_func = STANDARDIZED_AUDIO_POPULARITY_FUNCTION
postgres = PostgresHook(postgres_conn_id=postgres_conn_id)
STANDARDIZED_POPULARITY = f"standardized_{media_type}_popularity"
create_view_query = dedent(
f"""
CREATE MATERIALIZED VIEW public.{db_view_name} AS
Expand All @@ -324,7 +323,7 @@ def create_media_view(
{standardized_popularity_func}(
{table_name}.{PARTITION},
{table_name}.{METADATA_COLUMN}
) AS {STANDARDIZED_POPULARITY}
) AS standardized_popularity
FROM {table_name};
"""
)
Expand Down
2 changes: 1 addition & 1 deletion src/cc_catalog_airflow/dags/util/popularity/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def test_image_view_calculates_std_pop(postgres_with_image_table):
_set_up_image_view(postgres_with_image_table, data_query, metrics)
check_query = dedent(
f"""
SELECT foreign_identifier, standardized_image_popularity
SELECT foreign_identifier, standardized_popularity
FROM {image_view};
"""
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ CREATE MATERIALIZED VIEW image_view AS
removed_from_source,
standardized_image_popularity(
image.provider, image.meta_data
) AS standardized_image_popularity
) AS standardized_popularity
FROM image;
Loading