Skip to content

Commit

Permalink
Whole database dump (#208)
Browse files Browse the repository at this point in the history
* Initial commit of whole database dump draft

* Added session token to boto client instantiation if in dev mode.

* Added botocore to project dependencies via Poetry.

* Added missing f-string to logging string.

* Added method to check whether AWS credentials result in valid connection

* Reorganised imports.

* Wrapped is_vespa_online in try/except.

* Added dev_mode argument to test S3Client instantiation.

* Removed unused imports and functions. Fixed line length violations.

* Added new endpoint for downloading whole database dump, generating it if not exists.

* Removed standalone whole database dump script.

* Added logging for deployment AWS env and update log level to info.

* Delete test_whole_database_dump.py

* Changed HTTP request type from POST to GET as we're retrieving cached file.

* Moved environment variables to config file.

* Used env vars imported from config file instead of fetching in the endpoint.

* Removed trailing newline

* Update poetry.lock
  • Loading branch information
katybaulch authored Jan 16, 2024
1 parent 9de373f commit 6702d26
Show file tree
Hide file tree
Showing 9 changed files with 445 additions and 187 deletions.
72 changes: 69 additions & 3 deletions app/api/api_v1/routers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,33 @@
"""
import json
import logging
import os
from datetime import datetime
from io import BytesIO
from typing import Mapping, Sequence

from cpr_data_access.search_adaptors import VespaSearchAdapter
from cpr_data_access.exceptions import QueryError
from fastapi import APIRouter, Depends, HTTPException, status, Request
from cpr_data_access.search_adaptors import VespaSearchAdapter
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from app.api.api_v1.schemas.search import SearchRequestBody, SearchResponse, SortField
from app.core.aws import S3Document, get_s3_client
from app.core.browse import BrowseArgs, browse_rds_families
from app.core.config import VESPA_SECRETS_LOCATION, VESPA_URL
from app.core.config import (
AWS_REGION,
DOC_CACHE_BUCKET,
INGEST_CYCLE_START,
PUBLIC_APP_URL,
VESPA_SECRETS_LOCATION,
VESPA_URL,
)
from app.core.download import (
convert_dump_to_csv,
generate_data_dump_as_csv,
get_whole_database_dump,
)
from app.core.lookups import get_countries_for_region, get_country_by_slug
from app.core.search import (
ENCODER,
Expand Down Expand Up @@ -159,6 +174,57 @@ def download_search_documents(
)


@search_router.get("/searches/download-all-data")
def download_all_search_documents(db=Depends(get_db)) -> StreamingResponse:
    """
    Download a CSV containing details of all the documents in the corpus.

    The dump is generated at most once per ingest cycle and cached in S3;
    subsequent requests stream the cached object back to the caller.

    :param db: Database session (injected).
    :raises HTTPException: 503 if required environment variables are unset.
    :return StreamingResponse: The CSV dump as a file attachment.
    """
    _LOGGER.info("Whole data download request")

    if INGEST_CYCLE_START is None or PUBLIC_APP_URL is None or DOC_CACHE_BUCKET is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Missing required environment variables",
        )

    # NOTE(review): "dev" appearing anywhere in the app URL implies staging —
    # confirm this heuristic holds for all deployed environments.
    aws_environment = "production" if "dev" not in PUBLIC_APP_URL else "staging"
    data_dump_s3_key = f"navigator/{aws_environment}_data_dump_{INGEST_CYCLE_START}.csv"

    s3_client = get_s3_client()
    valid_credentials = s3_client.is_connected()

    s3_document = S3Document(DOC_CACHE_BUCKET, AWS_REGION, data_dump_s3_key)
    if not s3_client.document_exists(s3_document):
        _LOGGER.info(f"Generating dump for ingest cycle w/c {INGEST_CYCLE_START}...")
        df_as_csv = generate_data_dump_as_csv(PUBLIC_APP_URL, db)

        if not valid_credentials:
            _LOGGER.error("Cannot connect to AWS.")
        else:
            response = s3_client.upload_fileobj(
                df_as_csv, DOC_CACHE_BUCKET, data_dump_s3_key
            )
            if response is False:
                _LOGGER.error("Failed to upload object to s3: %s", response)

        if s3_client.document_exists(s3_document):
            _LOGGER.debug("Finished uploading data dump to s3")

    else:
        _LOGGER.debug("File already exists in S3. Fetching...")

    s3_file = s3_client.download_file(s3_document)

    _LOGGER.debug(f"Downloading all documents as of '{INGEST_CYCLE_START}' as CSV")
    timestamp = datetime.now()
    filename = f"whole_database_dump-{timestamp}.csv"
    return StreamingResponse(
        content=BytesIO(s3_file.read()),
        headers={
            "Content-Type": "text/csv",
            # Fix: the computed filename was previously not interpolated
            # into the header, so downloads arrived without a filename.
            "Content-Disposition": f"attachment; filename={filename}",
        },
    )


def _get_browse_args_from_search_request_body(
search_body: SearchRequestBody,
) -> BrowseArgs:
Expand Down
62 changes: 46 additions & 16 deletions app/core/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

import boto3
import botocore.client
from botocore.exceptions import ClientError
from botocore.exceptions import ClientError, UnauthorizedSSOTokenError
from botocore.response import StreamingBody

logger = logging.getLogger(__name__)
from app.core.config import AWS_REGION

AWS_REGION = os.getenv("AWS_REGION", "eu-west-1")
logger = logging.getLogger(__name__)


class S3Document:
Expand Down Expand Up @@ -45,17 +45,46 @@ def from_url(cls, url: str) -> "S3Document":
class S3Client:
"""Helper class to connect to S3 and perform actions on buckets and documents."""

def __init__(self): # noqa: D107
self.client = boto3.client(
"s3",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
config=botocore.client.Config(
signature_version="s3v4",
region_name=AWS_REGION,
connect_timeout=10,
),
)
def __init__(self, dev_mode: bool):  # noqa: D107
    # Shared credentials for both modes; dev mode additionally needs a
    # session token (e.g. for SSO-issued temporary credentials).
    client_kwargs = {
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
    }
    if dev_mode is True:
        logger.info("***************** IN DEVELOPMENT MODE *****************")
        client_kwargs["aws_session_token"] = os.getenv("AWS_SESSION_TOKEN")
    else:
        logger.info("***************** IN DEPLOYMENT MODE *****************")

    self.client = boto3.client(
        "s3",
        config=botocore.client.Config(
            signature_version="s3v4",
            region_name=AWS_REGION,
            connect_timeout=10,
        ),
        **client_kwargs,
    )

def is_connected(self) -> bool:
    """
    Check whether the current AWS credentials allow a valid connection.

    Uses an STS ``get_caller_identity`` call, which only succeeds with
    valid credentials.

    :return [bool]: Connection status
    """
    sts = boto3.client("sts")

    try:
        sts.get_caller_identity()
        return True
    except (ClientError, UnauthorizedSSOTokenError):
        # ClientError covers invalid/expired static credentials;
        # UnauthorizedSSOTokenError covers expired SSO sessions. Previously
        # only the SSO error was caught, so bad static credentials raised
        # instead of returning False.
        return False

def upload_fileobj(
self,
Expand Down Expand Up @@ -213,7 +242,7 @@ def list_files(
next_continuation_token = response.get("NextContinuationToken", None)

except ClientError:
logger.exception("Request to list files in bucket '{bucket}' failed")
logger.exception(f"Request to list files in bucket '{bucket}' failed")
raise

def download_file(self, s3_document: S3Document) -> StreamingBody:
Expand Down Expand Up @@ -275,4 +304,5 @@ def document_exists(self, s3_document: S3Document) -> bool:

def get_s3_client():
    """
    Get s3 client for API.

    Reads the DEVELOPMENT_MODE environment variable to decide whether the
    client should be instantiated in dev mode (which attaches an AWS
    session token to the boto3 client).
    """
    # Fix: bool("False") is True — bool() of any non-empty string is truthy,
    # so the previous cast enabled dev mode regardless of the value. Parse
    # the variable as a case-insensitive string comparison instead.
    dev_mode = os.getenv("DEVELOPMENT_MODE", "False").lower() == "true"
    return S3Client(dev_mode)
5 changes: 5 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@

# Shared search config
INDEX_ENCODER_CACHE_FOLDER: str = os.getenv("INDEX_ENCODER_CACHE_FOLDER", "/models")

# Whole database dump
# Identifier for the current ingest cycle; used to key the cached dump in S3
# (one dump per cycle). Unset means the dump endpoint is unavailable.
INGEST_CYCLE_START = os.getenv("INGEST_CYCLE_START")
# S3 bucket where generated whole-database dumps are cached.
DOC_CACHE_BUCKET = os.getenv("DOCUMENT_CACHE_BUCKET")
# AWS region used for S3 access; defaults to eu-west-1.
AWS_REGION = os.getenv("AWS_REGION", "eu-west-1")
Loading

0 comments on commit 6702d26

Please sign in to comment.