Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new data refresh factory #4259

Merged
merged 9 commits into from
May 16, 2024
Merged

Add new data refresh factory #4259

merged 9 commits into from
May 16, 2024

Conversation

stacimc
Copy link
Collaborator

@stacimc stacimc commented May 3, 2024

Fixes

Fixes #4146 by @stacimc

Description

This PR adds the new data refresh factory, which generates DAGs containing (for now) the initial steps and the copy data steps.

These new DAGs will create the temp tables in the API and copy the data into them from the catalog. They do not apply constraints, do anything involving indexing, or promote tables. They should not be turned on in production (but it would be harmless if they were accidentally enabled).

Screenshot 2024-05-02 at 5 03 43 PM

In the code, TODOs are intentionally left to indicate where the additional pieces will be added.

Testing Instructions

Locally, run an image and an audio DAG for a few minutes to ingest some new records (I ran Cleveland and Jamendo).

Now try the staging_audio_data_refresh and staging_image_data_refresh. These should run successfully. You can trigger them at the same time to verify the concurrency handling works (whichever one starts second should wait on the other).

Since these don't yet build the new ES indices or promote the tables, we can't rely on the logs from the report_record_counts tasks to see if they worked. Instead run just api/pgcli to access the local API database, and check to see that the temp tables were created and that they contain the expected number of records. Remember to check that a temp table was created for the audioset table as well as audio!

select count(*) from temp_import_audioset;
select count(*) from temp_import_audio;
select count(*) from temp_import_image;

If you want to run the DAGs multiple times you'll need to manually drop the temp tables here as well.

This code refactors everything from the ingestion server's refresh_api_table. You can review that code for comparison. I also recommend running a "normal" or legacy data refresh and checking the logs for the SQL that is executed during a data refresh, to compare with what is run here.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (just catalog/generate-docs for catalog
    PRs) or the media properties generator (just catalog/generate-docs media-props
    for the catalog or just api/generate-docs for the API) where applicable.

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@stacimc stacimc self-assigned this May 3, 2024
@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: ingestion server Related to the ingestion/data refresh server 🧱 stack: catalog Related to the catalog and Airflow DAGs labels May 3, 2024
@stacimc
Copy link
Collaborator Author

stacimc commented May 3, 2024

This needs to be rebased on #4260 because the diff is currently very confusing, since this PR both renames the old data refresh module to legacy_data_refresh and adds a new data_refresh module.

@stacimc stacimc force-pushed the add/new-data-refresh-factory branch from 9d2c216 to c26656a Compare May 7, 2024 00:51
Copy link

github-actions bot commented May 7, 2024

Full-stack documentation: https://docs.openverse.org/_preview/4259

Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again.

You can check the GitHub pages deployment action list to see the current status of the deployments.

Changed files 🔄:

@stacimc stacimc marked this pull request as ready for review May 7, 2024 18:07
@stacimc stacimc requested review from a team as code owners May 7, 2024 18:07
@stacimc stacimc requested review from krysal and obulat May 7, 2024 18:07
@sarayourfriend
Copy link
Collaborator

sarayourfriend commented May 9, 2024

Update: It passed after retrying. I don't know what happened here, but it looks like some kind of race condition?

@stacimc I've tried running the audio data refresh locally, but it's failing at setup_id_columns for the audioset:

