From ac78da6860c0990cbc2c3251d35713720725445c Mon Sep 17 00:00:00 2001
From: olaughter <51889566+olaughter@users.noreply.github.com>
Date: Wed, 27 Nov 2024 15:35:49 +0000
Subject: [PATCH] Feature/pla 331 lift input directory creation into distinct
 step (#106)

Replace the execution data file with the db_state file. We already pass
this in to the pipeline, like so:

```
{
    "db_state_file_key": "input/2024-11-25T09.41.24.302062/db_state.json"
}
```

This means we can use the db_state file key to locate the updates file
directly, skipping the execution data file entirely.
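For illustration, a minimal sketch of the derivation this enables,
mirroring the new code in src/navigator_data_ingest/main.py (the bucket
name below is hypothetical):

```python
import os

from cloudpathlib import S3Path

pipeline_bucket = "pipbucket123123123"  # hypothetical bucket name
db_state_file_key = "input/2024-11-25T09.41.24.302062/db_state.json"

# The db_state file sits inside the run's input directory, so its
# parent is the directory the ingest stage needs (input/${timestamp}/).
input_dir_path = S3Path(
    os.path.join("s3://", pipeline_bucket, db_state_file_key)
).parent
print(input_dir_path)  # s3://pipbucket123123123/input/2024-11-25T09.41.24.302062
```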
---
 .github/workflows/integration-tests.yml      |  7 +---
 HOW_TO_UPDATE_TESTS.md                       | 18 ++-------
 .../pipeline_in/execution_data/123456.json   |  3 --
 .../2022-11-01T21.53.26.945831/db_state.json |  1 +
 .../pipeline_out/execution_data/123456.json  |  3 --
 .../2022-11-01T21.53.26.945831/db_state.json |  1 +
 .../setup_execution_data_file.py             | 38 -------------------
 src/navigator_data_ingest/main.py            | 33 ++++------------
 8 files changed, 15 insertions(+), 89 deletions(-)
 delete mode 100644 integration_tests/data/pipeline_in/execution_data/123456.json
 create mode 100644 integration_tests/data/pipeline_in/input/2022-11-01T21.53.26.945831/db_state.json
 delete mode 100644 integration_tests/data/pipeline_out/execution_data/123456.json
 create mode 100644 integration_tests/data/pipeline_out/input/2022-11-01T21.53.26.945831/db_state.json
 delete mode 100644 integration_tests/setup_execution_data_file.py

diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index 9404a1e..63e5b74 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -18,9 +18,7 @@ env:
   DOCUMENT_NAME_KEY: document_id
   PARSER_INPUT_EXPECTED_DATA_FILE_PATH: integration_tests/data/parser_input_dir_post_run.json
   ARCHIVE_EXPECTED_DATA_FILE_PATH: integration_tests/data/docs_test_subset_archive_parser_input_expected.json
-  EXECUTION_DATA_PREFIX: execution_data
-  EXECUTION_DATA_FILE_NAME: 123456.json
-  EXECUTION_DATA_ID: 123456
+  DB_STATE_FILE_KEY: input/2022-11-01T21.53.26.945831/db_state.json
 
 jobs:
   test:
@@ -61,12 +59,11 @@ jobs:
       - name: Building s3 buckets and uploading test data
         run: |
           poetry run python -m integration_tests.setup_test_buckets ${{ env.INGEST_DOCUMENT_BUCKET }} ${{ env.INGEST_PIPELINE_BUCKET }} ${{ secrets.UNIT_TESTS_AWS_REGION }}
-          poetry run python -m integration_tests.setup_execution_data_file ${{ env.INGEST_PIPELINE_BUCKET }} ${{ env.EXECUTION_DATA_PREFIX }}/${{ env.EXECUTION_DATA_FILE_NAME }} ${{ env.TEST_DATA_UPLOAD_PATH }}
           aws s3 sync integration_tests/data/pipeline_in s3://${{ env.INGEST_PIPELINE_BUCKET }}
 
       - name: Running the Ingest Stage
         run: |
-          docker run -e AWS_ACCESS_KEY_ID=${{ secrets.UNIT_TESTS_AWS_ACCESS_KEY_ID }} -e AWS_SECRET_ACCESS_KEY=${{ secrets.UNIT_TESTS_AWS_SECRET_ACCESS_KEY }} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket ${{ env.INGEST_PIPELINE_BUCKET }} --document-bucket ${{ env.INGEST_DOCUMENT_BUCKET }} --updates-file-name ${{ env.UPDATES_FILE_NAME }} --output-prefix ${{ env.INGEST_OUTPUT_PREFIX }} --embeddings-input-prefix ${{ env.EMBEDDINGS_INPUT_PREFIX }} --indexer-input-prefix ${{ env.INDEXER_INPUT_PREFIX }} --execution-id ${{ env.EXECUTION_DATA_ID }} --execution-data-prefix ${{ env.EXECUTION_DATA_PREFIX }}
+          docker run -e AWS_ACCESS_KEY_ID=${{ secrets.UNIT_TESTS_AWS_ACCESS_KEY_ID }} -e AWS_SECRET_ACCESS_KEY=${{ secrets.UNIT_TESTS_AWS_SECRET_ACCESS_KEY }} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket ${{ env.INGEST_PIPELINE_BUCKET }} --document-bucket ${{ env.INGEST_DOCUMENT_BUCKET }} --updates-file-name ${{ env.UPDATES_FILE_NAME }} --output-prefix ${{ env.INGEST_OUTPUT_PREFIX }} --embeddings-input-prefix ${{ env.EMBEDDINGS_INPUT_PREFIX }} --indexer-input-prefix ${{ env.INDEXER_INPUT_PREFIX }} --db-state-file-key ${{ env.DB_STATE_FILE_KEY }}
 
       - name: Running Integration Tests on the Ingest Stage Output 🚀
         run: |
diff --git a/HOW_TO_UPDATE_TESTS.md b/HOW_TO_UPDATE_TESTS.md
index 1806fd2..a16728b 100644
--- a/HOW_TO_UPDATE_TESTS.md
+++ b/HOW_TO_UPDATE_TESTS.md
@@ -50,18 +50,6 @@ Example:
  python -m integration_tests.setup_test_buckets docbucket123123123 pipbucket123123123 eu-west-1
 ```
 
-### Create the execution data file locally from the nev variables
-
-```shell
- python -m integration_tests.setup_execution_data_file ${pipeline_bucket} ${EXECUTION_DATA_PREFIX}/${EXECUTION_DATA_ID}.json ${TEST_DATA_UPLOAD_PATH}
-```
-
-Example:
-
-```shell
- python -m integration_tests.setup_execution_data_file pipbucket123123123 execution_data/123456.json input/2022-11-01T21.53.26.945831/new_and_updated_documents.json
-```
-
 ### Sync the test data to the s3 bucket
 
 ```shell
@@ -79,13 +67,13 @@ Example:
 If you are trying to figure out what the variables are look in the env var section of the following file: .github/workflows/integration-tests.yml. Also note that the prefixes used must match the subdirectory names of the data/pipeline_in directory.
 
 ```shell
- docker run -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest-test --pipeline-bucket ${PIPELINE_BUCKET} --document-bucket ${DOCUMENT_BUCKET} --input-file ${TEST_DATA_UPLOAD_PATH} --output-prefix ${OUTPUT_PREFIX} --embeddings-input-prefix ${EMBEDDINGS_INPUT_PREFIX} --indexer-input-prefix ${INDEXER_INPUT_PREFIX} --execution-id ${EXECUTION_DATA_ID} --execution-data-prefix ${EXECUTION_DATA_PREFIX}
+ docker run -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest-test --pipeline-bucket ${PIPELINE_BUCKET} --document-bucket ${DOCUMENT_BUCKET} --input-file ${TEST_DATA_UPLOAD_PATH} --output-prefix ${OUTPUT_PREFIX} --embeddings-input-prefix ${EMBEDDINGS_INPUT_PREFIX} --indexer-input-prefix ${INDEXER_INPUT_PREFIX} --db-state-file-key ${DB_STATE_FILE_KEY}
 ```
 
 Example:
 
 ```shell
- docker run -e AWS_ACCESS_KEY_ID=XXX -e AWS_SECRET_ACCESS_KEY=XXX -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket pipbucket123123123 --document-bucket docbucket123123123 --updates-file-name new_and_updated_documents.json --output-prefix ingest_unit_test_parser_input --embeddings-input-prefix ingest_unit_test_embeddings_input --indexer-input-prefix ingest_unit_test_indexer_input --execution-id 123456 --execution-data-prefix execution_data
+ docker run -e AWS_ACCESS_KEY_ID=XXX -e AWS_SECRET_ACCESS_KEY=XXX -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket pipbucket123123123 --document-bucket docbucket123123123 --updates-file-name new_and_updated_documents.json --output-prefix ingest_unit_test_parser_input --embeddings-input-prefix ingest_unit_test_embeddings_input --indexer-input-prefix ingest_unit_test_indexer_input --db-state-file-key input/2022-11-01T21.53.26.945831/db_state.json
 ```
 
 ### Sync Down Output
@@ -120,4 +108,4 @@ I would recommend committing any changes, running the reformat program and caref
 
 ```shell
 python -m format_jsons
-```
\ No newline at end of file
+```
diff --git a/integration_tests/data/pipeline_in/execution_data/123456.json b/integration_tests/data/pipeline_in/execution_data/123456.json
deleted file mode 100644
index db2cd46..0000000
--- a/integration_tests/data/pipeline_in/execution_data/123456.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-    "input_dir_path": "s3://pipbucket123123123/input/2022-11-01T21.53.26.945831"
-}
\ No newline at end of file
diff --git a/integration_tests/data/pipeline_in/input/2022-11-01T21.53.26.945831/db_state.json b/integration_tests/data/pipeline_in/input/2022-11-01T21.53.26.945831/db_state.json
new file mode 100644
index 0000000..0967ef4
--- /dev/null
+++ b/integration_tests/data/pipeline_in/input/2022-11-01T21.53.26.945831/db_state.json
@@ -0,0 +1 @@
+{}
diff --git a/integration_tests/data/pipeline_out/execution_data/123456.json b/integration_tests/data/pipeline_out/execution_data/123456.json
deleted file mode 100644
index c9c7e23..0000000
--- a/integration_tests/data/pipeline_out/execution_data/123456.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-    "input_dir_path": "s3://pipbucket123123123/input/2022-11-01T21.53.26.945831"
-}
\ No newline at end of file
diff --git a/integration_tests/data/pipeline_out/input/2022-11-01T21.53.26.945831/db_state.json b/integration_tests/data/pipeline_out/input/2022-11-01T21.53.26.945831/db_state.json
new file mode 100644
index 0000000..0967ef4
--- /dev/null
+++ b/integration_tests/data/pipeline_out/input/2022-11-01T21.53.26.945831/db_state.json
@@ -0,0 +1 @@
+{}
diff --git a/integration_tests/setup_execution_data_file.py b/integration_tests/setup_execution_data_file.py
deleted file mode 100644
index 6b2a85b..0000000
--- a/integration_tests/setup_execution_data_file.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import json
-import sys
-import os
-from pathlib import Path
-
-from cloudpathlib import S3Path
-
-
-def create_execution_data_file(
-    bucket_name: str, execution_data_key: str, new_and_updated_documents_file_name: str
-) -> None:
-    """Create the test execution data file dynamically from environment variables in the test."""
-    data = {
-        "input_dir_path": str(
-            S3Path(f"s3://{bucket_name}/{new_and_updated_documents_file_name}").parent
-        ),
-    }
-
-    local_output_path = os.path.join(
-        os.getcwd(), f"integration_tests/data/pipeline_in/{execution_data_key}"
-    )
-
-    if Path(local_output_path).exists():
-        os.remove(local_output_path)
-
-    if not Path(local_output_path).parent.exists():
-        os.mkdir(Path(local_output_path).parent)
-
-    with open(local_output_path, "w") as f:
-        json.dump(data, f, indent=4)
-
-
-if __name__ == "__main__":
-    create_execution_data_file(
-        bucket_name=sys.argv[1],
-        execution_data_key=sys.argv[2],
-        new_and_updated_documents_file_name=sys.argv[3],
-    )
diff --git a/src/navigator_data_ingest/main.py b/src/navigator_data_ingest/main.py
index bf843fb..0463c9a 100644
--- a/src/navigator_data_ingest/main.py
+++ b/src/navigator_data_ingest/main.py
@@ -7,7 +7,6 @@
 import click
 import json_logging
 from cloudpathlib import S3Path
-from cpr_sdk.pipeline_general_models import ExecutionData
 
 from navigator_data_ingest.base.api_client import (
     write_error_file,
@@ -100,14 +99,9 @@
     help="Number of workers downloading/uploading cached documents",
 )
 @click.option(
-    "--execution-id",
+    "--db-state-file-key",
     required=True,
-    help="Unique identifier for the execution",
-)
-@click.option(
-    "--execution-data-prefix",
-    required=True,
-    help="The s3 prefix for the execution data file",
+    help="The s3 key for the file containing the db state.",
 )
 def main(
     pipeline_bucket: str,
@@ -118,8 +112,7 @@ def main(
     indexer_input_prefix: str,
     archive_prefix: str,
     worker_count: int,
-    execution_id: str,
-    execution_data_prefix: str,
+    db_state_file_key: str,
 ):
     """
     Load documents from source JSON array file, updating details via API.
@@ -134,23 +127,13 @@
     param indexer_input_prefix: S3 prefix containing the indexer input files.
     param archive_prefix: S3 prefix to which to archive documents.
     param worker_count: Number of workers downloading/uploading cached documents.
-    param execution_id: Unique identifier for the execution.
-    param execution_data_prefix: Prefix to apply to output files that contains the
-        execution data files.
+    param db_state_file_key: The s3 key for the file containing the db state.
     """
 
-    # Read the execution data file to get the unique execution timestamp s3 path
-    # (input/${timestamp}/)
-    execution_data_path = (
-        S3Path(os.path.join("s3://", pipeline_bucket, execution_data_prefix))
-        / f"{execution_id}.json"
-    )
-
-    input_dir_path = S3Path(
-        ExecutionData.model_validate_json(
-            execution_data_path.read_text()
-        ).input_dir_path
-    )
+    # Get the key of the folder containing the db state file
+    input_dir_path = S3Path(
+        os.path.join("s3://", pipeline_bucket, db_state_file_key)
+    ).parent
 
     # Get the key of the updates file contain information on the new and updated
     # documents (input/${timestamp}/updates.json)
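
For context on the trailing comment in the hunk above, a minimal sketch
of how the updates file key then follows from the derived input
directory. This is illustrative, not the exact implementation: the
bucket and timestamp are sample values, and the file name is taken from
the --updates-file-name examples in HOW_TO_UPDATE_TESTS.md.

```python
from cloudpathlib import S3Path

# Input directory derived from the db_state file key (sample values).
input_dir_path = S3Path("s3://pipbucket123123123/input/2022-11-01T21.53.26.945831")
updates_file_name = "new_and_updated_documents.json"  # passed via --updates-file-name

# cloudpathlib S3Path supports "/" joins, giving the expected
# input/${timestamp}/new_and_updated_documents.json key.
updates_file_path = input_dir_path / updates_file_name
print(updates_file_path)
# s3://pipbucket123123123/input/2022-11-01T21.53.26.945831/new_and_updated_documents.json
```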