Add catalog_cleaner DAG #4610

Merged
merged 8 commits into from
Jul 20, 2024

Member

@krysal krysal commented Jul 12, 2024

Fixes

Fixes #3415 by @krysal

Description

This PR adds a DAG similar to batched_update but dedicated to using the files in AWS S3 generated by the ingestion server. I created a new DAG because batched_update would need more tweaks than just loading the data from S3 into a table: the update is performed by joining the newly created table with image, and the new DAG also runs the batch updates in parallel using Airflow's Dynamic Task Mapping feature. Several batched_update runs have been performed with the popularity calculations, which suggests it's possible to parallelize this task as long as the rows to be modified do not overlap. The ingestion server applies its updates in a similar way.
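As a rough illustration of the non-overlap condition, here is a minimal sketch assuming the rows of the temporary table are addressed by a sequential row number starting at 1. `batch_ranges` is a hypothetical helper written for this example, not the function used in the DAG; it only shows that consecutive (batch_start, batch_end) ranges never share a row, which is what would make it safe to map one update task per range.

```python
def batch_ranges(total_rows: int, batch_size: int) -> list[tuple[int, int]]:
    """Split row numbers 1..total_rows into consecutive, non-overlapping ranges.

    Hypothetical helper: each (batch_start, batch_end) pair is inclusive, so no
    row number appears in more than one batch.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        (start, min(start + batch_size - 1, total_rows))
        for start in range(1, total_rows + 1, batch_size)
    ]


if __name__ == "__main__":
    # 15 dirty rows with batch_size=3, as in the testing instructions.
    for batch_start, batch_end in batch_ranges(15, 3):
        print(f"{batch_start}__{batch_end}")
```

Each pair could then be handed to its own mapped task; the last batch is simply shorter when the row count is not a multiple of the batch size.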

This is a proof of concept to see the feasibility of parallelizing the DB updates. By making a separate DAG we don't interfere with those that depend on the batched_update DAG, and if it's successful (which I don't see why it wouldn't be) we can discuss how to generalize it, whether by modifying the batched_update DAG or by creating another one to use with temporary tables from S3 files.

Testing Instructions

You will need to have some "dirty" rows in the catalog and their corresponding TSV files containing the correct data to fix them.

  1. To create a TSV, spin up the catalog stack with ./ov just catalog/up and, if you have psql installed locally, run:
PGPASSWORD=deploy psql -h localhost -p 50255 openledger deploy -c "COPY (SELECT identifier, url FROM image ORDER BY created_on DESC LIMIT 15) TO STDOUT WITH DELIMITER E'\t' CSV;" > url.tsv

You should then have a file of <identifier> <url> without quotes or a header like the following:

3074629a-9934-464b-aca6-b517f4b7cf80	https://pd.w.org/2024/06/8616676f5d15f44f7.79186143-2048x1366.jpg
e3637802-7ff5-4972-b1b4-554e97d921f1	https://pd.w.org/2024/06/85667a9763e32215.10445933-1152x2048.jpg
aec4bdef-6315-4a5d-a997-727891d2f00a	https://pd.w.org/2024/06/121666b021b681b06.28758133-1536x2048.jpg
...
  2. Then modify the URLs for the same rows. You can use a query like the following to remove the protocol:
UPDATE image SET url = regexp_replace(url, '^https://', '') WHERE identifier IN (
	SELECT identifier FROM image ORDER BY created_on DESC LIMIT 15
);
  3. Upload your new file to MinIO: http://localhost:5011/. Note the file's path within the bucket, as you'll need it for the DAG configuration later.

  4. Go to the Airflow UI, unpause the catalog_cleaner DAG, and trigger it with a configuration that fits your test case. E.g.:

{
    "batch_size": 3,
    "column": "url",
    "s3_bucket": "openverse-catalog",
    "s3_path": "shared/data-refresh-cleaned-data/url_test.tsv"
}
  5. Verify the changes were applied in the DB to the rows.
./ov just catalog/pgcli
SELECT identifier, url FROM image ORDER BY created_on DESC LIMIT 15;
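One thing to keep in mind when stripping the protocol from the test rows: character-set trims such as PostgreSQL's ltrim(text, characters) remove any leading characters that belong to the given set, not a literal prefix. Python's str.lstrip behaves the same way, so the difference can be sketched as follows. The function names and the sample URL are illustrative assumptions only.

```python
def strip_chars(url: str, chars: str = "https://") -> str:
    """Character-set trim: drops every leading character found in `chars`."""
    return url.lstrip(chars)


def strip_prefix(url: str, prefix: str = "https://") -> str:
    """Literal prefix removal: drops `prefix` once, only if the URL starts with it."""
    return url.removeprefix(prefix)  # Python 3.9+


if __name__ == "__main__":
    sample = "https://pd.w.org/2024/06/example.jpg"  # illustrative URL
    # The host starts with "p", which is in the character set, so it is trimmed too.
    print(strip_chars(sample))
    print(strip_prefix(sample))
```

Either result counts as a "dirty" row for the purposes of this test, but only the prefix version leaves the host name intact.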

Try different configurations for batch_size and/or the CLEANER_MAX_CONCURRENT_DB_UPDATE_TASKS Airflow variable to see how it works. You can add time.sleep(seconds) before returning in the update_batch function to better appreciate the concurrency.
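To get a feel for how a concurrency cap interacts with the number of batches before trying it in Airflow, the following is a small simulation using only the standard library. It does not use Airflow at all: the thread pool size stands in for CLEANER_MAX_CONCURRENT_DB_UPDATE_TASKS, and the inner `update_batch` is a hypothetical stub that sleeps instead of running an UPDATE.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_batches(batches, max_concurrent, seconds=0.05):
    """Run stub batch updates with at most `max_concurrent` in flight.

    Returns the processed batches and the peak number that ran at once.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def update_batch(batch):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(seconds)  # stand-in for the real UPDATE statement
        with lock:
            running -= 1
        return batch

    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        done = list(pool.map(update_batch, batches))
    return done, peak


if __name__ == "__main__":
    batches = [(1, 3), (4, 6), (7, 9), (10, 12), (13, 15)]
    done, peak = run_batches(batches, max_concurrent=2)
    print(f"processed {len(done)} batches, peak concurrency {peak}")
```

Raising the cap or shrinking the batch size changes how many stub updates overlap, which mirrors what the sleep in update_batch makes visible in the Airflow UI.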

Checklist

  • My pull request has a descriptive title (not a vague title like Update index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (./ov just catalog/generate-docs for catalog PRs) or the media properties generator (./ov just catalog/generate-docs media-props for the catalog or ./ov just api/generate-docs for the API) where applicable.

Developer Certificate of Origin

Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@openverse-bot openverse-bot added 🧱 stack: catalog Related to the catalog and Airflow DAGs 🧱 stack: documentation Related to Sphinx documentation 🟧 priority: high Stalls work on the project or its dependents 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository labels Jul 12, 2024

Full-stack documentation: https://docs.openverse.org/_preview/4610

Please note that GitHub pages takes a little time to deploy newly pushed code, if the links above don't work or you see old versions, wait 5 minutes and try again.

You can check the GitHub pages deployment action list to see the current status of the deployments.

Changed files 🔄:

@krysal krysal mentioned this pull request Jul 12, 2024
@krysal krysal force-pushed the catalog_cleaner_dag branch from e8db5c7 to f5c04a3 Compare July 15, 2024 17:51
@krysal krysal force-pushed the catalog_cleaner_dag branch from f5c04a3 to bf5b67d Compare July 15, 2024 17:52
@krysal krysal marked this pull request as ready for review July 15, 2024 19:59
@krysal krysal requested review from a team as code owners July 15, 2024 19:59
@AetherUnbound
Collaborator

@krysal For the testing instructions on this one, should we run that command against the upstream database or the API one? And what format should the TSV be in (e.g. what columns do we need, does it need to be quoted, etc.). If possible, do you mind supplying a set of commands we can use (similar to the one you provided for altering the records) to generate and upload the TSV so testing across contributors can be consistent? Thank you!

Collaborator

@AetherUnbound AetherUnbound left a comment

Adding some preliminary thoughts before testing this locally!

Comment on lines 43 to 54
@task
def count_dirty_rows(temp_table_name: str, task: AbstractOperator = None):
    """Get the number of rows in the temp table before the updates."""
    count = run_sql.function(
        dry_run=False,
        sql_template=f"SELECT COUNT(*) FROM {temp_table_name}",
        query_id=f"{temp_table_name}_count",
        handler=single_value,
        task=task,
    )
    logger.info(f"Found {count:,} rows in the `{temp_table_name}` table.")
    return count
Collaborator

Nit: since run_sql is already a @task and this merely adds a logger.info line (with information which could be pieced together from the XComs and arguments), maybe it makes more sense to use run_sql directly with a .override(task_id="count_dirty_rows")?

Member Author

@krysal krysal Jul 17, 2024

Good observation! This reminds me that I left that to be developed later. I can make that change and add the Slack notification before the update.

Comment on lines +73 to +76
# Includes the formatted batch range in the context to be used as the index
# template for easier identification of the tasks in the UI.
context = get_current_context()
context["index_template"] = f"{batch_start}__{batch_end}"
Collaborator

Woah, this is neat!! I didn't know this could be evaluated after the task had started 😮

Collaborator

Same, TIL!!

@krysal
Member Author

krysal commented Jul 17, 2024

@krysal For the testing instructions on this one, should we run that command against the upstream database or the API one? And what format should the TSV be in (e.g. what columns do we need, does it need to be quoted, etc.). If possible, do you mind supplying a set of commands we can use (similar to the one you provided for altering the records) to generate and upload the TSV so testing across contributors can be consistent? Thank you!

This is for cleaning the catalog, so all the operations are performed on the upstream DB. I added a handy command that should get you the file if you have psql locally. If not, it can be run inside one of the Docker containers; it would just take one or two more steps to get the file to the host. Let me know if that works for you.

The columns that need to be cleaned in production (that is, for which we have files) are specifically url, creator_url, and foreign_landing_url. The generated files are quite simple: pairs of identifiers and the fixed URL value, no quotes and no header.
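For anyone who wants to sanity-check a file before uploading it, here is a minimal sketch of a validator for that format, assuming the identifiers are UUIDs as in the sample rows of the testing instructions. `read_cleaned_rows` is a hypothetical helper for local checks, not part of the DAG.

```python
import csv
import io
import uuid


def read_cleaned_rows(text: str) -> list[tuple[str, str]]:
    """Parse <identifier><TAB><value> lines with no quotes and no header.

    Raises ValueError on a wrong field count or on an identifier that is not
    a UUID (which also catches an accidental header line).
    """
    rows = []
    reader = csv.reader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    for line_no, fields in enumerate(reader, start=1):
        if len(fields) != 2:
            raise ValueError(
                f"line {line_no}: expected 2 tab-separated fields, got {len(fields)}"
            )
        identifier, value = fields
        uuid.UUID(identifier)  # raises ValueError if not a well-formed UUID
        rows.append((identifier, value))
    return rows


if __name__ == "__main__":
    sample = (
        "3074629a-9934-464b-aca6-b517f4b7cf80\thttps://example.com/a.jpg\n"
        "e3637802-7ff5-4972-b1b4-554e97d921f1\thttps://example.com/b.jpg\n"
    )
    print(read_cleaned_rows(sample))
```

The example.com URLs are placeholders; with a real file you would pass the contents of the TSV instead.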

@krysal krysal requested a review from AetherUnbound July 18, 2024 00:39
Collaborator

@AetherUnbound AetherUnbound left a comment

Thanks so much for the testing instructions, I was able to execute them perfectly! And that's a good thing to note about psql, sounds like it's a tool we should add to the ov container as well for this sort of thing 😄 I'll make an issue for it.

I was able to test this from end-to-end and it worked just as expected, including the batch mapped indices which is very neat. Great work on this!

@openverse-bot
Collaborator

Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR:

@obulat
@stacimc
This reminder is being automatically generated due to the urgency configuration.

Excluding weekend[1] days, this PR was ready for review 3 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekday(s)[2].

@krysal, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.

Footnotes

  1. Specifically, Saturday and Sunday.

  2. For the purpose of these reminders we treat Monday - Friday as weekdays. Please note that the operation that generates these reminders runs at midnight UTC on Monday - Friday. This means that depending on your timezone, you may be pinged outside of the expected range.

Contributor

@obulat obulat left a comment

Thank you for the detailed testing instructions, @krysal ! It worked well locally (once I figured out the connection details). Hope this works in production the first time we run it 🤞

task=task,
)
notify_slack.function(
text=f"Starting the cleaning process in upstream DB. Expecting {count:,} rows"
Contributor

Do we still call the DB upstream, or is it catalog DB (vs the API db) ?

Contributor

Would it be possible to add the field being cleaned (url, foreign_landing_url etc) to this notification?

Member Author

Do we still call the DB upstream, or is it catalog DB (vs the API db) ?

That's a good question. I use both terms interchangeably, which can be confusing for sure 😅 Which one do you prefer?

Would it be possible to add the field being cleaned (url, foreign_landing_url etc) to this notification?

It's part of the temporary table name but thinking again, it's better to make it explicit. Done! :)

Contributor

Which one do you prefer?
I would prefer catalog.
You could say that the API is also a "catalog", but I guess I look at it more from the point of view of our stacks. On the other hand, since this database (upstream/catalog) is the single source of truth that contains all the information which can later be filtered in the API database, it makes sense to consider it "the catalog".

Collaborator

@stacimc stacimc left a comment

Looks great to me -- thanks for the very clear description in addition to the testing instructions. The justification seems sound to me, and I'm also excited to test parallelization of the batches; I don't recall any reason for avoiding it. 🚀

@krysal
Member Author

krysal commented Jul 20, 2024

Thank you all for the review folks! ✨ I'll be starting the DAG on Monday.

@krysal krysal merged commit e8fabd0 into main Jul 20, 2024
42 checks passed
@krysal krysal deleted the catalog_cleaner_dag branch July 20, 2024 00:57
Successfully merging this pull request may close these issues.

Use the batched_update DAG with stored CSVs to update Catalog URLs
5 participants