diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index aebb248..f9d834e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,10 +5,10 @@ repos:
       - id: black
         language_version: python3
   - repo: https://github.com/pycqa/flake8
-    rev: 4.0.1
+    rev: 3.9.0
     hooks:
       - id: flake8
   - repo: https://github.com/timothycrosley/isort
-    rev: 5.9.3
+    rev: 5.12.0
     hooks:
       - id: isort
\ No newline at end of file
diff --git a/app/api/api_v1/routers/s3_checks.py b/app/api/api_v1/routers/s3_checks.py
index 2992df9..ea86db3 100644
--- a/app/api/api_v1/routers/s3_checks.py
+++ b/app/api/api_v1/routers/s3_checks.py
@@ -1,20 +1,23 @@
 from typing import List, Union
 
 from fastapi import APIRouter, Form, HTTPException, status
-from fastapi.logger import logger
 
 from app.core.config import Settings
-from app.models.s3_checks import s3FileCheck, s3FileCheckResponse
-from app.utils.s3_checks import get_s3_resource
+from app.utils.s3_checks import (
+    check_file_metadata,
+    check_files_metadata,
+    get_s3_resource,
+)
 
 s3_router = router = APIRouter()
 settings = Settings()
 
 
-@router.post("/files/key/", response_model=s3FileCheck)
+@router.post(
+    "/files/key/",
+)
 async def check_if_file_exist_in_bucket(
     file_key: str = Form(...),
-    bucket: str = Form(...),
     s3_access_key: Union[str, None] = Form(
         None,
         description="S3 access key. If None then take the default one from env variables",
@@ -64,29 +67,15 @@ async def check_if_file_exist_in_bucket(
             detail=f"Error connecting to S3: {e}",
         )
     else:
-        is_exist = False
-        bucket = s3_resource.Bucket(bucket)
-        if bucket in s3_resource.buckets.all():
-            objs = list(bucket.objects.filter(Prefix=file_key))
-            logger.info(f"Checking if file exist in bucket: {file_key}")
-            if len(objs) == 1 and objs[0].key == file_key:
-                is_exist = True
-            return {
-                "file_key": file_key,
-                "is_exists": is_exist,
-            }
-        else:
-            logger.info(f"Bucket {bucket} does not exist")
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail="Bucket does not exist",
-            )
+        file_metadata = await check_file_metadata(s3_resource, file_key)
+        return file_metadata
 
 
-@router.post("/files", response_model=s3FileCheckResponse)
+@router.post(
+    "/files",
+)
 async def check_if_files_exist_in_bucket(
     file_keys: List[str] = Form(...),
-    bucket: str = Form(...),
     s3_access_key: Union[str, None] = Form(
         None,
         description="S3 access key. If None then take the default one from env variables",
@@ -138,36 +127,8 @@ async def check_if_files_exist_in_bucket(
     file_keys = [key for key in file_keys[0].split(",")]
     file_keys_set = set(file_keys)
-    bucket = s3_resource.Bucket(bucket)
-
-    if bucket in s3_resource.buckets.all():
-        logger.debug("Bucket exists: {}".format(bucket))
-
-        folders = set([key.split("/")[0] for key in file_keys_set])
-        all_s3_objects = []
-        for folder in folders:
-            objects_in_folder = [
-                {"key": obj.key, "size": obj.size}
-                for obj in bucket.objects.filter(Prefix=folder).all()
-            ]
-            all_s3_objects = all_s3_objects + objects_in_folder
-
-        existing_keys = file_keys_set.intersection(
-            [obj["key"] for obj in all_s3_objects]
-        )
-        non_existing_keys = set(file_keys).difference(existing_keys)
-        existing_file_details = [
-            obj for obj in all_s3_objects if obj["key"] in existing_keys
-        ]
-
-        return {
-            "exists": existing_file_details,
-            "non_exists": list(non_existing_keys),
-        }
+    files_metadata = await check_files_metadata(
+        session=s3_resource, file_keys=file_keys_set
+    )
 
-    else:
-        logger.info(f"Bucket {bucket} does not exist")
-        raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
-            detail="Bucket does not exist",
-        )
+    return files_metadata
 
diff --git a/app/utils/s3_checks.py b/app/utils/s3_checks.py
index 8074a50..9ddd370 100644
--- a/app/utils/s3_checks.py
+++ b/app/utils/s3_checks.py
@@ -1,4 +1,10 @@
+import asyncio
+from typing import Set
+from urllib.parse import urlparse
+
 import boto3
+from fastapi.encoders import jsonable_encoder
+from fastapi.logger import logger
 
 from app.core.config import Settings
 
@@ -19,3 +25,27 @@ def get_s3_resource(
         raise ValueError(f"Error connecting to S3: {e}")
     else:
         return s3_resource
+
+
+async def check_file_metadata(session, file_key: str):
+
+    # get bucket and file path according to the file key
+    file_parts = urlparse(file_key)
+    bucket, obj_key = file_parts.netloc, file_parts.path.lstrip("/")
+    obj = session.ObjectSummary(bucket, obj_key)
+
+    # get the metadata
+    try:
+        metadata = {"key": file_key, "size": obj.size, "exist": True}
+    except Exception as e:
+        logger.error(f"Error getting metadata for {file_key} : {e}")
+        metadata = {"key": file_key, "size": None, "exist": False}
+    finally:
+        return jsonable_encoder(metadata)
+
+
+async def check_files_metadata(session, file_keys: Set[str]):
+    files_metadata = await asyncio.gather(
+        *[check_file_metadata(session, file_key) for file_key in file_keys]
+    )
+    return jsonable_encoder(files_metadata)
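Note on the refactor: `check_file_metadata` expects each file key to be a full S3 URI (e.g. `s3://bucket/path/to/object`), which is why the separate `bucket` form field could be dropped — `urlparse` recovers the bucket name from the key itself. Below is a minimal sketch of exercising the new helpers directly, assuming a plain boto3 resource with credentials resolved from the environment; the bucket name and object keys are hypothetical:

```python
import asyncio

import boto3

from app.utils.s3_checks import check_file_metadata, check_files_metadata

# Plain boto3 resource; get_s3_resource() in this PR wraps the same call
# with error handling (raising ValueError on connection problems).
s3 = boto3.resource("s3")

# Keys carry their bucket: urlparse("s3://my-bucket/reports/summary.pdf")
# yields netloc="my-bucket" and path="/reports/summary.pdf".
single = asyncio.run(
    check_file_metadata(s3, "s3://my-bucket/reports/summary.pdf")
)
# -> {"key": "...", "size": <bytes>, "exist": True}
#    or {"key": "...", "size": None, "exist": False} if the object is missing

batch = asyncio.run(
    check_files_metadata(
        session=s3,
        file_keys={
            "s3://my-bucket/reports/summary.pdf",
            "s3://my-bucket/does-not-exist.pdf",
        },
    )
)
# -> list with one metadata dict per input key
```

One design note: the helpers are `async def`, but the underlying boto3 call is synchronous (`ObjectSummary.size` triggers a HEAD request), so `asyncio.gather` sequences the lookups rather than running them in parallel; offloading each call via `asyncio.to_thread` (Python 3.9+) would be one way to get true concurrency.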