-
Notifications
You must be signed in to change notification settings - Fork 214
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a script to generate the media_properties.md
Signed-off-by: Olga Bulat <[email protected]>
- Loading branch information
Showing
11 changed files
with
1,403 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import ast | ||
import copy | ||
from pathlib import Path | ||
|
||
|
||
# Path to the catalog's column definitions, resolved relative to this script.
COLUMNS_PATH = Path(__file__).parents[2] / "dags" / "common" / "storage" / "columns.py"

# Public GitHub URL for columns.py; used to build line-number hyperlinks.
COLUMNS_URL = "https://github.com/WordPress/openverse/blob/main/catalog/dags/common/storage/columns.py"  # noqa: E501

# Template of column properties and their defaults; copied for each parsed column.
COLUMN = {
    "python_type": None,
    "name": None,
    "db_name": None,
    "nullable": None,
    "required": False,
    "upsert_strategy": "newest_non_null",
    "base_column": None,
}

# The keyword arguments we extract from column constructor calls.
COLUMN_PROPS = COLUMN.keys()
|
||
|
||
def file_to_ast(file_name) -> ast.Module:
    """Parse the Python source at *file_name* into an AST module."""
    return ast.parse(Path(file_name).read_text())
|
||
|
||
# Parsed AST of columns.py, shared by the helpers below.
CODE = file_to_ast(COLUMNS_PATH)
|
||
|
||
def format_python_column(
    python_column: dict[str, any],
    python_column_lines: dict[str, tuple[int, int]],
) -> str:
    """
    Format the Python column properties dictionary to a string that can be
    used in the markdown file.

    :param python_column: COLUMN-shaped dict of parsed properties.
    :param python_column_lines: map of column class name to its
        (start, end) line numbers in columns.py, used for the hyperlink.
    :return: markdown of the form
        ``[<Type>](<url>#L<start>-L<end>) (`prop=value, ...`)``.
    """
    # Work on a shallow copy so the caller's dict is not mutated — the
    # original implementation popped keys out of the argument itself.
    props = dict(python_column)

    col_type = props.pop("python_type")
    start, end = python_column_lines[col_type]
    python_column_string = f"[{col_type}]({COLUMNS_URL}#L{start}-L{end}) (`"

    col_name = props.pop("name")
    column_db_name = props.pop("db_name")
    # Only surface the Python-side name when it differs from the DB name.
    if column_db_name and col_name != column_db_name:
        python_column_string += f'name="{col_name}", '

    # Remaining properties, skipping those that were never set.
    python_column_string += ", ".join(
        [f"{k}={v}" for k, v in props.items() if v is not None]
    )

    return f"{python_column_string}`)"
|
||
|
||
def get_python_column_types() -> dict[str, tuple[int, int]]:
    """
    Extract all types of columns with their line numbers for hyperlinks.
    Sample output: `StringColumn: (3, 5)`
    """
    column_lines: dict[str, tuple[int, int]] = {}
    for node in ast.iter_child_nodes(CODE):
        # Only top-level class definitions named `...Column` are column types.
        if isinstance(node, ast.ClassDef) and node.name.endswith("Column"):
            column_lines[node.name] = (node.lineno, node.end_lineno)
    return column_lines
|
||
|
||
def parse_col_argument_value(item):
    """
    Extract a plain value from a column keyword argument's AST node.

    Return `attr` for Attribute values (upsert strategies), `func.id` for
    Call values (base_column), and `value` for Constant values such as `True`.
    We don't save the List type used for sql_args.
    """
    # Upsert strategy, e.g. `UpsertStrategy.newest_non_null` -> "newest_non_null"
    if isinstance(item, ast.Attribute) and isinstance(item.value, ast.Name):
        return item.attr
    # Base column, e.g. `StringColumn(...)` -> "StringColumn"
    if isinstance(item, ast.Call) and isinstance(item.func, ast.Name):
        return item.func.id
    # Constants (True/False/str/int) and any other node exposing `.value`.
    # The original had a separate `ast.Constant` branch that returned the
    # same `item.value` as the fallthrough — it was dead code and is removed.
    return item.value
