Skip to content

Commit

Permalink
bugfix/host-videos-under-more-scenarios (#138)
Browse files Browse the repository at this point in the history
* Add option to clean DB to not clean index

* Host the provided video under more scenarios

* Use specific exception for resource exists SSL

* Fixes for GS URIs / hosting videos ourselves

* Remove param from process special event bin as it's not default checked

* Normalize fs targets for images

* Convert and configure host now always returns the host URL

* Fix bugs and add tests

* Remove non-needed line updated session video uri
  • Loading branch information
Jackson Maxfield Brown authored Dec 1, 2021
1 parent 5836ee3 commit d7328a9
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 67 deletions.
20 changes: 18 additions & 2 deletions cdp_backend/bin/clean_cdp_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import fireo

from cdp_backend.database import DATABASE_MODELS
from cdp_backend.database import models as db_models

###############################################################################

Expand All @@ -36,16 +37,28 @@ def __parse(self) -> None:
type=Path,
help="Path to Google service account JSON key.",
)
p.add_argument(
"--clean-index",
action="store_true",
dest="clean_index",
)
p.parse_args(namespace=self)


###############################################################################


def _clean_cdp_database(google_creds_path: Path) -> None:
def _clean_cdp_database(google_creds_path: Path, clean_index: bool = False) -> None:
# Connect to database
fireo.connection(from_file=google_creds_path)

# Remove indexed event gram if we don't want to clean the index
if not clean_index:
for index_model in [
db_models.IndexedEventGram,
]:
DATABASE_MODELS.remove(index_model)

# Iter through database models and delete the whole collection
for model in DATABASE_MODELS:
log.info(f"Cleaning collection: {model.collection_name}")
Expand All @@ -57,7 +70,10 @@ def _clean_cdp_database(google_creds_path: Path) -> None:
def main() -> None:
try:
args = Args()
_clean_cdp_database(google_creds_path=args.google_credentials_file)
_clean_cdp_database(
google_creds_path=args.google_credentials_file,
clean_index=args.clean_index,
)
except Exception as e:
log.error("=============================================")
log.error("\n\n" + traceback.format_exc())
Expand Down
3 changes: 2 additions & 1 deletion cdp_backend/bin/process_special_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def main() -> None:
# Create event gather pipeline flow
log.info("Beginning processing...")
flow = pipeline.create_event_gather_flow(
config=config, prefetched_events=[ingestion_model], from_local=True
config=config,
prefetched_events=[ingestion_model],
)

# Run flow
Expand Down
3 changes: 2 additions & 1 deletion cdp_backend/database/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def create_event(

def create_session(
session: ingestion_models.Session,
session_video_hosted_url: str,
event_ref: db_models.Event,
credentials_file: Optional[str] = None,
) -> db_models.Session:
Expand All @@ -217,7 +218,7 @@ def create_session(
# Required fields
db_session.event_ref = event_ref
db_session.session_datetime = session.session_datetime
db_session.video_uri = session.video_uri
db_session.video_uri = session_video_hosted_url
db_session.session_index = session.session_index

# Optional fields
Expand Down
48 changes: 28 additions & 20 deletions cdp_backend/database/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import requests
from fireo.models import Model
from fsspec.implementations.local import LocalFileSystem
from fsspec.core import url_to_fs
from gcsfs import GCSFileSystem

from ..utils.constants_utils import get_all_class_attr_values
Expand Down Expand Up @@ -118,34 +118,42 @@ def resource_exists(uri: Optional[str], **kwargs: str) -> bool:
return True

if uri.startswith("gs://") or uri.startswith("https://storage.googleapis"):
if kwargs.get("google_credentials_file"):
fs = GCSFileSystem(token=str(kwargs.get("google_credentials_file")))

# Convert to gsutil form if necessary
if uri.startswith("https://storage.googleapis"):
uri = convert_gcs_json_url_to_gsutil_form(uri)
# Convert to gsutil form if necessary
if uri.startswith("https://storage.googleapis"):
uri = convert_gcs_json_url_to_gsutil_form(uri)

# If uri is not convertible to gsutil form we can't confirm
if uri == "":
return True
# If uri is not convertible to gsutil form we can't confirm
if uri == "":
return False

if kwargs.get("google_credentials_file"):
fs = GCSFileSystem(token=str(kwargs.get("google_credentials_file", "anon")))
return fs.exists(uri)

# Can't check GCS resources without creds file
else:
return True
try:
anon_fs = GCSFileSystem(token="anon")
return anon_fs.exists(uri)
except Exception:
return False

# Is HTTP remote resource
elif uri.startswith("http"):
# Use HEAD request to check if remote resource exists
r = requests.head(uri)

return r.status_code == requests.codes.ok

# Check local filesystem
else:
fs = LocalFileSystem()
return fs.exists(uri)
try:
# Use HEAD request to check if remote resource exists
r = requests.head(uri)

return r.status_code == requests.codes.ok
except requests.exceptions.SSLError:
return False

# Get any filesystem and try
try:
fs, path = url_to_fs(uri)
return fs.exists(path)
except Exception:
return False


def create_constant_value_validator(
Expand Down
22 changes: 22 additions & 0 deletions cdp_backend/file_store/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ def upload_file(
return save_url


def get_open_url_for_gcs_file(credentials_file: str, uri: str) -> str:
"""
Simple wrapper around fsspec.FileSystem.url function for creating a connection
to the filesystem then getting the hosted / web accessible URL to the file.
Parameters
----------
credentials_file: str
The path to the Google Service Account credentials JSON file used
to initialize the file store connection.
uri: str
The URI to the file already stored to get a web accessible URL for.
Returns
-------
url: str
The web accessible URL for the file.
"""
fs = initialize_gcs_file_system(credentials_file=credentials_file)
return str(fs.url(uri))


def remove_local_file(filepath: Union[str, Path]) -> None:
"""
Deletes a file from the local file system.
Expand Down
Loading

0 comments on commit d7328a9

Please sign in to comment.