Feature/pla 331 lift input directory creation into distinct step (#106)
Replace the execution data file with the db_state file key. We already pass this in to the pipeline, like so:
```
{
  "db_state_file_key": "input/2024-11-25T09.41.24.302062/db_state.json"
}
```
This means we can use it instead of the execution data file when locating the updates file.
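
For context, here is a minimal sketch of how that key can be used: the parent "folder" of `db_state_file_key` is the run's input directory, so the updates file can be located without reading any execution data file. The bucket name below is a placeholder, and the updates file name is assumed from the example elsewhere in this repo's docs.

```python
from cloudpathlib import S3Path

# Placeholder values for illustration; the real ones are passed in to the pipeline.
pipeline_bucket = "my-pipeline-bucket"
db_state_file_key = "input/2024-11-25T09.41.24.302062/db_state.json"

# The folder containing the db state file is the run's input directory
# (input/${timestamp}/), so no execution data lookup is needed.
input_dir_path = S3Path(f"s3://{pipeline_bucket}/{db_state_file_key}").parent

# The new-and-updated-documents file lives alongside the db state file
# (file name assumed from the docs' example).
updates_file_path = input_dir_path / "new_and_updated_documents.json"

print(input_dir_path)    # s3://my-pipeline-bucket/input/2024-11-25T09.41.24.302062
print(updates_file_path)
```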
olaughter authored Nov 27, 2024
1 parent 65b0e6a commit ac78da6
Showing 8 changed files with 15 additions and 89 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/integration-tests.yml
@@ -18,9 +18,7 @@ env:
DOCUMENT_NAME_KEY: document_id
PARSER_INPUT_EXPECTED_DATA_FILE_PATH: integration_tests/data/parser_input_dir_post_run.json
ARCHIVE_EXPECTED_DATA_FILE_PATH: integration_tests/data/docs_test_subset_archive_parser_input_expected.json
EXECUTION_DATA_PREFIX: execution_data
EXECUTION_DATA_FILE_NAME: 123456.json
EXECUTION_DATA_ID: 123456
DB_STATE_FILE_KEY: input/2022-11-01T21.53.26.945831/db_state.json

jobs:
test:
@@ -61,12 +59,11 @@
- name: Building s3 buckets and uploading test data
run: |
poetry run python -m integration_tests.setup_test_buckets ${{ env.INGEST_DOCUMENT_BUCKET }} ${{ env.INGEST_PIPELINE_BUCKET }} ${{ secrets.UNIT_TESTS_AWS_REGION }}
poetry run python -m integration_tests.setup_execution_data_file ${{ env.INGEST_PIPELINE_BUCKET }} ${{ env.EXECUTION_DATA_PREFIX }}/${{ env.EXECUTION_DATA_FILE_NAME }} ${{ env.TEST_DATA_UPLOAD_PATH }}
aws s3 sync integration_tests/data/pipeline_in s3://${{ env.INGEST_PIPELINE_BUCKET }}
- name: Running the Ingest Stage
run: |
docker run -e AWS_ACCESS_KEY_ID=${{ secrets.UNIT_TESTS_AWS_ACCESS_KEY_ID }} -e AWS_SECRET_ACCESS_KEY=${{ secrets.UNIT_TESTS_AWS_SECRET_ACCESS_KEY }} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket ${{ env.INGEST_PIPELINE_BUCKET }} --document-bucket ${{ env.INGEST_DOCUMENT_BUCKET }} --updates-file-name ${{ env.UPDATES_FILE_NAME }} --output-prefix ${{ env.INGEST_OUTPUT_PREFIX }} --embeddings-input-prefix ${{ env.EMBEDDINGS_INPUT_PREFIX }} --indexer-input-prefix ${{ env.INDEXER_INPUT_PREFIX }} --execution-id ${{ env.EXECUTION_DATA_ID }} --execution-data-prefix ${{ env.EXECUTION_DATA_PREFIX }}
docker run -e AWS_ACCESS_KEY_ID=${{ secrets.UNIT_TESTS_AWS_ACCESS_KEY_ID }} -e AWS_SECRET_ACCESS_KEY=${{ secrets.UNIT_TESTS_AWS_SECRET_ACCESS_KEY }} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket ${{ env.INGEST_PIPELINE_BUCKET }} --document-bucket ${{ env.INGEST_DOCUMENT_BUCKET }} --updates-file-name ${{ env.UPDATES_FILE_NAME }} --output-prefix ${{ env.INGEST_OUTPUT_PREFIX }} --embeddings-input-prefix ${{ env.EMBEDDINGS_INPUT_PREFIX }} --indexer-input-prefix ${{ env.INDEXER_INPUT_PREFIX }} --db-state-file-key ${{ env.DB_STATE_FILE_KEY }}
- name: Running Integration Tests on the Ingest Stage Output 🚀
run: |
18 changes: 3 additions & 15 deletions HOW_TO_UPDATE_TESTS.md
@@ -50,18 +50,6 @@ Example:
python -m integration_tests.setup_test_buckets docbucket123123123 pipbucket123123123 eu-west-1
```

### Create the execution data file locally from the env variables

```shell
python -m integration_tests.setup_execution_data_file ${pipeline_bucket} ${EXECUTION_DATA_PREFIX}/${EXECUTION_DATA_ID}.json ${TEST_DATA_UPLOAD_PATH}
```

Example:

```shell
python -m integration_tests.setup_execution_data_file pipbucket123123123 execution_data/123456.json input/2022-11-01T21.53.26.945831/new_and_updated_documents.json
```

### Sync the test data to the s3 bucket

```shell
@@ -79,13 +67,13 @@ Example:
If you are trying to figure out what the variables are, look in the env var section of the following file: .github/workflows/integration-tests.yml. Also note that the prefixes used must match the subdirectory names of the data/pipeline_in directory.

```shell
docker run -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest-test --pipeline-bucket ${PIPELINE_BUCKET} --document-bucket ${DOCUMENT_BUCKET} --input-file ${TEST_DATA_UPLOAD_PATH} --output-prefix ${OUTPUT_PREFIX} --embeddings-input-prefix ${EMBEDDINGS_INPUT_PREFIX} --indexer-input-prefix ${INDEXER_INPUT_PREFIX} --execution-id ${EXECUTION_DATA_ID} --execution-data-prefix ${EXECUTION_DATA_PREFIX}
docker run -e AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} -e AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest-test --pipeline-bucket ${PIPELINE_BUCKET} --document-bucket ${DOCUMENT_BUCKET} --input-file ${TEST_DATA_UPLOAD_PATH} --output-prefix ${OUTPUT_PREFIX} --embeddings-input-prefix ${EMBEDDINGS_INPUT_PREFIX} --indexer-input-prefix ${INDEXER_INPUT_PREFIX} --db-state-file-key ${DB_STATE_FILE_KEY}
```

Example:

```shell
docker run -e AWS_ACCESS_KEY_ID=XXX -e AWS_SECRET_ACCESS_KEY=XXX -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket pipbucket123123123 --document-bucket docbucket123123123 --updates-file-name new_and_updated_documents.json --output-prefix ingest_unit_test_parser_input --embeddings-input-prefix ingest_unit_test_embeddings_input --indexer-input-prefix ingest_unit_test_indexer_input --execution-id 123456 --execution-data-prefix execution_data
docker run -e AWS_ACCESS_KEY_ID=XXX -e AWS_SECRET_ACCESS_KEY=XXX -e API_HOST="" -e MACHINE_USER_EMAIL="" -e MACHINE_USER_PASSWORD="" navigator-data-ingest --pipeline-bucket pipbucket123123123 --document-bucket docbucket123123123 --updates-file-name new_and_updated_documents.json --output-prefix ingest_unit_test_parser_input --embeddings-input-prefix ingest_unit_test_embeddings_input --indexer-input-prefix ingest_unit_test_indexer_input --db-state-file-key input/2022-11-01T21.53.26.945831/db_state.json
```

### Sync Down Output
@@ -120,4 +108,4 @@ I would recommend committing any changes, running the reformat program and caref

```shell
python -m format_jsons
```
```
3 changes: 0 additions & 3 deletions integration_tests/data/pipeline_in/execution_data/123456.json

This file was deleted.

@@ -0,0 +1 @@
{}

This file was deleted.

@@ -0,0 +1 @@
{}
38 changes: 0 additions & 38 deletions integration_tests/setup_execution_data_file.py

This file was deleted.

33 changes: 8 additions & 25 deletions src/navigator_data_ingest/main.py
@@ -7,7 +7,6 @@
import click
import json_logging
from cloudpathlib import S3Path
from cpr_sdk.pipeline_general_models import ExecutionData

from navigator_data_ingest.base.api_client import (
write_error_file,
@@ -100,14 +99,9 @@
help="Number of workers downloading/uploading cached documents",
)
@click.option(
"--execution-id",
"--db-state-file-key",
required=True,
help="Unique identifier for the execution",
)
@click.option(
"--execution-data-prefix",
required=True,
help="The s3 prefix for the execution data file",
help="The s3 key for the file containing the db state.",
)
def main(
pipeline_bucket: str,
@@ -118,8 +112,7 @@ def main(
indexer_input_prefix: str,
archive_prefix: str,
worker_count: int,
execution_id: str,
execution_data_prefix: str,
db_state_file_key: str,
):
"""
Load documents from source JSON array file, updating details via API.
@@ -134,23 +127,13 @@
param indexer_input_prefix: S3 prefix containing the indexer input files.
param archive_prefix: S3 prefix to which to archive documents.
param worker_count: Number of workers downloading/uploading cached documents.
param execution_id: Unique identifier for the execution.
param execution_data_prefix: Prefix to apply to output files that contains the
execution data files.
param db_state_file_key: The s3 path for the file containing the db state
"""

# Read the execution data file to get the unique execution timestamp s3 path
# (input/${timestamp}/)
execution_data_path = (
S3Path(os.path.join("s3://", pipeline_bucket, execution_data_prefix))
/ f"{execution_id}.json"
)

input_dir_path = S3Path(
ExecutionData.model_validate_json(
execution_data_path.read_text()
).input_dir_path
)
# Get the key of folder containing the db state file
input_dir_path = (
S3Path(os.path.join("s3://", pipeline_bucket, db_state_file_key))
).parent

# Get the key of the updates file contain information on the new and updated
# documents (input/${timestamp}/updates.json)