|
||
|
||
def parse_python_columns() -> dict[str, any]:
    """
    Parse columns.py to a dictionary with the column's `db_name` as a key,
    and the string describing Python column as a value.
    Example output:
    "height": "[IntegerColumn](/link/to/column/type/definition/)" +
    "(`name="height", nullable=True, required=False, upsert_strategy=newest_non_null`)"
    """
    python_column_lines = get_python_column_types()

    # Extracts all the assignments of the form `column_name = <Type>Column(...)`
    cols: list[ast.Call] = [
        item.value
        for item in ast.iter_child_nodes(CODE)
        if isinstance(item, ast.Assign)
        and isinstance(item.value, ast.Call)
        and isinstance(item.value.func, ast.Name)
        and item.value.func.id.endswith("Column")
    ]

    columns = {}
    for col in cols:
        # `COLUMN | {...}` already yields a new dict, so the former
        # `copy.copy(COLUMN)` was redundant. The keyword nodes are named
        # `kw` to avoid shadowing `col` inside the comprehension.
        parsed_column = COLUMN | {
            kw.arg: parse_col_argument_value(kw.value)
            for kw in col.keywords
            if kw.arg in COLUMN_PROPS
        }
        parsed_column["python_type"] = col.func.id

        # If required is True, then the media item is discarded if the column
        # is null. This means that the column cannot have `None` as a value.
        if parsed_column["nullable"] is None:
            parsed_column["nullable"] = (
                True
                if parsed_column["required"] is None
                else not parsed_column["required"]
            )
        db_name = parsed_column.get("db_name") or parsed_column["name"]
        columns[db_name] = format_python_column(parsed_column, python_column_lines)

    return columns
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
import re | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from typing import Any, Literal | ||
|
||
|
||
# Location of the upstream DB bootstrap DDL within the repository.
LOCAL_POSTGRES_FOLDER = Path(__file__).parents[3] / "docker" / "upstream_db"
# DDL file that defines the table schema for each media type.
SQL_PATH = {
    "image": LOCAL_POSTGRES_FOLDER / "0003_openledger_image_schema.sql",
    "audio": LOCAL_POSTGRES_FOLDER / "0006_openledger_audio_schema.sql",
}
# SQL datatypes expected in the column definitions of those DDL files.
sql_types = [
    "integer",
    "boolean",
    "uuid",
    "double precision",
    "jsonb",
    "timestamp with time zone",
    "character varying",
]
# Matches the first known SQL type occurring in a column definition line.
sql_type_regex = re.compile(f"({'|'.join(sql_types)})")
# Media types for which schema files exist.
MediaType = Literal["audio", "image"]
|
||
|
||
@dataclass
class FieldSqlInfo:
    """SQL characteristics of a single table column."""

    # Whether the column accepts NULL (no `NOT NULL` in its definition).
    nullable: bool
    # The SQL datatype, e.g. "integer" or "array of character varying".
    datatype: str
    # Extra constraint text, e.g. "(80)" for a varchar length limit.
    constraint: str
|
||
|
||
def create_db_props_dict(
    media_type: MediaType,
) -> dict[Any, Any] | dict[Any, dict[str, FieldSqlInfo]]:
    """
    Parse the DDL for a media type and return a dict mapping each field
    name to its SQL definition (under the "sql" key, as a FieldSqlInfo).

    Returns an empty dict when the table definition cannot be found or the
    table name does not match the media type.

    :raises ValueError: if a column line contains no recognizable SQL type.
    """
    create_table_regex = re.compile(r"CREATE\s+TABLE\s+\w+\.(\w+)\s+\(([\s\S]*?)\);")
    sql_path = SQL_PATH[media_type]

    with open(sql_path) as f:
        contents = f.read()
    table_description_matches = create_table_regex.search(contents)
    if not table_description_matches:
        print(f"Could not find table description for {media_type} in {sql_path}")
        return {}
    table_name = table_description_matches.group(1)
    if table_name != media_type:
        print(f"Table name {table_name} does not match media type {media_type}")
        return {}
    # One column definition per non-empty line inside CREATE TABLE (...).
    field_descriptions = [
        field.strip()
        for field in table_description_matches.group(2).split("\n")
        if field.strip()
    ]
    fields = {}
    for field in field_descriptions:
        field_name = field.split(" ")[0]
        field_constraint = ""
        # Check the match explicitly instead of catching the AttributeError
        # from `.group` on None, which also hid unrelated attribute errors.
        type_match = sql_type_regex.search(field)
        if not type_match:
            raise ValueError(f"Could not find type for field {field_name} in {field}")
        field_type = type_match.group(1)
        if field_type == "character varying":
            # Preserve a length limit such as `character varying(80)`.
            char_limit = field.split("(")[1].split(")")[0]
            field_constraint = f"({char_limit})"

        if "[]" in field:
            field_type = f"array of {field_type}"

        fields[field_name] = {
            "sql": FieldSqlInfo(
                nullable="NOT NULL" not in field,
                datatype=field_type,
                constraint=field_constraint,
            )
        }
    return fields
137 changes: 137 additions & 0 deletions
137
catalog/utilities/media_props_gen/generate_media_properties.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
"""Automatic media properties generation.""" | ||
|
||
import logging | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
|
||
from column_parser import parse_python_columns | ||
from db import MediaType, create_db_props_dict | ||
from md import Md | ||
|
||
|
||
log = logging.getLogger(__name__)
# Silence noisy modules
logging.getLogger("common.storage.media").setLevel(logging.WARNING)

# Constants
PARENT = Path(__file__).parent
DOC_MD_PATH = PARENT / "media_properties.md"
SOURCE_MD_PATH = PARENT / "media_props.md"

