Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/unique-temp-filenames #225

Merged
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3183b17
Add transcription range fields to database and ingestion models, add …
chrisjkhan Nov 8, 2022
81bfaf5
Add tests for edge case transcription ranges
chrisjkhan Nov 9, 2022
5a525cc
Fix end_time=start_time typo, update tests to be more concise, add ff…
chrisjkhan Nov 12, 2022
06791d7
Updated transcription range to video range, updated video handling to…
chrisjkhan Nov 16, 2022
88d619f
Update session hash to reflect trimmed video
chrisjkhan Nov 18, 2022
217deef
Bypass hash task
chrisjkhan Nov 18, 2022
78c2019
Remove unnecessary logging, duration validation comments, elif typo f…
chrisjkhan Nov 22, 2022
f6955ac
Reverted function parameter doc for split audio
chrisjkhan Nov 28, 2022
46aff14
Improved documentation for video_start_time in ingestion_models
chrisjkhan Dec 6, 2022
a3a0861
Use content hash to name videos and prevent collisions across sessions
chrisjkhan Dec 28, 2022
ba7942d
Merge branch 'main' into bugfix/unique-temp-filenames
chrisjkhan Dec 28, 2022
ae6b28a
Use pathlib functions from 3.8, add type annotations for mock function
chrisjkhan Dec 28, 2022
b73a2e4
Add return type annotations for mock function
chrisjkhan Dec 28, 2022
6e5d707
Lint updates
chrisjkhan Dec 28, 2022
0f3f223
Move file renaming to earliest point, return unique names from file c…
chrisjkhan Dec 28, 2022
1d53f1c
UUID for original file resource copy task so to prevent collisions ea…
chrisjkhan Dec 28, 2022
dae4ad7
Minor comment change to force CI reprocessing
chrisjkhan Dec 28, 2022
0d81208
Stop renaming resource within task
chrisjkhan Dec 29, 2022
7436255
Lint
chrisjkhan Dec 29, 2022
6b31f4b
Flag for adding source suffix in resource copy
chrisjkhan Dec 30, 2022
21fe9c5
Log file status after copy for debugging
chrisjkhan Dec 31, 2022
3746b2a
Logging to debug file copy
chrisjkhan Dec 31, 2022
606a6fc
Logging to debug file copy
chrisjkhan Dec 31, 2022
01595c3
Logging to debug file copy
chrisjkhan Dec 31, 2022
ce9cf5e
Logging to debug file copy
chrisjkhan Dec 31, 2022
4cdb0ab
Remove rename
chrisjkhan Dec 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions cdp_backend/pipeline/event_gather_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from operator import attrgetter
from pathlib import Path
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Set, Tuple, Union
from uuid import uuid4

