Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Add AudioStore entity
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Bulat <[email protected]>
  • Loading branch information
obulat committed Jun 17, 2021
1 parent c6726a5 commit f3785d0
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 368 deletions.
352 changes: 67 additions & 285 deletions src/cc_catalog_airflow/dags/common/storage/image.py

Large diffs are not rendered by default.

234 changes: 234 additions & 0 deletions src/cc_catalog_airflow/dags/common/storage/media.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import abc
import collections
from datetime import datetime
import logging
import os
from typing import Optional, Union

from common.storage import util
from common.licenses import licenses

logger = logging.getLogger(__name__)

# Filter out tags that exactly match these terms. All terms should be
# lowercase.
TAG_BLACKLIST = {"no person", "squareformat"}

# Filter out tags that contain the following terms. All entrees should be
# lowercase.
TAG_CONTAINS_BLACKLIST = {
"flickriosapp",
"uploaded",
":",
"=",
"cc0",
"by",
"by-nc",
"by-nd",
"by-sa",
"by-nc-nd",
"by-nc-sa",
"pdm",
}


class MediaStore(metaclass=abc.ABCMeta):
"""
An abstract base class that stores media information from a given provider.
Optional init arguments:
provider: String marking the provider in the `media`(`image`, `audio` etc) table of the DB.
output_file: String giving a temporary .tsv filename (*not* the
full path) where the media info should be stored.
output_dir: String giving a path where `output_file` should be placed.
buffer_length: Integer giving the maximum number of media information rows
to store in memory before writing them to disk.
"""

def __init__(
self,
provider: Optional[str] = None,
output_file: Optional[str] = None,
output_dir: Optional[str] = None,
buffer_length: int = 100,
media_type: Optional[str] = "generic",
):
logger.info(f"Initialized {media_type} media store with provider {provider}")
self.media_type = media_type
self._media_buffer = []
self._total_items = 0
self._PROVIDER = provider
self._BUFFER_LENGTH = buffer_length
self._NOW = datetime.now()
self._OUTPUT_PATH = self._initialize_output_path(
output_dir,
output_file,
provider,
)
self.columns = None

def save_item(self, media: collections.namedtuple) -> None:
"""
Appends item data to the buffer as a tsv row if data is valid.
Doesn't do anything if data isn't valid.
"""
tsv_row = self._create_tsv_row(media)
if tsv_row:
self._media_buffer.append(tsv_row)
self._total_items += 1
if len(self._media_buffer) >= self._BUFFER_LENGTH:
self._flush_buffer()

@abc.abstractmethod
def add_item(self, **kwargs):
"""
Abstract method to clean the item data and add it to the store
"""
pass

@staticmethod
def get_valid_license_info(
license_url,
license_,
license_version,
):
valid_license_info = licenses.get_license_info(
license_url=license_url, license_=license_, license_version=license_version
)
if valid_license_info.url != license_url:
raw_license_url = license_url
else:
raw_license_url = None
return valid_license_info, raw_license_url

def parse_item_metadata(
self,
license_url,
raw_license_url,
source,
meta_data,
raw_tags,
):
source = util.get_source(source, self._PROVIDER)
meta_data = self._enrich_meta_data(
meta_data, license_url=license_url, raw_license_url=raw_license_url
)
tags = self._enrich_tags(raw_tags)
return source, meta_data, tags

def commit(self):
"""Writes all remaining media items in the buffer to disk."""
self._flush_buffer()
return self.total_items

def _initialize_output_path(self, output_dir, output_file, provider) -> str:
"""
Creates the path for the tsv file. If output_dir and output_file are
not given, the following filename is used:
`/tmp/{provider_name}_{media_type}_{timestamp}.tsv`
"""
if output_dir is None:
logger.info("No given output directory. Using OUTPUT_DIR from environment.")
output_dir = os.getenv("OUTPUT_DIR")
if output_dir is None:
logger.warning(
"OUTPUT_DIR is not set in the environment. Output will go to /tmp."
)
output_dir = "/tmp"

if output_file is not None:
output_file = str(output_file)
else:
output_file = (
f'{provider}_{self.media_type}_{datetime.strftime(self._NOW, "%Y%m%d%H%M%S")}'
f".tsv"
)

output_path = os.path.join(output_dir, output_file)
logger.info(f"Output path: {output_path}")
return output_path

@property
def total_items(self):
"""Get total items for directly using in scripts."""
return self._total_items

def _create_tsv_row(self, item):
row_length = len(self.columns)
prepared_strings = [
self.columns[i].prepare_string(item[i]) for i in range(row_length)
]
logger.debug(f"Prepared strings list:\n{prepared_strings}")
for i in range(row_length):
if self.columns[i].REQUIRED and prepared_strings[i] is None:
logger.warning(f"Row missing required {self.columns[i].NAME}")
return None
else:
return (
"\t".join([s if s is not None else "\\N" for s in prepared_strings])
+ "\n"
)

def _flush_buffer(self) -> int:
buffer_length = len(self._media_buffer)
if buffer_length > 0:
logger.info(f"Writing {buffer_length} lines from buffer to disk.")
with open(self._OUTPUT_PATH, "a") as f:
f.writelines(self._media_buffer)
self._media_buffer = []
logger.debug(
f"Total Media Items Processed so far: {self._total_items}"
)
else:
logger.debug("Empty buffer! Nothing to write.")
return buffer_length

@staticmethod
def _tag_blacklisted(tag: Union[str, dict]) -> bool:
"""
Tag is banned or contains a banned substring.
:param tag: the tag to be verified against the blacklist
:return: true if tag is blacklisted, else returns false
"""
if type(tag) == dict: # check if the tag is already enriched
tag = tag.get("name")
if tag in TAG_BLACKLIST:
return True
for blacklisted_substring in TAG_CONTAINS_BLACKLIST:
if blacklisted_substring in tag:
return True
return False

@staticmethod
def _enrich_meta_data(meta_data, license_url, raw_license_url) -> dict:
if type(meta_data) != dict:
logger.debug(f"`meta_data` is not a dictionary: {meta_data}")
enriched_meta_data = {
"license_url": license_url,
"raw_license_url": raw_license_url,
}
else:
enriched_meta_data = meta_data
enriched_meta_data.update(
license_url=license_url, raw_license_url=raw_license_url
)
return enriched_meta_data

def _enrich_tags(self, raw_tags) -> Optional[list]:
if type(raw_tags) != list:
logger.debug("`tags` is not a list.")
return None
else:
return [
self._format_raw_tag(tag)
for tag in raw_tags
if not self._tag_blacklisted(tag)
]

def _format_raw_tag(self, tag):
if type(tag) == dict and tag.get("name") and tag.get("provider"):
logger.debug(f"Tag already enriched: {tag}")
return tag
else:
logger.debug(f"Enriching tag: {tag}")
return {"name": tag, "provider": self._PROVIDER}
Loading

0 comments on commit f3785d0

Please sign in to comment.