Add popularity refresh DAGs #2592
Conversation
```python
if media_type == AUDIO:
    table_name = TABLE_NAMES[AUDIO]
    standardized_popularity_func = STANDARDIZED_AUDIO_POPULARITY_FUNCTION
```
Outside of the scope of this PR, maybe we could have some kind of `MEDIA_TYPE_CONFIG` dictionary in the future? Then we could pass only the media type to `format_update_standardized_popularity_query` and retrieve all the db columns, table names, and so on from the config.
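Not part of this PR, but a minimal sketch of what that could look like, following the excerpt above. The constant values and the image entries here are placeholders for illustration, not the actual constants in the catalog's popularity SQL module:

```python
# Hypothetical sketch of a MEDIA_TYPE_CONFIG; values below are placeholders.
AUDIO, IMAGE = "audio", "image"

MEDIA_TYPE_CONFIG = {
    AUDIO: {
        "table_name": "audio",
        "standardized_popularity_func": "standardized_audio_popularity",
    },
    IMAGE: {
        "table_name": "image",
        "standardized_popularity_func": "standardized_image_popularity",
    },
}


def format_update_standardized_popularity_query(media_type, **kwargs):
    # Callers pass only the media type; table and function names are looked up
    # from the config instead of per-media-type branches.
    config = MEDIA_TYPE_CONFIG[media_type]
    table_name = config["table_name"]
    standardized_popularity_func = config["standardized_popularity_func"]
    ...
```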
Oh, jinx with my own comments 😄
Re this and also this comment -- 100% yes :) In fact I started doing that in this PR but removed it because it created a much larger changeset. I kept to the current convention for this work, but I'll create an issue for refactoring this separately (unless @AetherUnbound do you think this should be updated in this PR?)
Ah right, because it's in `sql.py` there could be a lot more changes present. Yes, I think a separate issue is good!
THIS IS SO COOL. I had the pleasure of forgetting to enable the `batched_update` DAG in the Airflow UI and got to see all of the `refresh_popularity` mapped instances be marked as deferred until I enabled it.

Works as described, and I don't see any suggestions for the code itself, so LGTM! Excited to use this and tune the timing intervals after we observe it in production.

Woah, interesting! Will we need to change the deployment steps in production to start a triggerer there as well?
This is fantastic! I'm so happy to see all of the pieces coming together here 😄 I was able to run the testing instructions locally and everything ran as expected. I have a few questions and notes, and want to mention the airflow triggerer again, as we'll likely need to make an adjustment for deployments prior to kicking this off.

Additionally, would you be willing to add some tests for the `get_providers_update_confs` function?
```python
{
    # Uniquely identify the query
    "query_id": (
        f"{provider}_popularity_refresh_{last_updated_time.strftime('%Y%m%d')}"
```
Do you think this is a fine enough resolution for this DAG? There wouldn't potentially be a case where we'd run it twice in one day?
The `query_id` is used for building the temp table in the triggered `batched_update` runs, which is dropped when the update is successful. If it's not successful, we should be managing that by clearing tasks or by manually triggering a new one with the `resume_update` param to use the existing temp table.

This DAG has `max_active_runs=1`, so I think the only way to get a collision would be to start a popularity refresh, fail it and at least one of the triggered `batched_update`s, and then retry the `popularity_refresh` DAG on the same day. In that case a failure seems fine, since that's not the intended way to handle errors for batched updates anyway.
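For illustration, here is the identifier the excerpt above would produce for a hypothetical provider and timestamp (both made up), which is why only a same-day retry after a failure could collide:

```python
from datetime import datetime

# Hypothetical inputs; the real values come from the DAG's configuration.
provider = "jamendo"
last_updated_time = datetime(2023, 7, 11)

query_id = f"{provider}_popularity_refresh_{last_updated_time.strftime('%Y%m%d')}"
print(query_id)  # jamendo_popularity_refresh_20230711
```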
```python
    max_active_runs=1,
    catchup=False,
    doc_md=__doc__,
    tags=["popularity_refresh"],
```
Do you think it would make sense for this to also have the `"data_refresh"` tag? Or should we leave it off because it's independent?
When the project is complete, the data and popularity refreshes should be totally decoupled. There's definitely a case to be made that they're related, but I think it might be more confusing to use the `data_refresh` tag here, in case it implies a relationship that no longer exists. But that's not a strongly held opinion :)
```python
refresh_popularity_scores = TriggerDagRunOperator.partial(
    task_id="refresh_popularity",
    trigger_dag_id=BATCHED_UPDATE_DAG_ID,
    # Wait for all the dagruns to finish
    wait_for_completion=True,
    # Release the worker slot while waiting
    deferrable=True,
    poke_interval=poke_interval,
    retries=0,
).expand(
    # Build the conf for each provider
    conf=get_providers_update_confs(POSTGRES_CONN_ID, popularity_refresh)
)
```
So cool that we're using dynamic tasks again!!
```diff
@@ -22,7 +23,9 @@
 UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "update_media_popularity_constants_view"


-def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh):
+def create_refresh_popularity_metrics_task_group(
+    refresh_config: DataRefresh | PopularityRefresh,
```
Very cool!!
Based on the medium urgency of this PR, the following reviewers are being gently reminded to review this PR: @krysal. Excluding weekend days, this PR was ready for review 6 day(s) ago. PRs labelled with medium urgency are expected to be reviewed within 4 weekday(s). @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
I'll put this on draft while Staci catches up.
ecf3489 to ec6e16d (compare)
I think that's all feedback addressed. I also modified the DAG slightly to pull …

Actually, drafting again for a moment while I look into changes necessary to start the …

I think we'll need to update the catalog's …
Awesome! Worked as indicated. It seems that we will need a new service instance for the catalog, which looks in line with the project recommendations.

I will be taking another look at this today!
LGTM! Thanks for looking into the triggerer stuff.
Fixes
Fixes #2089 by @stacimc
Description
Adds a popularity refresh DAG factory that generates `audio_popularity_refresh` and `image_popularity_refresh` DAGs. Each DAG:

- triggers a `batched_update` DagRun for each of the providers of this media type that support popularity data
- has a `None` schedule, so it must be triggered manually, and has very generous timeouts. These will be revisited in follow-up PRs after testing is done in production.
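As a rough illustration only (not the actual PR code, which lives in the catalog's DAG modules), the factory's shape could look something like the sketch below. The `PopularityRefresh` fields, the `POSTGRES_CONN_ID` value, the `poke_interval` default, and the stubbed `get_providers_update_confs` body are all assumptions for illustration:

```python
# Hypothetical sketch; the real implementation in this PR differs in detail.
from dataclasses import dataclass

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

POSTGRES_CONN_ID = "postgres_openledger_upstream"  # assumed connection id
BATCHED_UPDATE_DAG_ID = "batched_update"


@dataclass
class PopularityRefresh:
    media_type: str  # "audio" or "image"
    poke_interval: int = 30 * 60  # seconds between deferred completion checks


def get_providers_update_confs(conn_id: str, refresh: PopularityRefresh) -> list[dict]:
    # Stub: the real helper looks up the providers of this media type that
    # support popularity data and builds one batched_update conf per provider.
    return []


def create_popularity_refresh_dag(refresh: PopularityRefresh) -> DAG:
    with DAG(
        dag_id=f"{refresh.media_type}_popularity_refresh",
        schedule=None,  # no schedule; triggered manually
        max_active_runs=1,
        catchup=False,
        tags=["popularity_refresh"],
    ) as dag:
        # Trigger a batched_update DagRun per provider, deferring while waiting
        # so the worker slot is released.
        TriggerDagRunOperator.partial(
            task_id="refresh_popularity",
            trigger_dag_id=BATCHED_UPDATE_DAG_ID,
            wait_for_completion=True,
            deferrable=True,
            poke_interval=refresh.poke_interval,
            retries=0,
        ).expand(conf=get_providers_update_confs(POSTGRES_CONN_ID, refresh))
    return dag
```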
Testing Instructions

The `TriggerDagRunOperator` we're using to trigger the `batched_update`s is put in `deferrable` mode, in order to free the worker slot while waiting for the batched updates to complete. This requires a Triggerer to be running. ~~Locally, I did this by running `just catalog/shell` and then `airflow triggerer` to start the Triggerer.~~ Run `just down -v && just up` to start the triggerer.
to start the triggerer.You'll also want to make sure you have
DATA_REFRESH_POKE_INTERVAL=5
in yourcatalog/.env
so that you don't have to wait 30 minutes for the TriggerDagRunOperator to re-run.Then run
just init
to get sample data in your local environment. The sample data all have null popularity scores, which you can verify by runningjust catalog/pgcli
to open a pgcli session and then running:In both cases the result should be
5000
Now, simply run the `audio_popularity_refresh` and `image_popularity_refresh` DAGs locally. Both should pass with no task failures. You should go to http://localhost:9090/dags/batched_update/grid and verify that you see a separate DagRun for each provider that supports popularity data. You can inspect the logs for the `notify_updated_count` for each. Where 0 records were updated, this is because there are no records for that provider in our sample data. Your `query_id`s will differ in the date suffix.

Because this PR also moves some popularity task factories around, also run both data refreshes to ensure they pass.
Checklist
- My pull request has a descriptive title (not a vague title like Update index.md).
- My pull request targets the default branch of the repository (main) or a parent feature branch.

Developer Certificate of Origin