from aiohttp.client_exceptions import ClientResponseError
from fireo.fields.errors import FieldValidationFailed, InvalidFieldType, RequiredField
Expand Down Expand Up @@ -127,8 +128,13 @@ def create_event_gather_flow(
for event in events:
session_processing_results: List[SessionProcessingResult] = []
for session in event.sessions:
# Download video to local copy
resource_copy_filepath = resource_copy_task(uri=session.video_uri)
# Download video to local copy making
# copy unique in case of shared session video
resource_copy_filepath = resource_copy_task(
uri=session.video_uri,
dst=f"{str(uuid4())}_temp",
copy_suffix=True,
)

# Handle video conversion or non-secure resource
# hosting
Expand Down Expand Up @@ -229,7 +235,7 @@ def create_event_gather_flow(


@task(max_retries=3, retry_delay=timedelta(seconds=120))
def resource_copy_task(uri: str) -> str:
def resource_copy_task(uri: str, dst: str = None, copy_suffix: bool = None) -> str:
"""
Copy a file to a temporary location for processing.

Expand All @@ -250,6 +256,8 @@ def resource_copy_task(uri: str) -> str:
"""
return file_utils.resource_copy(
uri=uri,
dst=dst,
copy_suffix=copy_suffix,
overwrite=True,
)

Expand Down
38 changes: 22 additions & 16 deletions cdp_backend/tests/pipeline/test_event_gather_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pathlib import Path
from typing import List, Optional
from unittest import mock
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from prefect import Flow
Expand Down Expand Up @@ -577,6 +577,10 @@ def test_store_event_processing_results(
EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI


def path_rename(self: Path, newPath: Path) -> Path:
return newPath


@mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file")
@mock.patch(f"{PIPELINE_PATH}.fs_functions.get_open_url_for_gcs_file")
@mock.patch(f"{PIPELINE_PATH}.fs_functions.remove_local_file")
Expand Down Expand Up @@ -633,20 +637,22 @@ def test_convert_video_and_handle_host(
mock_convert_video_to_mp4.return_value = expected_filepath
mock_hash_file_contents.return_value = "abc123"

(
mp4_filepath,
session_video_hosted_url,
session_content_hash,
) = pipeline.convert_video_and_handle_host.run(
video_filepath=video_filepath,
session=session,
credentials_file="fake/credentials.json",
bucket="doesnt://matter",
)
with patch.object(Path, "rename", path_rename):

(
mp4_filepath,
session_video_hosted_url,
session_content_hash,
) = pipeline.convert_video_and_handle_host.run(
video_filepath=video_filepath,
session=session,
credentials_file="fake/credentials.json",
bucket="doesnt://matter",
)

# Make sure mp4 files don't go through conversion
if Path(video_filepath).suffix == ".mp4":
assert not mock_convert_video_to_mp4.called
# Make sure mp4 files don't go through conversion
if Path(video_filepath).suffix == ".mp4":
assert not mock_convert_video_to_mp4.called

assert mp4_filepath == expected_filepath
assert session_video_hosted_url == expected_hosted_video_url
assert mp4_filepath == str(Path(video_filepath).with_suffix(".mp4"))
assert session_video_hosted_url == expected_hosted_video_url
52 changes: 52 additions & 0 deletions cdp_backend/tests/utils/test_file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,58 @@
#############################################################################


@pytest.mark.parametrize(
"path, stem, expected_result",
[
(Path("file.ext"), "new", "new.ext"),
],
)
def test_with_stem(path: Path, stem: str, expected_result: str) -> None:
new_path = file_utils.with_stem(path, stem)
assert str(new_path) == expected_result


@pytest.mark.parametrize(
"path, addition, expected_result",
[
(Path("file.ext"), "-new", "file-new.ext"),
],
)
def test_append_to_stem(path: Path, addition: str, expected_result: str) -> None:
new_path = file_utils.append_to_stem(path, addition)
assert str(new_path) == expected_result


@pytest.mark.parametrize(
"path, stem, expected_result",
[
(Path("file.ext"), "new", "new.ext"),
],
)
def test_rename_with_stem(path: Path, stem: str, expected_result: str) -> None:
file = open(path, "w")
file.close()
new_path = file_utils.rename_with_stem(path, stem)
assert str(new_path) == expected_result
assert new_path.exists()
os.remove(new_path)


@pytest.mark.parametrize(
"path, addition, expected_result",
[
(Path("file.ext"), "-new", "file-new.ext"),
],
)
def test_rename_append_to_stem(path: Path, addition: str, expected_result: str) -> None:
chrisjkhan marked this conversation as resolved.
Show resolved Hide resolved
file = open(path, "w")
file.close()
new_path = file_utils.rename_append_to_stem(path, addition)
assert str(new_path) == expected_result
assert new_path.exists()
os.remove(new_path)


@pytest.mark.parametrize(
"uri, expected_result",
[
Expand Down
90 changes: 85 additions & 5 deletions cdp_backend/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,82 @@
MAX_THUMBNAIL_WIDTH = 960


def with_stem(path: Path, stem: str) -> Path:
"""
Create a path with a new stem

Parameters
----------
path: Path
The path to alter
stem: str
The string to be the new stem of the path

Returns
-------
path: Path
The new path with the replaced stem
"""
return path.with_name(f"{stem}{path.suffix}")


def append_to_stem(path: Path, addition: str) -> Path:
"""
Rename a file with a string appended to the path stem

Parameters
----------
path: Path
The path to alter
addition: str
The string to be appended to the path stem

Returns
-------
path: Path
The new path with the stem addition
"""
return with_stem(path, f"{path.stem}{addition}")


def rename_with_stem(path: Path, stem: str) -> Path:
"""
Rename a file with a string appended to the path stem

Parameters
----------
path: Path
The path to be renamed
stem: str
The string to become the new stem

Returns
-------
path: Path
The new path of the renamed file
"""
return path.rename(with_stem(path, stem))


def rename_append_to_stem(path: Path, addition: str) -> Path:
"""
Rename a file with a string appended to the path stem

Parameters
----------
path: Path
The path to be renamed
addition: str
The string to be appended to the path stem

Returns
-------
path: Path
The new path of the renamed file
"""
return path.rename(append_to_stem(path, addition))


def get_media_type(uri: str) -> Optional[str]:
"""
Get the IANA media type for the provided URI.
Expand Down Expand Up @@ -69,6 +145,7 @@ def get_media_type(uri: str) -> Optional[str]:
def resource_copy(
uri: str,
dst: Optional[Union[str, Path]] = None,
copy_suffix: Optional[bool] = False,
overwrite: bool = False,
) -> str:
"""
Expand All @@ -90,6 +167,7 @@ def resource_copy(
saved_path: str
The path of where the resource ended up getting copied to.
"""
uri_suffix = Path(uri.split("/")[-1].split("?")[0].split("#")[0]).suffix
if dst is None:
dst = uri.split("/")[-1]

Expand All @@ -103,10 +181,13 @@ def resource_copy(
# Split by the last "/"
dst = dst / uri.split("/")[-1]

if copy_suffix:
dst = dst.with_suffix(uri_suffix)

# Ensure filename is less than 255 chars
# Otherwise this can raise an OSError for too long of a filename
if len(dst.name) > 255:
dst = Path(str(dst)[:255])
dst = with_stem(dst, dst.stem[: (255 - len(dst.suffix))])

# Ensure dest isn't a file
if dst.is_file() and not overwrite:
Expand Down Expand Up @@ -520,7 +601,6 @@ def convert_video_to_mp4(
The end time to trim the video in HH:MM:SS.
output_path: Path
The output path to place the clip at.
Must include a suffix to use for the reformatting.

Returns
-------
Expand Down Expand Up @@ -605,7 +685,7 @@ def clip_and_reformat_video(
video_filepath: Path,
start_time: Optional[str],
end_time: Optional[str],
output_path: Path = Path("clipped.mp4"),
output_path: Path = None,
output_format: str = "mp4",
) -> Path:
"""
Expand All @@ -621,8 +701,6 @@ def clip_and_reformat_video(
The end time of the clip in HH:MM:SS.
output_path: Path
The output path to place the clip at.
Must include a suffix to use for the reformatting.
Default: "clipped.mp4"
output_format: str
The output format.
Default: "mp4"
Expand All @@ -634,6 +712,8 @@ def clip_and_reformat_video(
"""
import ffmpeg

output_path = output_path or rename_append_to_stem(video_filepath, "_clipped")
chrisjkhan marked this conversation as resolved.
Show resolved Hide resolved

try:
ffmpeg_stdout, ffmpeg_stderr = (
ffmpeg.input(
Expand Down