Add DAG for creating staging indices #3232
Conversation
Full-stack documentation: https://docs.openverse.org/_preview/3232 Please note that GitHub pages takes a little time to deploy newly pushed code. If the links above don't work or you see old versions, wait 5 minutes and try again. You can check the GitHub pages deployment action list to see the current status of the deployments.
There is a milestone with issues related to filtered index creation. The project thread for search sandbox links to it, but I don't know if these issues are actually geared towards this. In particular, I don't know if they explicitly seek to handle filtered index re-creation in this case.
@stacimc The issue you've linked in the PR description isn't the right one. I see a note to try and find the right one, so I tried looking and also couldn't find it. Maybe we forgot to create the issues after #2358. This single PR addresses the entire implementation plan (I think?), so we could create the issue now, at least.
A couple of questions for clarification, I might just be forgetting things from the planning discussions, but they might be good to document in the code too:
- What's the significance of the "full" suffix? In what sense are those indexes "full" and others not? I'm struggling to connect the dots.
- Do we also need to deploy the staging ingestion server before the last step (maybe simultaneously with step 3) of the deployment plan?
I haven't tested this locally yet (will do so after lunch), but I love the approach you've taken. I think it makes heaps of sense to use the staging ingestion server, and gets around some nasty issues with needing to avoid collision with the actual data refresh!
 return SimpleHttpOperator(
     task_id="get_current_index",
-    http_conn_id="data_refresh",
+    http_conn_id=http_conn_id,
Very cool, this whole set of changes!
LGTM! Tests well locally.
My one concern is over the parameter naming, as I found it confusing when I was actually testing, mostly because the alias references are non-specific, so I kept mixing up whether it was the `-full` or non-full alias that I was meant to expect to move. In particular, the fact that the `audio-full` alias always moves was initially difficult to wrap my head around, and I could totally see forgetting the intricacies of this behaviour whenever I first use this, probably a little while after reviewing this PR. Clearer explanations of the alias behaviour on the DAG trigger screen would help with this, essentially clarifying that the options are for the "base" media index alias and that the `-full` alias always moves, no matter what you do.
On that last note: what is the purpose of the `-full` alias? IIRC the intention from the implementation plan was to make it easier to iterate quickly without having to modify the ES query... but given the abstract and vague nature of "full", I wonder if it doesn't make more sense to have a free-text input that lets you assign a custom alias to add or be managed by the DAG. So that if I'm testing some specific feature, I could pass `sara-cool-feature` and it would create the alias `audio-sara-cool-feature` and move that around.
I suppose the question is how to easily identify the index to start with, which I'm realising is now probably the main use case for the `-full` index? If that's the case, would `-latest-manual` be a better way to convey the purpose?
Anyway, these are just minor points that can be solved in a follow-up issue and I do not want to further delay this PR that's taken so long to get reviews for. If you think there are easy ways to clarify some of these things or like any of my concrete suggestions, I'll leave it up to you whether it's worth implementing them now or whether to create fast-follow issues to avoid prolonging this PR. Both are reasonable to me.
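For illustration only (nothing in this PR implements it), the free-text idea above could look roughly like the sketch below; `index_suffix` is a hypothetical param name.

```python
from airflow.models.param import Param

# Hypothetical params for the free-text alias suggestion above (illustrative only).
params = {
    "media_type": Param(default="audio", enum=["audio", "image"], type="string"),
    # e.g. "sara-cool-feature" -> the DAG would manage "audio-sara-cool-feature"
    "index_suffix": Param(default="full", type="string"),
}


def target_alias(media_type: str, index_suffix: str) -> str:
    # The alias the DAG would create or move for this run.
    return f"{media_type}-{index_suffix}"
```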
@sarayourfriend Ah thanks -- the issue this addressed is in the infrastructure repo which is why the link did not work (fixed now). It looks like that issue was closed because the index was just added in staging manually while this PR has been stuck waiting for review. The priority can probably be lowered now.
This comes from the IP and I believe is to distinguish it from smaller proportional indices, which the IP proposes adding in a separate DAG.
The index names and alias behavior are all taken from the IP without change, but I do think the suggestion for […]. I found a lot of additional commentary in the comments on the IP. I think the intention of having a simple […]
Drafting this momentarily since the issue was closed.
Gotcha! You're right that it makes more sense in context 👍
@task(retries=0)
def prevent_concurrency_with_staging_database_restore(**context):
I am going to make some sensor utils for doing these steps (here and in the `staging_database_restore`), since this could already be useful in several places. I did not want to increase the complexity of this PR so I'll be doing that in a follow-up.
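A rough sketch of what such a shared utility could look like, with hypothetical names and module layout (the actual follow-up may differ):

```python
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.models import DagRun
from airflow.utils.state import State


@task(retries=0)
def prevent_concurrency_with_dags(external_dag_ids: list[str]):
    """Fail fast if any of the given DAGs currently has a running DagRun."""
    for dag_id in external_dag_ids:
        if DagRun.find(dag_id=dag_id, state=State.RUNNING):
            raise AirflowException(f"Concurrency check failed: {dag_id} is running.")
```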
After coming back to this PR, I've made some major changes (so re-requesting review from @sarayourfriend):
All of the same use cases are possible (the default behavior of the DAG, if you don't update any options, is to just create the new `-full` index).
@stacimc I'll review this tomorrow.
Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR: @krysal. Excluding weekend days, this PR was ready for review 4 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekday(s). @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
Worked perfectly! Tried first the case without any base audio index, then creating another, and then one more run deleting the old index. Very nice! 💯
Pinging @sarayourfriend and @AetherUnbound -- I think the review reminder bot isn't working because Sara approved the original version of this PR. I cleared that review because the implementation changed considerably (summary in this comment).
Apologies, I started reviewing this yesterday and ran out of time. It LGTM code-wise, I just haven't run it locally yet.
The code all looks great, I love how easy some of these compositions are becoming and the new slack utility task!
I'm running into an issue though where subsequent reruns fail at the `trigger_reindex` step 😕
Logs:
[2023-12-05, 05:09:17 UTC] {http.py:143} INFO - Calling HTTP method
[2023-12-05, 05:09:17 UTC] {base.py:73} INFO - Using connection ID 'staging_data_refresh' for task execution.
[2023-12-05, 05:09:17 UTC] {http.py:178} ERROR - HTTP error: Internal Server Error
[2023-12-05, 05:09:17 UTC] {http.py:179} ERROR - {"message": "Failed to schedule task due to an internal server error. Check scheduler logs."}
[2023-12-05, 05:09:17 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/http/hooks/http.py", line 176, in check_response
response.raise_for_status()
File "/home/airflow/.local/lib/python3.10/site-packages/requests/models.py", line 1021, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http://ingestion_server:8001/task
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/http/operators/http.py", line 145, in execute
response = http.run(self.endpoint, self.data, self.headers, self.extra_options)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/http/hooks/http.py", line 166, in run
return self.run_and_check(session, prepped_request, extra_options)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/http/hooks/http.py", line 217, in run_and_check
self.check_response(response)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/http/hooks/http.py", line 180, in check_response
raise AirflowException(str(response.status_code) + ":" + response.reason)
airflow.exceptions.AirflowException: 500:Internal Server Error
The ingestion server logs don't give any indication that something's wrong:
ingestion_server-1 | [2023-12-05 05:06:39,796 - elastic_transport.transport - 335][INFO] POST http://es:9200/audio-full-20231205t050631/_refresh [status:200 duration:0.136s]
ingestion_server-1 | [2023-12-05 05:06:39,819 - elastic_transport.transport - 335][INFO] PUT http://es:9200/audio-full-20231205t050631/_settings [status:200 duration:0.022s]
ingestion_server-1 | [2023-12-05 05:09:15,345 - gunicorn.error - 281][DEBUG] GET /stat/audio-full
ingestion_server-1 | [2023-12-05 05:09:15,349 - elastic_transport.transport - 335][INFO] GET http://es:9200/ [status:200 duration:0.002s]
ingestion_server-1 | [2023-12-05 05:09:15,350 - elastic_transport.transport - 335][INFO] GET http://es:9200/audio-full [status:200 duration:0.001s]
ingestion_server-1 | [2023-12-05 05:09:15,351 - gunicorn.access - 363][INFO] 172.20.0.12 - - [05/Dec/2023:05:09:15 +0000] "GET /stat/audio-full HTTP/1.1" 200 77 "-" "python-requests/2.31.0"
ingestion_server-1 | [2023-12-05 05:09:17,075 - gunicorn.error - 281][DEBUG] POST /task
ingestion_server-1 | [2023-12-05 05:09:17,088 - elastic_transport.transport - 335][INFO] GET http://es:9200/ [status:200 duration:0.003s]
ingestion_server-1 | [2023-12-05 05:09:17,088 - root - 298][INFO] Creating index audio-full-20231205t050907 for model audio from table audio.
ingestion_server-1 | [2023-12-05 05:09:17,151 - elastic_transport.transport - 335][INFO] PUT http://es:9200/audio-full-20231205t050907 [status:200 duration:0.062s]
ingestion_server-1 | [2023-12-05 05:09:17,151 - root - 307][INFO] Running distributed index using indexer workers.
ingestion_server-1 | [2023-12-05 05:09:17,157 - root - 119][INFO] Checking http://172.20.0.6:8002/healthcheck. . .
ingestion_server-1 | [2023-12-05 05:09:17,162 - root - 132][INFO] http://172.20.0.6:8002/healthcheck passed healthcheck
ingestion_server-1 | [2023-12-05 05:09:17,163 - root - 65][INFO] Assigning job {'model_name': 'audio', 'table_name': 'audio', 'start_id': 0, 'end_id': 5000, 'target_index': 'audio-full-20231205t050907'} to http://172.20.0.6:8002
ingestion_server-1 | [2023-12-05 05:09:17,170 - root - 274][INFO] Task 6e58fe46b3e247b3acaee0dfc93f1891 completed.
ingestion_server-1 | [2023-12-05 05:09:17,184 - gunicorn.access - 363][INFO] 172.20.0.12 - - [05/Dec/2023:05:09:17 +0000] "POST /task HTTP/1.1" 500 93 "-" "python-requests/2.31.0"
ingestion_server-1 | [2023-12-05 05:09:17,731 - gunicorn.error - 281][DEBUG] POST /worker_finished
ingestion_server-1 | [2023-12-05 05:09:17,732 - root - 115][INFO] Received worker_finished signal from 172.20.0.6
ingestion_server-1 | [2023-12-05 05:09:17,733 - root - 284][INFO] All indexer workers succeeded! New index: audio-full-20231205t050907
ingestion_server-1 | [2023-12-05 05:09:17,735 - elastic_transport.transport - 335][INFO] GET http://es:9200/ [status:200 duration:0.002s]
ingestion_server-1 | [2023-12-05 05:09:17,739 - gunicorn.access - 363][INFO] 172.20.0.6 - - [05/Dec/2023:05:09:17 +0000] "POST /worker_finished HTTP/1.1" 200 0 "-" "python-requests/2.31.0"
ingestion_server-1 | [2023-12-05 05:09:17,894 - elastic_transport.transport - 335][INFO] POST http://es:9200/audio-full-20231205t050907/_refresh [status:200 duration:0.154s]
ingestion_server-1 | [2023-12-05 05:09:17,914 - elastic_transport.transport - 335][INFO] PUT http://es:9200/audio-full-20231205t050907/_settings [status:200 duration:0.020s]
In fact, the new indices are even created and aliased.
Do you know why that might be failing?
config = DATA_REFRESH_CONFIGS.get(media_type)
data_refresh_timeout = config.data_refresh_timeout if config else timedelta(days=1)
Do we need to make this code resilient? Would it make sense to simplify this with the assumption that `media_type` is going to be `image` or `audio` only?
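For reference, the simplification being asked about might look something like this (a sketch; it assumes `DATA_REFRESH_CONFIGS` has an entry for every supported media type):

```python
from datetime import timedelta

# Current, defensive form: silently falls back to a default timeout when the
# media type has no config entry.
config = DATA_REFRESH_CONFIGS.get(media_type)
data_refresh_timeout = config.data_refresh_timeout if config else timedelta(days=1)

# Simplified form: assumes media_type is always "image" or "audio", so a
# missing entry fails loudly with a KeyError instead of being papered over.
data_refresh_timeout = DATA_REFRESH_CONFIGS[media_type].data_refresh_timeout
```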
LGTM! Tested locally and it all works great 👍 I made some assumptions about the instructions, places where it seemed like they hadn't been updated from the previous instructions, but according to my current understanding of this feature, it works the way I would expect it to. I did a bunch of different configurations when running the new DAG and it always behaved precisely the way I thought it would.
@AetherUnbound Can you give more details about what you mean by subsequent reruns here? If you entirely cleared a previous, successful dagrun (one that had successfully created a new index), it will attempt to create an index with the same index name and fail (although I do see the […]).
Sure! I tried to convey it with my screenshot, but I triggered one run with default settings, let that finish, and then tried to trigger another run with default settings (as in, I didn't change anything when triggering the run). Then it failed at that step. Was I supposed to trigger with something else?
Gotcha! That's how I initially interpreted it but it worked for me so I thought I might've misunderstood 🤔 I've tried again to be certain and am able to trigger multiple runs one after the other with default settings; I also tried identical non-default settings and that also worked for me. I have no idea what could cause that, especially since you say it appears to work and you only see the behavior in the UI? How bizarre!
FWIW, in my testing I was also able to run the DAG multiple times with the default settings.
Okay, I just rebuilt the images and tried again - same thing! I have no idea what's happening 🤷🏼♀️ Probably a mismatched config on my end. If other folks aren't seeing this then LGTM!
Not important, but @AetherUnbound if you get a minute I'd be curious if you have similar problems when re-running any other DAGs that deal with indices, like the data refreshes or the […].
I just tried, and I'm able to run the data refresh DAGs locally just fine!! So strange |
Fixes
Fixes #3246
Description
Note: I recommend reviewing this PR by commit (although note that the first commit was a reverted initial attempt and can be ignored).
This PR adds a `recreate_full_staging_index` DAG, which can be used to create a new Elasticsearch index in the staging cluster.

TODO: I could not find an issue for creating the DAG for the search relevancy project. Double check this and link it.

The DAGs work by connecting to the staging ingestion server. This is much simpler to implement than updating the production ingestion server to optionally connect to the staging Elasticsearch cluster, and also avoids many questions about valid connection configurations. The IP proposes using the ingestion server's REINDEX task to create the index and the `ElasticsearchPythonHook` to do the remaining steps, but since those steps are all possible to do very easily using the ingestion server, and we're already using the ingestion server to begin with, I decided to use the ingestion server for everything for ease/speed of implementation. This can of course be iterated on if there's call to do so.

I changed the behavior and params of this DAG from my original implementation, after re-reviewing the project proposal and IPs. **Expand this section** to see the original description if you're interested.

This DAG has the following conf options:

- `media_type`: media type to be used.
- `target_alias_override`: By default, the DAG will apply the `<media_type>-full` alias to the newly created index, but optionally you can supply a different value here. For example, on a DagRun with `media_type` set to 'image', you could set `target_alias_override` to 'image' as well in order to point the main media alias to your new index.
- `delete_old_index`: By default, False. When enabled, the DAG will delete the old index that was previously pointed to by the target alias (if applicable -- if no such index exists, it skips this step).

If the `staging_database_restore` DAG is running when this DAG starts, it will fail immediately. Conversely, if this DAG is running when `staging_database_restore` starts, that DAG will wait on this one to complete.

This PR also updates the ingestion server to respect the `DATA_REFRESH_LIMIT` in the `reindex` task as well as the full `ingest_upstream`. (TL;DR: this is an env variable that can currently be set to limit the number of records that are copied into the API table during a data refresh. In this PR, I've updated it so the same limit is respected when building an index.)

It was also necessary to update the ingestion server to consider a task 'active' if it has active workers. This fixes a bug where an ingestion server task is considered to be in the "errored" state by the TaskStatus when it schedules some indexer workers and then completes (because in this state, the task is no longer alive but progress has not yet reached 100%). By checking whether there are active workers associated with the task id, we can correctly determine whether the task is actually in an errored state.
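A minimal sketch of the "active if it still has workers" idea described above, using hypothetical names rather than the actual ingestion server structures:

```python
from dataclasses import dataclass, field
from threading import Thread


@dataclass
class TaskInfo:
    thread: Thread
    # Indexer workers this task scheduled that have not yet reported back.
    active_workers: set[str] = field(default_factory=set)


def task_is_active(task: TaskInfo) -> bool:
    # A task counts as active while its own thread is running OR while any of
    # its indexer workers are still outstanding. This keeps a parent task that
    # exits right after scheduling workers from being reported as errored
    # before progress reaches 100%.
    return task.thread.is_alive() or bool(task.active_workers)
```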
Testing Instructions
Run `just up` on this branch. For testing, you'll need to access your local ingestion server via Elasticvue. I used audio for tests but you could test some or all of these with image as well.

Test that it connects to the staging ingestion server

Locally we only have one ingestion server, but you can test that it is using the correct connection by first setting the `AIRFLOW_CONN_STAGING_DATA_REFRESH` env variable in your `catalog/.env` to something nonsense and verifying that the `recreate_full_audio_staging_index` DAG fails locally. Then correctly set the variable to `http://ingestion_server:8001` for subsequent tests.

Test normal flow

Trigger the DAG locally and do not enable any of the options. It should run successfully, and default to `audio`. In Elasticvue, you should see a new index with a name like `audio-full-20231019t220746` (the timestamp will differ) with the `audio-full` alias. I will refer to this as index A.

Trigger the DAG with default options a second time. Now in Elasticvue you should see another new index, with a more recent timestamp, which I will call index B. Index B should have the `audio-full` alias; index A should no longer have the alias, but should not have been deleted.

Test pointing the media alias

First, look at Elasticvue and note the name of the index that currently has the 'audio' alias. I will call this Index C. Then trigger the DAG again, but set `target_alias_override` to 'audio'. Now in Elasticvue you should see:

- […] `audio-full` alias
- […] `audio` alias
- […] `audio` alias

Test pointing the media alias and deleting the old one

Trigger the DAG again with params:

In Elasticvue you should see:

- […] `audio-full` alias
- […] `audio` alias

Test `delete_old_index` is skipped if no index previously had the target_alias

Trigger the DAG again, with this conf:

This should create a new index with a target_alias that does not currently exist, but we tell it to `delete_old_index` anyway. You should see that the `trigger_delete_index` step is skipped, and the indices look like:

- […] `audio` alias and not deleted
- […] `audio-full` alias

Test creating an index with a limit

In your `ingestion_server/.env`, set `DATA_REFRESH_LIMIT=1000`. Now run the DAG again and inspect Elasticvue. You should see that the new index has only 1000 documents (while the others will have 5000, assuming you're working off clean sample data).

I personally also tested that the limit still works with the data refresh by running an audio data refresh and verifying that there were now only 1000 audio records in my local API.

Test concurrency prevention with `staging_database_restore`

To test this, trigger both DAGs in the shell:

Since `staging_database_restore` started first, you should see `recreate_full_staging_index` fail immediately. (Note that the `staging_database_restore` DAG will pass the first couple of tasks, but most of the others will fail in the local env. That's fine for this test.)

Now try the other direction:

This time, you should see the `staging_database_restore`'s `wait_for_recreate_full_staging_index` task go up for reschedule and then pass once the `recreate_full_staging_index` DAG finishes. (Set `DATA_REFRESH_POKE_INTERVAL=5` in your `catalog/.env` if it's taking a long time.)

Other DAGs

I also ran the data refresh and popularity refresh for audio to ensure they were unaffected.

Deployment plan

In addition to merging this PR, we also need to add the staging ingestion server connection to production Airflow using an Airflow variable. Before using the DAG we'll probably want to add an official DNS name to the staging data refresh server.

To actually add the audio index in staging, we'll:

- Set the `DATA_REFRESH_LIMIT` in staging to limit the number of records to be reindexed (there are currently `2,533,832` records in the `audio` table on staging)
- Run the `recreate_full_audio_staging_index` DAG in production Airflow with `point_media_alias` enabled