Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor popularity SQL #2964

Merged
merged 20 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions catalog/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -743,21 +743,27 @@ https://www.rawpixel.com/api/v1/search?tags=$publicdomain&page=1&pagesize=100
## `recreate_audio_popularity_calculation`

This file generates Apache Airflow DAGs that, for the given media type,
completely wipe out the PostgreSQL relations and functions involved in
calculating our standardized popularity metric. It then recreates relations and
functions to make the calculation, and performs an initial calculation. The
results are available in the materialized view for that media type.
completely wipe out and recreate the PostgreSQL functions involved in
calculating our standardized popularity metric.

Note that they do not drop any tables or views related to popularity, and they
do not perform any popularity calculations. Once this DAG has been run, the
associated popularity refresh DAG must be run in order to actually recalculate
popularity constants and standardized popularity scores using the new functions.

These DAGs are not on a schedule, and should only be run manually when new SQL
code is deployed for the calculation.

## `recreate_image_popularity_calculation`

This file generates Apache Airflow DAGs that, for the given media type,
completely wipe out the PostgreSQL relations and functions involved in
calculating our standardized popularity metric. It then recreates relations and
functions to make the calculation, and performs an initial calculation. The
results are available in the materialized view for that media type.
completely wipe out and recreate the PostgreSQL functions involved in
calculating our standardized popularity metric.

Note that they do not drop any tables or views related to popularity, and they
do not perform any popularity calculations. Once this DAG has been run, the
associated popularity refresh DAG must be run in order to actually recalculate
popularity constants and standardized popularity scores using the new functions.

These DAGs are not on a schedule, and should only be run manually when new SQL
code is deployed for the calculation.
Expand Down
39 changes: 39 additions & 0 deletions catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Literal

Expand Down Expand Up @@ -36,3 +37,41 @@
AWS_RDS_CONN_ID = os.environ.get("AWS_RDS_CONN_ID", AWS_CONN_ID)
ES_PROD_HTTP_CONN_ID = "elasticsearch_http_production"
REFRESH_POKE_INTERVAL = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 30))


@dataclass
class SQLInfo:
    """
    Names of the PostgreSQL relations and functions used in popularity
    calculations for a single media type.

    Instances are looked up by media type (see ``SQL_INFO_BY_MEDIA_TYPE``)
    so that SQL-building code does not hard-code per-media-type names.

    Required Constructor Arguments:

    media_table:                name of the main media table
    metrics_table:              name of the popularity metrics table
    standardized_popularity_fn: name of the standardized popularity SQL
                                function
    popularity_percentile_fn:   name of the popularity percentile SQL
                                function

    """

    media_table: str
    metrics_table: str
    standardized_popularity_fn: str
    popularity_percentile_fn: str


# Registry mapping each supported media type to the names of its popularity
# tables and SQL functions. Consumers look up an entry by media type rather
# than hard-coding the per-media-type relation/function names.
SQL_INFO_BY_MEDIA_TYPE = {
    AUDIO: SQLInfo(
        media_table=AUDIO,
        metrics_table="audio_popularity_metrics",
        standardized_popularity_fn="standardized_audio_popularity",
        popularity_percentile_fn="audio_popularity_percentile",
    ),
    IMAGE: SQLInfo(
        media_table=IMAGE,
        metrics_table="image_popularity_metrics",
        standardized_popularity_fn="standardized_image_popularity",
        popularity_percentile_fn="image_popularity_percentile",
    ),
}
Comment on lines +42 to +77
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. There's a similar dataclass in the API's testconf: https://github.com/WordPress/openverse/blob/HEAD/api/test/unit/conftest.py#L59-L92

59 changes: 22 additions & 37 deletions catalog/dags/common/loader/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,24 @@
from airflow.models.abstractoperator import AbstractOperator
from psycopg2.errors import InvalidTextRepresentation

from common.constants import AUDIO, IMAGE, MediaType
from common.constants import IMAGE, MediaType, SQLInfo
from common.loader import provider_details as prov
from common.loader.paths import _extract_media_type
from common.popularity.constants import (
STANDARDIZED_AUDIO_POPULARITY_FUNCTION,
STANDARDIZED_IMAGE_POPULARITY_FUNCTION,
)
from common.sql import PostgresHook
from common.storage import columns as col
from common.storage.columns import NULL, Column, UpsertStrategy
from common.storage.db_columns import AUDIO_TABLE_COLUMNS, IMAGE_TABLE_COLUMNS
from common.storage.db_columns import setup_db_columns_for_media_type
from common.storage.tsv_columns import (
COLUMNS,
CURRENT_AUDIO_TSV_COLUMNS,
CURRENT_IMAGE_TSV_COLUMNS,
required_columns,
REQUIRED_COLUMNS,
setup_tsv_columns_for_media_type,
)
from common.utils import setup_sql_info_for_media_type


logger = logging.getLogger(__name__)

