Save cleaned data of Ingestion Server to AWS S3 #4163
Conversation
I ran the data refresh locally, and saw the saved data in the minio bucket. I'll look through the code again later, but wanted to add a comment here now.
When running locally, I got a "bucket does not exist" error. Do you think it's better to reuse an existing bucket, or to add the new one to the minio docker template?

`BUCKETS_TO_CREATE=openverse-storage,openverse-airflow-logs`
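For reference, the "create it if missing" alternative could look roughly like the boto3 sketch below; the endpoint URL, credentials, and error handling are placeholders for whatever the local MinIO setup uses, not code from this PR.

```python
# Rough sketch of creating the bucket at startup if MinIO wasn't seeded with it.
# Endpoint URL and credentials are placeholders for the local MinIO environment.
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:5010",  # placeholder MinIO S3 endpoint
    aws_access_key_id="test_user",         # placeholder credentials
    aws_secret_access_key="test_secret",
)

bucket = "openverse-catalog"
try:
    s3.head_bucket(Bucket=bucket)
except ClientError:
    # head_bucket raises for a missing bucket as well as for other errors;
    # a real implementation would inspect the error code before creating.
    s3.create_bucket(Bucket=bucket)
```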
I checked in S3 and we've used
Having checked S3, it looks like the
I'm excited about this! There are a few things that will need to be adjusted, in addition to my comments above about file size before upload.
Additionally, the testing instructions appear to no longer be accurate, since that initialization based on the environment appears to be a part of the code now.
Lastly, I adjusted the default bucket name and removed some HTTP schemes from the sample data, as I noted below. I was able to get the upload to work, but the CSV that was uploaded had multiple copies of each identifier (the screenshot below has the rows sorted just to show the duplications; they were not in that order in the originally produced file). It appears that some of the logic is causing duplicate file writing.
```diff
@@ -364,7 +413,7 @@ def clean_image_data(table):
     log.info(f"Starting {len(jobs)} cleaning jobs")

     for result in pool.starmap(_clean_data_worker, jobs):
-        batch_cleaned_counts = save_cleaned_data(result)
+        batch_cleaned_counts = data_uploader.save(result)
```
Just noting this because I had to talk myself through it - I was worried that the data uploader step was happening in each process as part of the multiprocessing, but it looks like each result that comes out of `pool.starmap` is processed serially, so we don't need to worry about multiple processes stepping on each other.
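To illustrate (a minimal standalone sketch, not this PR's code; `save` stands in for `data_uploader.save`): `pool.starmap` blocks until all workers finish and returns their results to the parent process, which then loops over them one at a time.

```python
# Sketch: workers run in parallel, but only the parent process consumes the
# results and calls the uploader, one result at a time.
import multiprocessing
import time


def _clean_data_worker(batch_id, rows):
    # Simulate parallel cleaning work in a worker process.
    time.sleep(0.1)
    return {"batch_id": batch_id, "cleaned": len(rows)}


def save(result):
    # Stand-in for the uploader; this only ever runs in the parent process.
    print(f"uploading batch {result['batch_id']} ({result['cleaned']} rows)")


if __name__ == "__main__":
    jobs = [(i, list(range(10))) for i in range(4)]
    with multiprocessing.Pool(processes=2) as pool:
        # starmap returns the full list of results once every job has finished,
        # so this loop executes serially in the parent.
        for result in pool.starmap(_clean_data_worker, jobs):
            save(result)
```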
@obulat @AetherUnbound Thanks for your valuable suggestions. I was too quick to mark this as ready before, but I'm still glad I did! @AetherUnbound Regarding the file size, I set it to 850 MB, so only one file is created for each field except tags; 1 GB is on the big side for downloading without a high-speed connection. About the duplicated rows in the file, I'm not sure what could have happened there; I don't see those results in my tests 🤔 Could you try again? I applied the rest of the suggested changes.
This works great now, @krysal !
I left several comments for improvements (non-blocking).
This is unfortunately still producing duplicate rows for me locally 😕 Here's everything I did to get this working:

- Add `OPENVERSE_BUCKET=openverse-storage`, because my existing minio environment file didn't have the new bucket, and the new bucket defaults to `openverse-catalog`
- Remove `https://` from the `foreign_landing_url` for the first 10 records of `sample_image.csv`
- Run `just down -v`
- Run `just c` to start the catalog (and get S3 running)
- Run `just api/init` (this performs the image data refresh for us)
- Download the produced TSV from localhost:5011

Although the logs say 10 records were written for `foreign_landing_url`, the CSV I download has 40 records (4 full copies of each of the 10 rows); a quick way to check for this is sketched below.
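A quick duplicate check along these lines (a sketch; the file name is a placeholder for the downloaded file) counts total vs. distinct rows:

```python
# Count total vs. distinct rows in the downloaded tab-separated file to surface
# duplicates like the ones described above. The file name is a placeholder.
import csv
from collections import Counter

with open("foreign_landing_url.tsv", newline="") as f:
    rows = [tuple(row) for row in csv.reader(f, delimiter="\t")]

counts = Counter(rows)
print(f"{len(rows)} total rows, {len(counts)} distinct rows")
for row, n in counts.most_common(5):
    if n > 1:
        print(f"{n}x {row}")
```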
@sarayourfriend That is very helpful, thank you!
I'm using the existing bucket so there shouldn't be surprises with it :) Good to know that either way! @AetherUnbound I resorted to simplifying the file management, given that the previous approach was producing strange results in integration tests. This is ready for a re-review now.
This looks good; I was able to test it with the ingestion-altering method I had used in the past. The output had no duplicate rows this time! I have one non-blocking suggestion.
It turns out the quotes are needed for
Okay, so I just moved the instructions regarding whether the file upload is skipped or not, so there is not much change, which is interesting in relation to the tests... Anyway, I'm merging this now!
Fixes
Part of #3912 by @krysal
Description
This PR changes the Ingestion Server's cleanup step to save the cleaned data to an S3 bucket once it reaches a certain number of rows. I chose 10M because, of the previously saved files, the one with the most lines has over 9M, so we know a quantity around that is manageable in memory and produces fewer file parts.

S3 does not support appending to files; you can only replace them, so the file for each column to modify is split into chunks that are uploaded as they are generated.

This is the first part, uploading the files; additionally, the server instance will need to have permission over the bucket, as @sarayourfriend pointed out.
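To make the idea concrete, here is a minimal sketch of the buffer-and-flush approach described above. The class, method, and key names are hypothetical (not the Ingestion Server's actual code); the point is that rows are accumulated per field and each full buffer becomes a brand-new numbered object, since S3 objects cannot be appended to.

```python
# Hypothetical sketch of buffering cleaned rows per field and uploading each
# full buffer as a separate, numbered S3 object (S3 has no append operation).
import csv
import io
import os

import boto3


class CleanedDataUploader:
    def __init__(self, bucket="openverse-catalog"):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        # Rows to accumulate before flushing; mirrors the CLEANUP_BUFFER_SIZE
        # environment variable mentioned in the testing instructions.
        self.buffer_size = int(os.getenv("CLEANUP_BUFFER_SIZE", 10_000_000))
        self.buffers = {}      # field name -> list of (identifier, cleaned value)
        self.part_counts = {}  # field name -> number of parts uploaded so far

    def save(self, cleaned_rows_by_field):
        for field, rows in cleaned_rows_by_field.items():
            buffer = self.buffers.setdefault(field, [])
            buffer.extend(rows)
            if len(buffer) >= self.buffer_size:
                self._flush(field)

    def _flush(self, field):
        buffer = self.buffers.get(field)
        if not buffer:
            return
        part = self.part_counts.get(field, 0) + 1
        self.part_counts[field] = part
        out = io.StringIO()
        csv.writer(out, delimiter="\t").writerows(buffer)
        # Every flush creates a new object; nothing is ever appended in place.
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"cleaned_data/{field}_{part}.tsv",  # key layout is a placeholder
            Body=out.getvalue().encode("utf-8"),
        )
        buffer.clear()

    def finish(self):
        # Flush whatever is left in each buffer at the end of the cleanup step.
        for field in self.buffers:
            self._flush(field)
```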
Testing Instructions

Using Airflow UI

To test this locally with MinIO, create the `openverse-catalog` bucket if it's not automatically created at start. Go to http://localhost:5011/ (username: `test_user` & password: `test_secret`). Then make some rows of the `image` table in the catalog dirty by removing the protocol in one or several of the URLs (`url`, `creator_url`, or `foreign_landing_url`) or by modifying the tags for low accuracy (one way to do this is sketched after this paragraph). Run the data refresh from the Airflow UI, wait for it to finish, and check the bucket in MinIO at http://localhost:5011/browser/openverse-catalog/.
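For example, the protocol can be stripped from a handful of rows with an UPDATE like the one below (a sketch only; the connection settings are placeholders for the local catalog database).

```python
# Sketch: strip the protocol from `url` for 10 rows so the cleanup step has
# something to fix. Connection settings are placeholders for the local
# catalog (upstream) database.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="openledger",
    user="deploy", password="deploy",
)
with conn, conn.cursor() as cur:
    cur.execute(
        """
        UPDATE image
        SET url = REPLACE(url, 'https://', '')
        WHERE identifier IN (SELECT identifier FROM image LIMIT 10)
        """
    )
conn.close()
```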
The data refresh should continue even if the upload to S3 fails for whatever reason. Try shutting down the S3 container and clearing the `ingest_upstream` step in the DAG to confirm it continues despite the failure in the upload.

Set the `CLEANUP_BUFFER_SIZE` environment variable to a low number, like 5 (having more rows to clean), to test uploading several files for the same field.

Using init script
Alternatively, follow instructions provided by @AetherUnbound in this comment.