# `read_text` closes the file promptly — the previous bare `open(...).read()`
# leaked the handle until GC. Also reuse the PARENT constant.
PREAMBLE = (PARENT / "preamble.md").read_text()

# Media types documented by this script.
MEDIA_TYPES: list[MediaType] = ["audio", "image"]
|
||
|
||
@dataclass
class FieldInfo:
    """Combined DB and Python column metadata for one media property."""

    # NOTE(review): not referenced elsewhere in this module's visible code —
    # possibly kept for external callers; confirm before removing.
    name: str
    nullable: bool
    datatype: str
    constraint: str
    python_column: str = ""
|
||
|
||
def generate_media_properties() -> dict:
    """
    Generate a dictionary documenting each property of the media items.
    For each property, return the database field and the Python object shape.
    """
    media_props = {}
    python_columns = parse_python_columns()

    for media_type in MEDIA_TYPES:
        props = create_db_props_dict(media_type)

        # Add the python column properties to the media properties dictionary.
        # Iterating the dict directly; the former `.keys()` was redundant.
        for prop in props:
            props[prop]["python_column"] = python_columns.get(prop, "")

        media_props[media_type] = props

    return media_props
|
||
|
||
def generate_db_props_string(field_name: str, field: dict) -> tuple[str, str]:
    """
    Build the markdown anchor link and DB properties string for one field.

    :param field_name: the DB column name, used for the anchor link.
    :param field: dict with a "sql" key holding a FieldSqlInfo-shaped object.
    :return: tuple of (markdown anchor link, "datatype[ constraint], nullability").
    """
    field_sql = field["sql"]

    # e.g. " (80)" for varchar limits; empty when there is no constraint.
    # The conditional expressions no longer carry pointless f-string wrappers.
    constraint = f" {field_sql.constraint}" if field_sql.constraint else ""
    nullable = "nullable" if field_sql.nullable else "non-nullable"
    props_string = f"{field_sql.datatype}{constraint}, {nullable}"

    return f"[`{field_name}`](#{field_name})", props_string
|
||
|
||
def generate_media_props_table(media_properties) -> str:
    """Generate the markdown table with media properties."""
    # Header plus one row per field, assembled with a single join.
    header = "| Name | DB Field | Python Column |\n| --- | --- | --- |\n"
    rows = []
    for field_name, field in media_properties.items():
        link, db_props = generate_db_props_string(field_name, field)
        python_col = field.get("python_column", "")
        rows.append(f"| {link} | {db_props} | {python_col} |\n")
    return header + "".join(rows)
|
||
|
||
def generate_long_form_doc(markdown_descriptions: dict, media_properties: dict) -> str:
    """
    Generate the long-form markdown documentation for each media property.
    Uses the markdown descriptions from the `media_props.md` source file.
    Also uses `media_properties` dictionary to set which media types have
    the specific properties.
    """
    media_docs = ""
    for prop, description in markdown_descriptions.items():
        # `Md.heading` already returns a string; the former f-string wrapper
        # around it was redundant.
        prop_heading = Md.heading(3, prop)

        # List the media types whose schema contains this property.
        media_types = [
            f"`{media_type}`"
            for media_type, value in media_properties.items()
            if prop in value
        ]
        prop_heading += f"_Media Types_: {', '.join(media_types)}\n\n"

        prop_doc = "".join(
            [f"{Md.heading(4, k)}{Md.line(v)}" for k, v in description.items()]
        )
        media_docs += prop_heading + prop_doc

    return media_docs
|
||
|
||
def generate_markdown_doc() -> str:
    """
    Parse the media property descriptions from the source code and `media_props.md`
    Generate the tables with media properties database column and
    Python objects characteristics, and a long-form documentation for each property.
    """
    media_properties = generate_media_properties()
    markdown_descriptions = Md.parse(SOURCE_MD_PATH)

    image_table = generate_media_props_table(media_properties["image"])
    audio_table = generate_media_props_table(media_properties["audio"])

    long_form_doc = generate_long_form_doc(markdown_descriptions, media_properties)

    # Assemble the document section by section; equivalent to the stripped
    # multi-line f-string (newline-separated sections, outer whitespace trimmed).
    sections = [
        PREAMBLE,
        f'{Md.heading(2, "Image Properties")}{image_table}',
        f'{Md.heading(2, "Audio Properties")}{audio_table}',
        f'{Md.heading(2, "Media Property Descriptions")}{long_form_doc}',
    ]
    return "\n".join(sections).strip()
|
||
|
||
def write_media_props_doc(path: Path = DOC_MD_PATH) -> None:
    """Generate the media properties documentation and write it to a file."""
    doc_text = generate_markdown_doc()
    # Lazy %-style args follow logging best practice; the message previously
    # said "DAG doc", a copy-paste error — this writes the media props doc.
    log.info("Writing media properties doc to %s", path)
    path.write_text(doc_text)
|
||
|
||
# Script entry point: regenerate media_properties.md in place.
if __name__ == "__main__":
    write_media_props_doc()
Oops, something went wrong.