Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove popularity constants view #2883

Merged
merged 15 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions catalog/dags/common/popularity/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
IMAGE_VIEW_NAME = "image_view"
AUDIO_VIEW_NAME = "audio_view"
AUDIOSET_VIEW_NAME = "audioset_view"
IMAGE_POPULARITY_CONSTANTS_VIEW = "image_popularity_constants"
AUDIO_POPULARITY_CONSTANTS_VIEW = "audio_popularity_constants"
IMAGE_POPULARITY_PERCENTILE_FUNCTION = "image_popularity_percentile"
AUDIO_POPULARITY_PERCENTILE_FUNCTION = "audio_popularity_percentile"
STANDARDIZED_IMAGE_POPULARITY_FUNCTION = "standardized_image_popularity"
Expand Down
241 changes: 144 additions & 97 deletions catalog/dags/common/popularity/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,33 @@
from datetime import timedelta
from textwrap import dedent

from airflow.decorators import task, task_group
from airflow.models.abstractoperator import AbstractOperator

from common.constants import AUDIO, IMAGE
from common.constants import AUDIO, DAG_DEFAULT_ARGS, IMAGE
from common.loader.sql import TABLE_NAMES
from common.popularity.constants import (
AUDIO_POPULARITY_CONSTANTS_VIEW,
AUDIO_POPULARITY_PERCENTILE_FUNCTION,
AUDIO_VIEW_NAME,
IMAGE_POPULARITY_CONSTANTS_VIEW,
IMAGE_POPULARITY_PERCENTILE_FUNCTION,
IMAGE_VIEW_NAME,
STANDARDIZED_AUDIO_POPULARITY_FUNCTION,
STANDARDIZED_IMAGE_POPULARITY_FUNCTION,
)
from common.sql import PostgresHook
from common.sql import PostgresHook, _single_value
from common.storage import columns as col
from common.storage.db_columns import AUDIO_TABLE_COLUMNS, IMAGE_TABLE_COLUMNS


DEFAULT_PERCENTILE = 0.85


IMAGE_POP_CONSTANTS_IDX = "image_popularity_constants_provider_metric_idx"
AUDIO_POP_CONSTANTS_IDX = "audio_popularity_constants_provider_metric_idx"
IMAGE_VIEW_ID_IDX = "image_view_identifier_idx"
AUDIO_VIEW_ID_IDX = "audio_view_identifier_idx"
IMAGE_VIEW_PROVIDER_FID_IDX = "image_view_provider_fid_idx"
AUDIO_VIEW_PROVIDER_FID_IDX = "audio_view_provider_fid_idx"

# Column name constants
VALUE = "val"
CONSTANT = "constant"
FID = col.FOREIGN_ID.db_name
IDENTIFIER = col.IDENTIFIER.db_name
Expand Down Expand Up @@ -64,8 +61,17 @@
Column(name=PARTITION, definition="character varying(80) PRIMARY KEY"),
Column(name=METRIC, definition="character varying(80)"),
Column(name=PERCENTILE, definition="float"),
Column(name=VALUE, definition="float"),
Column(name=CONSTANT, definition="float"),
]

# Further refactoring of this nature will be done in
# https://github.com/WordPress/openverse/issues/2678.
POPULARITY_METRICS_BY_MEDIA_TYPE = {
AUDIO: AUDIO_POPULARITY_METRICS,
IMAGE: IMAGE_POPULARITY_METRICS,
}


