Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staging database restore DAG: base restore #2099

Merged
merged 24 commits into from
May 23, 2023

Conversation

AetherUnbound
Copy link
Collaborator

Fixes

This is a step towards #1989 but does not address it completely (steps 12+ from the implementation plan will be added in follow-up PRs).

Description

This PR adds the initial restore-from-snapshot steps of the staging database restore DAG. See the linked IP above for the full definition.

I recommend reviewing in the following order:

  • constants.py
  • utils.py
  • staging_database_restore.py
  • staging_database_restore_dag.py

Screenshot_2023-05-12_15-47-27

Since this doesn't have the last required steps for the DAG, we won't enable it once this is merged. I felt that splitting this up into several PRs was a better route to take though, otherwise the full DAG would have been huge!

Testing Instructions

Tests should pass locally. You can also set the SKIP_STAGING_DATABASE_RESTORE Airflow Variable to true and kick off the DAG to see that it should skip everything.

Warning
If you want to test the actual restore operations, you can do so by setting the following environment variables:
AWS_RDS_CONN_ID=aws_rds
AIRFLOW_CONN_AWS_RDS=aws://<access-key>:<access-secret>@?region_name=us-east-1

This will run the restore & delete operations, so be advised!

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@AetherUnbound AetherUnbound requested a review from a team as a code owner May 13, 2023 00:37
@AetherUnbound AetherUnbound requested review from obulat and stacimc May 13, 2023 00:37
@github-actions github-actions bot added the 🧱 stack: catalog Related to the catalog and Airflow DAGs label May 13, 2023
@AetherUnbound AetherUnbound added 🟨 priority: medium Not blocking but should be addressed soon 🌟 goal: addition Addition of new feature 💻 aspect: code Concerns the software code in the repository labels May 15, 2023
Copy link
Collaborator

@stacimc stacimc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳 This looks great! No blocking feedback. Code lgtm and I did test locally with SKIP_STAGING_DATABASE_RESTORE. Splitting this into another PR with the next steps makes total sense to me as well.

I learned so much about TaskFlow reviewing this 😄 Very, very cool.

should_skip >> staging_details

restore_snapshot = restore_staging_from_snapshot(latest_snapshot, staging_details)
ensure_snapshot_ready >> restore_snapshot
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all works as is, but I'm curious why we didn't need to explicitly set either:

latest_snapshot >> ensure_snapshot_ready

or

staging_details >> restore_snapshot

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little sneaky, behind the scenes stuff from the TaskFlow API 😅 the restore_snapshot = restore_staging_from_snapshot(latest_snapshot, staging_details) line creates an inherent dependency relationship from latest_snapshot and staging_details to restore_snapshot, without us having to explicitly define that relationship with the >> operator. Same for ensure_snapshot_ready - it has latest_snapshot as one of its params, so Airflow knows implicitly it has to run that step first!

raise ValueError(f"No staging DB found for {constants.STAGING_IDENTIFIER}")
staging_db = instances[0]
# While it might be tempting to log this information, it contains sensitive
# values. Instead, we'll select only the information we need, then log that.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is filtering out sensitive values only important for logging? Ie, could we only restrict logging to REQUIRED_DB_INFO, but use everything to create the new db? Would that provide any extra resilience to configuration drift, or can we expect that REQUIRED_DB_INFO will never need to change?

Copy link
Collaborator Author

@AetherUnbound AetherUnbound May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are a few reasons why the current approach might be best:

  • Easy prevention of logging secrets accidentally
  • REQUIRED_DB_INFO isn't likely to change on the features that we care about at least
  • There are some values that are returned from the details API that have to have their shape changed before they can be fed into the create from snapshot API (see the lines below on subnet group name and VPC security group IDs). There are dozens of values returned from the former API, so trying to manage the appropriate transformations on all of them so they can be fed into the latter API seems unnecessary at this point IMO.



@task.short_circuit
def skip_restore(should_skip: bool = False) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this take should_skip as an argument? Will a value ever be passed this way?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log = logging.getLogger(__name__)


@task.short_circuit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool, TIL!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stumbled across this one by accident while looking for something else, couldn't help but use it!

)
)
if not should_continue:
notify_slack.function(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat!


@functools.wraps(func)
def wrapped(*args, **kwargs):
rds_hook = kwargs.pop("rds_hook", None) or RdsHook(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

if db_identifier not in constants.SAFE_TO_MUTATE:
raise ValueError(
f"The target function must be called with the staging database "
f"identifier ({constants.STAGING_IDENTIFIER}), not {db_identifier}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a little confusing, since the TEMP identifiers also work, right? Maybe The target function must be called with a non-production database identifier?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh shoot, that's a great point. This function went through a few iterations (initially it was only staging, then SAFE_TO_MUTATE came about). I'll update this!

staging_database_restore.slack, "send_message"
) as mock_send_message:
actual = staging_database_restore.skip_restore.function(should_skip)
assert actual == (not should_skip)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add tests that mock the SKIP_STAGING_DATABASE_RESTORE variable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since that's a Variable.get with a simple or comparison between that and should_skip, that testing the conditions there is sufficient. This seemed easier than mocking the Variable.get step, but I can change it to that kind of test and remove the should_skip param if you think that's best!

rule = TriggerRule.NONE_FAILED
with DAG(dag_id="test_make_rename_task_group", start_date=datetime(1970, 1, 1)):
group = staging_database_restore.make_rename_task_group("dibble", "crim", rule)
assert group.group_id == "rename_dibble_to_crim"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄

Copy link
Collaborator

@stacimc stacimc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳 This looks great! No blocking feedback. Code lgtm and I did test locally with SKIP_STAGING_DATABASE_RESTORE. Splitting this into another PR with the next steps makes total sense to me as well.

I learned so much about TaskFlow reviewing this 😄 Very, very cool.

Copy link
Contributor

@obulat obulat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested this locally, and it skipped all of the steps :)
Oh, and got scared at first of all the errors when I set the variable value to True (capitalized as in Python). So everything worked as expected.

AetherUnbound and others added 2 commits May 23, 2023 15:34
@AetherUnbound AetherUnbound merged commit 988381d into main May 23, 2023
@AetherUnbound AetherUnbound deleted the feature/staging-database-recreation-dag branch May 23, 2023 23:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
💻 aspect: code Concerns the software code in the repository 🌟 goal: addition Addition of new feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

3 participants