[2024-05-09, 05:39:19 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: staging_audio_data_refresh.copy_upstream_tables.copy_upstream_table.setup_id_columns manual__2024-05-09T05:39:12.265657+00:00 map_index=1 [queued]>
[2024-05-09, 05:39:19 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: staging_audio_data_refresh.copy_upstream_tables.copy_upstream_table.setup_id_columns manual__2024-05-09T05:39:12.265657+00:00 map_index=1 [queued]>
[2024-05-09, 05:39:19 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 3
[2024-05-09, 05:39:19 UTC] {taskinstance.py:2217} INFO - Executing <Task(_PythonDecoratedOperator): copy_upstream_tables.copy_upstream_table.setup_id_columns> on 2024-05-09 05:39:12.265657+00:00
[2024-05-09, 05:39:19 UTC] {standard_task_runner.py:60} INFO - Started process 2061 to run task
[2024-05-09, 05:39:19 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'staging_audio_data_refresh', 'copy_upstream_tables.copy_upstream_table.setup_id_columns', 'manual__2024-05-09T05:39:12.265657+00:00', '--job-id', '45', '--raw', '--subdir', 'DAGS_FOLDER/data_refresh/dag_factory.py', '--cfg-path', '/tmp/tmpgp1dpqho', '--map-index', '1']
[2024-05-09, 05:39:19 UTC] {standard_task_runner.py:88} INFO - Job 45: Subtask copy_upstream_tables.copy_upstream_table.setup_id_columns
[2024-05-09, 05:39:19 UTC] {task_command.py:423} INFO - Running <TaskInstance: staging_audio_data_refresh.copy_upstream_tables.copy_upstream_table.setup_id_columns manual__2024-05-09T05:39:12.265657+00:00 map_index=1 [running]> on host a1c5dbc77be3
[2024-05-09, 05:39:19 UTC] {taskinstance.py:2513} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='data-eng-admin' AIRFLOW_CTX_DAG_ID='staging_audio_data_refresh' AIRFLOW_CTX_TASK_ID='copy_upstream_tables.copy_upstream_table.setup_id_columns' AIRFLOW_CTX_EXECUTION_DATE='2024-05-09T05:39:12.265657+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-05-09T05:39:12.265657+00:00'
[2024-05-09, 05:39:19 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2024-05-09, 05:39:19 UTC] {base.py:83} INFO - Using connection ID 'postgres_openledger_api_staging' for task execution.
[2024-05-09, 05:39:19 UTC] {sql.py:457} INFO - Running statement: SET statement_timeout TO '3600.0s'; 
ALTER TABLE temp_import_audioset ADD COLUMN IF NOT EXISTS
    id serial;
CREATE SEQUENCE IF NOT EXISTS id_temp_seq;
ALTER TABLE temp_import_audioset ALTER COLUMN
    id SET DEFAULT nextval('id_temp_seq'::regclass);, parameters: None
[2024-05-09, 05:39:19 UTC] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 439, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 200, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 217, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/catalog/dags/data_refresh/copy_data.py", line 57, in _run_sql
    return postgres.run(query, handler=handler)
  File "/opt/airflow/catalog/dags/common/sql.py", line 90, in run
    return super().run(sql, autocommit, parameters, handler)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/hooks/sql.py", line 404, in run
    self._run_command(cur, sql_statement, parameters)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/common/sql/hooks/sql.py", line 462, in _run_command
    cur.execute(sql_statement)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(id_temp_seq, 2200) already exists.
[2024-05-09, 05:39:19 UTC] {taskinstance.py:1149} INFO - Marking task as UP_FOR_RETRY. dag_id=staging_audio_data_refresh, task_id=copy_upstream_tables.copy_upstream_table.setup_id_columns, map_index=1, execution_date=20240509T053912, start_date=20240509T053919, end_date=20240509T053919
[2024-05-09, 05:39:19 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 45 for task copy_upstream_tables.copy_upstream_table.setup_id_columns (duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL:  Key (typname, typnamespace)=(id_temp_seq, 2200) already exists.
; 2061)
[2024-05-09, 05:39:20 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-05-09, 05:39:20 UTC] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check

Copy link
Collaborator

@sarayourfriend sarayourfriend left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exciting! It passes for me locally, except for the odd issue I had with the audio data refresh, shared in a separate comment on the PR. It did eventually work fine when it retried, without me doing anything, so I'm assuming it's a small edge-case bug, rather than signifying a major issue with the approach overall.

I've requested a non-specific change to make it easier to understand the mapped tasks, but I don't even know if what I am asking is possible, or if I'm just looking at the page wrong and what I'm asking for is already there. In any case, it isn't a blocker, but if it can be done, either here or in a follow-up, I think it'd be nice to do (as an issue outside the project scope, not blocking its completion).

catalog/dags/data_refresh/data_refresh_types.py Outdated Show resolved Hide resolved
Comment on lines +54 to +61
METRIC_COLUMN_SETUP_QUERY = dedent(
"""
ALTER TABLE {temp_table_name} ADD COLUMN IF NOT EXISTS
standardized_popularity double precision;
ALTER TABLE {temp_table_name} ALTER COLUMN
view_count SET DEFAULT 0;
"""
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was originally going to ask if these dedents were necessary, and left the example below as a suggestion for how to avoid them, but I think the extra whitespace makes it all easier to read... even if needing to pass it through dedent is a tedious pain.

Suggested change
METRIC_COLUMN_SETUP_QUERY = dedent(
"""
ALTER TABLE {temp_table_name} ADD COLUMN IF NOT EXISTS
standardized_popularity double precision;
ALTER TABLE {temp_table_name} ALTER COLUMN
view_count SET DEFAULT 0;
"""
)
METRIC_COLUMN_SETUP_QUERY = """
ALTER TABLE {temp_table_name} ADD COLUMN IF NOT EXISTS
standardized_popularity double precision;
ALTER TABLE {temp_table_name} ALTER COLUMN
view_count SET DEFAULT 0;
"""

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to ask the same, but yea, I agree that the current setup is actually a little more helpful!

catalog/dags/data_refresh/reporting.py Outdated Show resolved Hide resolved
catalog/dags/data_refresh/copy_data.py Show resolved Hide resolved
@stacimc stacimc mentioned this pull request May 9, 2024
13 tasks
Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wonderful! I was able to test the process locally and everything functioned as expected 🎉

My only other big note is: I know that much of the logic here is DAG definitions, but it does make me wonder if anything here can/should have tests associated with it as well. Most of the action being done with the addition is in SQL, but it might still be useful to have some tests for the copy_data module functions and/or the data refresh types.

Comment on lines +54 to +61
METRIC_COLUMN_SETUP_QUERY = dedent(
"""
ALTER TABLE {temp_table_name} ADD COLUMN IF NOT EXISTS
standardized_popularity double precision;
ALTER TABLE {temp_table_name} ALTER COLUMN
view_count SET DEFAULT 0;
"""
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was going to ask the same, but yea, I agree that the current setup is actually a little more helpful!

catalog/dags/data_refresh/copy_data.py Outdated Show resolved Hide resolved
Comment on lines 118 to 123
if (
configured_limit := Variable.get(
"DATA_REFRESH_LIMIT", default_var=None, deserialize_json=True
)
is not None
):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be a standard falsy check, and not necessarily is not None? This would seem to mean that if DATA_REFRESH_LIMIT were set to 0, that would be returned as the configured limit - maybe that case should also return the default?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TL;DR is that this version just kept the exact behavior of the limits as we have in the existing ingestion server -- we used to interpret 0 as "no limit", but no longer do so as that's a little misleading. The result is that it's not possible to explicitly configure the limit to be None, because it gets interpreted as if the variable weren't defined.

But we can and absolutely might as well fix that while we're here! 😅 So I've updated it to explicitly check whether the variable is defined. We still treat None -> "no limit".

Comment on lines 169 to 170
# Ensure that only one table is being copied at a time.
max_active_tis_per_dagrun=1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome!!

@stacimc
Copy link
Collaborator Author

stacimc commented May 13, 2024

@sarayourfriend I pushed changes but forgot to respond -- the issue you were seeing was related to the same id sequence being used for audio and audioset. I've updated to make sure each table gets it own sequence. Good catch!

Most of the action being done with the addition is in SQL, but it might still be useful to have some tests for the copy_data module functions and/or the data refresh types.

@AetherUnbound how do you feel about adding a separate issue to the milestone? I agree with you here but think it might be worth a separate PR because of the work needed to setup those tests :/

@AetherUnbound
Copy link
Collaborator

@AetherUnbound how do you feel about adding a separate issue to the milestone? I agree with you here but think it might be worth a separate PR because of the work needed to setup those tests :/

That sounds great! I wasn't thinking it'd be that much extra, but having it as a separate issue in the milestone works for not blocking this PR while making sure we don't forget 😄

@openverse-bot
Copy link
Collaborator

Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR:

@krysal
@obulat
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend1 days, this PR was ready for review 5 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s)2.

@stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

Copy link
Collaborator

@AetherUnbound AetherUnbound left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! One step closer! 😄

@sarayourfriend
Copy link
Collaborator

sarayourfriend commented May 15, 2024

I'm getting a DAG import error at the moment:

Broken DAG: [/opt/airflow/catalog/dags/data_refresh/dag_factory.py]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 793, in __init__
    raise AirflowException(
airflow.exceptions.AirflowException: Invalid arguments were passed to _PythonDecoratedOperator (task_id: get_shared_columns). Invalid arguments were:
**kwargs: {'map_index_template': "{{ task.op_kwargs['upstream_table_name'] }}"}

I'll rebuild the catalog containers and see if it resolved the problem. Maybe my images are on an older Airflow version?

Update: A rebuild fixed this. It was indeed that my image was on an older Airflow version. Curious that it didn't automatically rebuild 🤔 Then again, I don't know how it would know to.

@sarayourfriend sarayourfriend self-requested a review May 15, 2024 23:54
Copy link
Collaborator

@sarayourfriend sarayourfriend left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works perfect! The new mapped tasks view is awesome, exactly what I was hoping for!

By the way, at some point with this work we should add the catalog profile to the ES container in the compose stack. Right now the testing instructions miss specific instructions for running, so I just used just catalog/up, but you need at least just api/up to get the ES containers running as well.

@stacimc stacimc force-pushed the add/new-data-refresh-factory branch from 4bb5bdf to 6a259e0 Compare May 16, 2024 23:04
@stacimc stacimc merged commit bc457c0 into main May 16, 2024
41 checks passed
@stacimc stacimc deleted the add/new-data-refresh-factory branch May 16, 2024 23:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🌟 goal: addition Addition of new feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs 🧱 stack: ingestion server Related to the ingestion/data refresh server
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Create the new data refresh DAG factory and move initial steps into Airflow
4 participants