def drop_media_matview(
postgres_conn_id: str,
Expand All @@ -86,25 +92,19 @@ def drop_media_popularity_relations(
postgres_conn_id,
media_type=IMAGE,
db_view=IMAGE_VIEW_NAME,
constants=IMAGE_POPULARITY_CONSTANTS_VIEW,
metrics=IMAGE_POPULARITY_METRICS_TABLE_NAME,
pg_timeout: float = timedelta(minutes=10).total_seconds(),
):
if media_type == AUDIO:
db_view = AUDIO_VIEW_NAME
constants = AUDIO_POPULARITY_CONSTANTS_VIEW
metrics = AUDIO_POPULARITY_METRICS_TABLE_NAME

postgres = PostgresHook(
postgres_conn_id=postgres_conn_id, default_statement_timeout=pg_timeout
)
drop_media_view = f"DROP MATERIALIZED VIEW IF EXISTS public.{db_view} CASCADE;"
drop_popularity_constants = (
f"DROP MATERIALIZED VIEW IF EXISTS public.{constants} CASCADE;"
)
drop_popularity_metrics = f"DROP TABLE IF EXISTS public.{metrics} CASCADE;"
postgres.run(drop_media_view)
postgres.run(drop_popularity_constants)
postgres.run(drop_popularity_metrics)


Expand Down Expand Up @@ -153,27 +153,34 @@ def create_media_popularity_metrics(
postgres.run(query)


@task
def update_media_popularity_metrics(
postgres_conn_id,
media_type=IMAGE,
popularity_metrics=None,
popularity_metrics_table=IMAGE_POPULARITY_METRICS_TABLE_NAME,
popularity_percentile=IMAGE_POPULARITY_PERCENTILE_FUNCTION,
task: AbstractOperator = None,
):
if popularity_metrics is None:
if media_type == AUDIO:
popularity_metrics = AUDIO_POPULARITY_METRICS
else:
popularity_metrics = IMAGE_POPULARITY_METRICS
popularity_metrics = POPULARITY_METRICS_BY_MEDIA_TYPE[media_type]
if media_type == AUDIO:
popularity_metrics_table = AUDIO_POPULARITY_METRICS_TABLE_NAME
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)

column_names = [c.name for c in POPULARITY_METRICS_TABLE_COLUMNS]

# Note that we do not update the val and constant. That is only done during the
# calculation tasks. In other words, we never want to clear out the current value of
# the popularity constant unless we're already done calculating the new one, since
# that can be a time consuming process.
updates_string = ",\n ".join(
f"{c}=EXCLUDED.{c}" for c in column_names if c != PARTITION
f"{c}=EXCLUDED.{c}"
for c in column_names
if c not in [PARTITION, CONSTANT, VALUE]
)
popularity_metric_inserts = _get_popularity_metric_insert_values_string(
popularity_metrics
Expand All @@ -191,7 +198,117 @@ def update_media_popularity_metrics(
;
"""
)
postgres.run(query)
return postgres.run(query)


@task
def calculate_media_popularity_percentile_value(
postgres_conn_id,
provider,
media_type=IMAGE,
popularity_metrics_table=IMAGE_POPULARITY_METRICS_TABLE_NAME,
popularity_percentile=IMAGE_POPULARITY_PERCENTILE_FUNCTION,
task: AbstractOperator = None,
):
if media_type == AUDIO:
popularity_metrics_table = AUDIO_POPULARITY_METRICS_TABLE_NAME
popularity_percentile = AUDIO_POPULARITY_PERCENTILE_FUNCTION
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an example of following the existing conventions in this file, which will be cleaned up in #2678. It's not easy to refactor it in this PR without also touching a lot of other methods, and I felt that would make this harder to review.


postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)

# Calculate the percentile value. E.g. if `percentile` = 0.80, then we'll
# calculate the _value_ of the 80th percentile for this provider's
# popularity metric.
calculate_new_percentile_value_query = dedent(
f"""
SELECT {popularity_percentile}({PARTITION}, {METRIC}, {PERCENTILE})
FROM {popularity_metrics_table}
WHERE {col.PROVIDER.db_name}='{provider}';
"""
)

return postgres.run(calculate_new_percentile_value_query, handler=_single_value)


# Store a provider's newly calculated percentile value, and the popularity
# constant derived from it, in the popularity metrics table.
#
# Parameters:
#   postgres_conn_id: connection id handed to PostgresHook.
#   provider: provider whose metrics row is updated.
#   raw_percentile_value: the percentile value calculated for this provider;
#       when it is None the task returns without touching the database.
#   media_type: IMAGE (default) or AUDIO; AUDIO switches to the audio metrics
#       table.
#   popularity_metrics: mapping of provider -> metric configuration; defaults
#       to the entry of POPULARITY_METRICS_BY_MEDIA_TYPE for `media_type`.
#   popularity_metrics_table: name of the metrics table to update.
#   task: operator used to derive the statement timeout.
#
# Returns whatever `postgres.run` returns for the UPDATE statement, or None
# when there was no percentile value to store.
#
# Raises ValueError when `provider` has no entry in `popularity_metrics`.
#
# NOTE(review): documented with comments rather than a docstring because
# Airflow may surface a decorated callable's docstring in the UI -- confirm
# before converting.
@task
def update_percentile_and_constants_values_for_provider(
    postgres_conn_id,
    provider,
    raw_percentile_value,
    media_type=IMAGE,
    popularity_metrics=None,
    popularity_metrics_table=IMAGE_POPULARITY_METRICS_TABLE_NAME,
    task: AbstractOperator = None,
):
    if popularity_metrics is None:
        popularity_metrics = POPULARITY_METRICS_BY_MEDIA_TYPE.get(media_type, {})
    if media_type == AUDIO:
        popularity_metrics_table = AUDIO_POPULARITY_METRICS_TABLE_NAME

    if raw_percentile_value is None:
        # Occurs when a provider has a metric configured, but there are no records
        # with any data for that metric.
        return

    # Fail with an explicit message instead of an AttributeError on `None` when
    # the provider has no configured metrics (e.g. an unknown media type made
    # the lookup above fall back to an empty mapping).
    provider_info = popularity_metrics.get(provider)
    if provider_info is None:
        raise ValueError(
            f"No popularity metrics are configured for provider '{provider}'"
            f" (media type '{media_type}')."
        )
    percentile = provider_info.get("percentile", DEFAULT_PERCENTILE)

    postgres = PostgresHook(
        postgres_conn_id=postgres_conn_id,
        default_statement_timeout=PostgresHook.get_execution_timeout(task),
    )

    # Calculate the popularity constant using the percentile value. A raw value
    # of 0 is replaced with 1 so that the resulting constant is not forced to 0.
    percentile_value = raw_percentile_value or 1
    new_constant = ((1 - percentile) / (percentile)) * percentile_value

    # Update the percentile value and constant in the metrics table
    update_constant_query = dedent(
        f"""
        UPDATE public.{popularity_metrics_table}
        SET {VALUE} = {percentile_value}, {CONSTANT} = {new_constant}
        WHERE {col.PROVIDER.db_name} = '{provider}';
        """
    )
    return postgres.run(update_constant_query)


# Task group that refreshes the popularity data of a single provider in two
# steps: first calculate the provider's percentile value, then feed that value
# into the task that stores it together with the derived popularity constant.
#
# `execution_timeout` applies to the calculation task only; when it is falsy
# the "execution_timeout" entry of DAG_DEFAULT_ARGS is used instead.
@task_group
def update_percentile_and_constants_for_provider(
    postgres_conn_id, provider, media_type=IMAGE, execution_timeout=None
):
    # Arguments shared by both tasks of the group.
    shared_kwargs = {
        "postgres_conn_id": postgres_conn_id,
        "provider": provider,
        "media_type": media_type,
    }

    timeout = execution_timeout or DAG_DEFAULT_ARGS.get("execution_timeout")
    calculate_task = calculate_media_popularity_percentile_value.override(
        task_id="calculate_percentile_value",
        execution_timeout=timeout,
    )
    percentile_result = calculate_task(**shared_kwargs)
    percentile_result.doc = (
        "Calculate the percentile popularity value for this provider. For"
        " example, if this provider has `percentile`=0.80 and `metric`='views',"
        " calculate the 80th percentile value of views for all records for this"
        " provider."
    )

    # Passing the output of the first task as an argument makes the update
    # task consume the calculated percentile value.
    update_task = update_percentile_and_constants_values_for_provider.override(
        task_id="update_percentile_values_and_constant",
    )
    update_result = update_task(
        raw_percentile_value=percentile_result, **shared_kwargs
    )
    update_result.doc = (
        "Given the newly calculated percentile value, calculate the"
        " popularity constant and update the metrics table with the newly"
        " calculated values."
    )


def _get_popularity_metric_insert_values_string(
Expand All @@ -213,7 +330,8 @@ def _format_popularity_metric_insert_tuple_string(
metric,
percentile,
):
return f"('{provider}', '{metric}', {percentile})"
# Default null val and constant
return f"('{provider}', '{metric}', {percentile}, null, null)"


def create_media_popularity_percentile_function(
Expand Down Expand Up @@ -246,85 +364,14 @@ def create_media_popularity_percentile_function(
postgres.run(query)


def create_media_popularity_constants_view(
postgres_conn_id,
media_type=IMAGE,
popularity_constants=IMAGE_POPULARITY_CONSTANTS_VIEW,
popularity_constants_idx=IMAGE_POP_CONSTANTS_IDX,
popularity_metrics=IMAGE_POPULARITY_METRICS_TABLE_NAME,
popularity_percentile=IMAGE_POPULARITY_PERCENTILE_FUNCTION,
task: AbstractOperator = None,
):
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
if media_type == AUDIO:
popularity_constants = AUDIO_POPULARITY_CONSTANTS_VIEW
popularity_constants_idx = AUDIO_POP_CONSTANTS_IDX
popularity_metrics = AUDIO_POPULARITY_METRICS_TABLE_NAME
popularity_percentile = AUDIO_POPULARITY_PERCENTILE_FUNCTION

create_view_query = dedent(
f"""
CREATE MATERIALIZED VIEW public.{popularity_constants} AS
WITH
popularity_metric_raw_values AS (
SELECT
*,
{popularity_percentile}({PARTITION}, {METRIC}, {PERCENTILE})
AS raw_value
FROM {popularity_metrics}
),
popularity_metric_values AS(
SELECT
*,
CASE
WHEN raw_value=0 THEN
1
ELSE
raw_value
END AS value
FROM popularity_metric_raw_values
)
SELECT *, ((1 - {PERCENTILE}) / {PERCENTILE}) * value AS {CONSTANT}
FROM popularity_metric_values;
"""
)
add_idx_query = dedent(
f"""
CREATE UNIQUE INDEX {popularity_constants_idx}
ON public.{popularity_constants}
USING btree({PARTITION}, {METRIC});
"""
)
postgres.run(create_view_query)
postgres.run(add_idx_query)


def update_media_popularity_constants(
postgres_conn_id,
media_type=IMAGE,
popularity_constants_view=IMAGE_POPULARITY_CONSTANTS_VIEW,
task: AbstractOperator = None,
):
if media_type == AUDIO:
popularity_constants_view = AUDIO_POPULARITY_CONSTANTS_VIEW
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id,
default_statement_timeout=PostgresHook.get_execution_timeout(task),
)
postgres.run(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {popularity_constants_view};")


def create_standardized_media_popularity_function(
postgres_conn_id,
media_type=IMAGE,
function_name=STANDARDIZED_IMAGE_POPULARITY_FUNCTION,
popularity_constants=IMAGE_POPULARITY_CONSTANTS_VIEW,
popularity_metrics=IMAGE_POPULARITY_METRICS_TABLE_NAME,
):
if media_type == AUDIO:
popularity_constants = AUDIO_POPULARITY_CONSTANTS_VIEW
popularity_metrics = AUDIO_POPULARITY_METRICS_TABLE_NAME
function_name = STANDARDIZED_AUDIO_POPULARITY_FUNCTION
postgres = PostgresHook(
postgres_conn_id=postgres_conn_id, default_statement_timeout=10.0
Expand All @@ -335,7 +382,7 @@ def create_standardized_media_popularity_function(
provider text, meta_data jsonb
) RETURNS FLOAT AS $$
SELECT ($2->>{METRIC})::float / (($2->>{METRIC})::float + {CONSTANT})
FROM {popularity_constants} WHERE provider=$1;
FROM {popularity_metrics} WHERE provider=$1;
$$
LANGUAGE SQL
STABLE
Expand Down Expand Up @@ -403,21 +450,21 @@ def create_media_view(
def get_providers_with_popularity_data_for_media_type(
postgres_conn_id: str,
media_type: str = IMAGE,
constants_view: str = IMAGE_POPULARITY_CONSTANTS_VIEW,
popularity_metrics: str = IMAGE_POPULARITY_METRICS_TABLE_NAME,
pg_timeout: float = timedelta(minutes=10).total_seconds(),
):
"""
Return a list of distinct `provider`s that support popularity data,
for the given media type.
"""
if media_type == AUDIO:
constants_view = AUDIO_POPULARITY_CONSTANTS_VIEW
popularity_metrics = AUDIO_POPULARITY_METRICS_TABLE_NAME

postgres = PostgresHook(
postgres_conn_id=postgres_conn_id, default_statement_timeout=pg_timeout
)
providers = postgres.get_records(
f"SELECT DISTINCT provider FROM public.{constants_view};"
f"SELECT DISTINCT provider FROM public.{popularity_metrics};"
)

return [x[0] for x in providers]
Expand Down
8 changes: 8 additions & 0 deletions catalog/dags/common/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
# https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html#airflow.providers.postgres.hooks.postgres.PostgresHook.copy_expert # noqa


def _single_value(cursor):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this is in a shared location and used in other modules, we can remove the underscore prefix, right?

Suggested change
def _single_value(cursor):
def single_value(cursor):

(Requires updates in import locations too, of course)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed! I'm going to make a note to update this in the follow-up in #2678, because even though this change is simple, I'd still want to fully retest.

try:
row = cursor.fetchone()
return row[0]
except Exception as e:
raise ValueError("Unable to extract expected row data from cursor") from e


class PostgresHook(UpstreamPostgresHook):
"""
PostgresHook that sets the database timeout on any query to match the airflow task
Expand Down
Loading