LOAD_TABLE_NAME_STUB = "load_"
TABLE_NAMES = {AUDIO: AUDIO, IMAGE: IMAGE}
DB_USER_NAME = "deploy"
NOW = "NOW()"
FALSE = "'f'"
Expand All @@ -44,14 +39,6 @@
prov.SMK_DEFAULT_PROVIDER: "1 month 3 days",
}

DB_COLUMNS = {
IMAGE: IMAGE_TABLE_COLUMNS,
AUDIO: AUDIO_TABLE_COLUMNS,
}
TSV_COLUMNS = {
AUDIO: CURRENT_AUDIO_TSV_COLUMNS,
IMAGE: CURRENT_IMAGE_TSV_COLUMNS,
}
CURRENT_TSV_VERSION = "001"
RETURN_ROW_COUNT = lambda c: c.rowcount # noqa: E731

Expand All @@ -67,19 +54,21 @@ def create_column_definitions(table_columns: list[Column], is_loading=True):
return ",\n ".join(definitions)


@setup_tsv_columns_for_media_type
def create_loading_table(
postgres_conn_id: str,
identifier: str,
media_type: str = IMAGE,
*,
media_type: str,
tsv_columns: list[Column],
):
"""Create intermediary table and indices if they do not exist."""
load_table = _get_load_table_name(identifier, media_type=media_type)
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=10.0,
)
loading_table_columns = TSV_COLUMNS[media_type]
columns_definition = f"{create_column_definitions(loading_table_columns)}"
columns_definition = f"{create_column_definitions(tsv_columns)}"
table_creation_query = dedent(
f"""
CREATE UNLOGGED TABLE public.{load_table}(
Expand Down Expand Up @@ -216,7 +205,7 @@ def clean_intermediate_table_data(
)

missing_columns = 0
for column in required_columns:
for column in REQUIRED_COLUMNS:
missing_columns += postgres.run(
f"DELETE FROM {load_table} WHERE {column.db_name} IS NULL;",
handler=RETURN_ROW_COUNT,
Expand Down Expand Up @@ -268,13 +257,16 @@ def _is_tsv_column_from_different_version(
)


@setup_sql_info_for_media_type
@setup_db_columns_for_media_type
def upsert_records_to_db_table(
postgres_conn_id: str,
identifier: str,
db_table: str = None,
media_type: str = IMAGE,
*,
media_type: str,
db_columns: list[Column],
sql_info: SQLInfo,
tsv_version: str = CURRENT_TSV_VERSION,
popularity_function: str = STANDARDIZED_IMAGE_POPULARITY_FUNCTION,
task: AbstractOperator = None,
):
"""
Expand All @@ -285,35 +277,28 @@ def upsert_records_to_db_table(

:param postgres_conn_id
:param identifier
:param db_table
:param media_type
:param tsv_version: The version of TSV being processed. This
determines which columns are used in the upsert query.
:param task To be automagically passed by airflow.
:return:
"""
if db_table is None:
db_table = TABLE_NAMES.get(media_type, TABLE_NAMES[IMAGE])

if media_type is AUDIO:
popularity_function = STANDARDIZED_AUDIO_POPULARITY_FUNCTION

load_table = _get_load_table_name(identifier, media_type=media_type)
logger.info(f"Upserting new records into {db_table}.")
logger.info(f"Upserting new records into {sql_info.media_table}.")
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)

# Remove identifier column
db_columns: list[Column] = DB_COLUMNS[media_type][1:]
db_columns = db_columns[1:]
column_inserts = {}
column_conflict_values = {}
for column in db_columns:
args = []
if column.db_name == col.STANDARDIZED_POPULARITY.db_name:
args = [
popularity_function,
sql_info.standardized_popularity_fn,
]

if column.upsert_strategy == UpsertStrategy.no_change:
Expand All @@ -331,13 +316,13 @@ def upsert_records_to_db_table(
upsert_conflict_string = ",\n ".join(column_conflict_values.values())
upsert_query = dedent(
f"""
INSERT INTO {db_table} AS old
INSERT INTO {sql_info.media_table} AS old
({col.DIRECT_URL.name}, {', '.join(column_inserts.keys())})
SELECT DISTINCT ON ({col.DIRECT_URL.name}) {col.DIRECT_URL.name},
{', '.join(column_inserts.values())}
FROM {load_table} as new
WHERE NOT EXISTS (
SELECT {col.DIRECT_URL.name} from {db_table}
SELECT {col.DIRECT_URL.name} from {sql_info.media_table}
WHERE {col.DIRECT_URL.name} = new.{col.DIRECT_URL.name} AND
MD5({col.FOREIGN_ID.name}) <> MD5(new.{col.FOREIGN_ID.name})
)
Expand Down
29 changes: 0 additions & 29 deletions catalog/dags/common/popularity/README.md

This file was deleted.

7 changes: 0 additions & 7 deletions catalog/dags/common/popularity/constants.py

This file was deleted.

Loading