From 3183b179fbca10bdffc48c5ec2c51de3f518349d Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Mon, 7 Nov 2022 19:13:07 -0700 Subject: [PATCH 01/25] Add transcription range fields to database and ingestion models, add validator for time duration, add range filter to ffmpeg audio split, update tests --- cdp_backend/database/models.py | 4 +++ cdp_backend/database/validators.py | 28 +++++++++++++++++++ cdp_backend/pipeline/event_gather_pipeline.py | 10 +++++++ cdp_backend/pipeline/ingestion_models.py | 4 +++ cdp_backend/pipeline/mock_get_events.py | 17 ++++++++++- cdp_backend/tests/database/test_validators.py | 27 ++++++++++++++++++ .../pipeline/test_event_gather_pipeline.py | 10 ++++++- cdp_backend/tests/utils/test_file_utils.py | 13 +++++++++ cdp_backend/utils/file_utils.py | 12 +++++++- 9 files changed, 122 insertions(+), 3 deletions(-) diff --git a/cdp_backend/database/models.py b/cdp_backend/database/models.py index 611526e3..1e13a71e 100644 --- a/cdp_backend/database/models.py +++ b/cdp_backend/database/models.py @@ -494,6 +494,8 @@ class Session(Model): session_index = fields.NumberField(required=True) session_content_hash = fields.TextField(required=True) video_uri = fields.TextField(required=True, validator=validators.resource_exists) + transcription_start_time = fields.TextField(validators.time_duration_is_valid) + transcription_end_time = fields.TextField(validators.time_duration_is_valid) caption_uri = fields.TextField(validator=validators.resource_exists) external_source_id = fields.TextField() @@ -515,6 +517,8 @@ def Example(cls) -> Model: session.video_uri = ( "https://video.seattle.gov/media/council/brief_072219_2011957V.mp4" ) + session.transcription_start_time = "00:00:00" + session.transcription_end_time = "99:59:59" session.session_content_hash = ( "05bd857af7f70bf51b6aac1144046973bf3325c9101a554bc27dc9607dbbd8f5" ) diff --git a/cdp_backend/database/validators.py b/cdp_backend/database/validators.py index 8d927e2e..df4a7e2d 100644 --- 
a/cdp_backend/database/validators.py +++ b/cdp_backend/database/validators.py @@ -72,6 +72,34 @@ def router_string_is_valid(router_string: Optional[str]) -> bool: return False +def time_duration_is_valid(time_duration: Optional[str]) -> bool: + """ + Validate that the provided time duration string is acceptable to FFmpeg. + The validator is unnecessarily limited to HH:MM:SS. The spec is a little + more flexible. + + None is a valid option. + + Parameters + ---------- + time_duration: Optional[str] + The time duration to validate. + + Returns + ------- + status: bool + The validation status. + """ + if time_duration is None: + return True + + # HH:MM:SS + if re.match(r"^((((\d{1,2}:)?[0-5])?\d:)?[0-5])?\d$", time_duration): + return True + + return False + + def email_is_valid(email: Optional[str]) -> bool: """ Validate that a valid email was provided. diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index a508812e..b7c73978 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -152,6 +152,8 @@ def create_event_gather_flow( audio_uri = split_audio( session_content_hash=session_content_hash, tmp_video_filepath=tmp_video_filepath, + start_time=session.transcription_start_time, + end_time=session.transcription_end_time, bucket=config.validated_gcs_bucket_name, credentials_file=config.google_credentials_file, ) @@ -394,6 +396,8 @@ def convert_video_and_handle_host( def split_audio( session_content_hash: str, tmp_video_filepath: str, + start_time: str, + end_time: str, bucket: str, credentials_file: str, ) -> str: @@ -406,6 +410,10 @@ def split_audio( The unique identifier for the session. tmp_video_filepath: str The local path for video file to generate a hash for. + start_time: str + The start time of the clip in HH:MM:SS. + end_time: str + The end time of the clip in HH:MM:SS. bucket: str The bucket to store the transcript to. 
credentials_file: str @@ -433,6 +441,8 @@ def split_audio( tmp_audio_log_err_filepath, ) = file_utils.split_audio( video_read_path=tmp_video_filepath, + start_time=start_time, + end_time=start_time, audio_save_path=tmp_audio_filepath, overwrite=True, ) diff --git a/cdp_backend/pipeline/ingestion_models.py b/cdp_backend/pipeline/ingestion_models.py index e261a45f..e1662e77 100644 --- a/cdp_backend/pipeline/ingestion_models.py +++ b/cdp_backend/pipeline/ingestion_models.py @@ -139,6 +139,8 @@ class Session(IngestionModel, DataClassJsonMixin): session_datetime: datetime video_uri: str session_index: int + transcription_start_time: Optional[str] = None + transcription_end_time: Optional[str] = None caption_uri: Optional[str] = None external_source_id: Optional[str] = None @@ -263,6 +265,8 @@ class EventIngestionModel(IngestionModel, DataClassJsonMixin): video_uri=( "https://video.seattle.gov/media/council/council_113020_2022091V.mp4" ), + transcription_start_time=("00:00:00"), + transcription_end_time=("99:59:59"), caption_uri=( "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2020/council_113020_2022091.vtt" # noqa: E501 ), diff --git a/cdp_backend/pipeline/mock_get_events.py b/cdp_backend/pipeline/mock_get_events.py index 1364cff3..a5cdf872 100644 --- a/cdp_backend/pipeline/mock_get_events.py +++ b/cdp_backend/pipeline/mock_get_events.py @@ -54,20 +54,33 @@ ( "https://video.seattle.gov/media/council/council_010421_2022101V.mp4", "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2021/council_010421_2022101.vtt", # noqa + None, + None, ), ( "https://video.seattle.gov/media/council/council_113020_2022091V.mp4", "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2020/council_113020_2022091.vtt", # noqa + "1", + "25:25", ), ( "https://video.seattle.gov/media/council/council_112320_2022089V.mp4", "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2020/brief_112320_2012089.vtt", # noqa + 
None, + "2:58:14", ), ( "https://video.seattle.gov/media/council/council_110920_2022085V.mp4", "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2020/council_110920_2022085.vtt", # noqa + "1", + None, + ), + ( + "https://video.seattle.gov/media/council/council_101220_2022077V.mp4", + None, + None, + None, ), - ("https://video.seattle.gov/media/council/council_101220_2022077V.mp4", None), ] @@ -121,6 +134,8 @@ def _get_example_event() -> EventIngestionModel: session_datetime=datetime.utcnow() + (i * timedelta(hours=3)), session_index=i, video_uri=session[0], + transcription_start_time=session[2], + transcription_end_time=session[3], caption_uri=session[1], ) for i, session in enumerate(random.sample(SESSIONS, random.randint(1, 3))) diff --git a/cdp_backend/tests/database/test_validators.py b/cdp_backend/tests/database/test_validators.py index 6ba8554d..9adda431 100644 --- a/cdp_backend/tests/database/test_validators.py +++ b/cdp_backend/tests/database/test_validators.py @@ -48,6 +48,33 @@ def test_router_string_is_valid(router_string: str, expected_result: bool) -> No assert actual_result == expected_result +@pytest.mark.parametrize( + "time_duration, expected_result", + [ + (None, True), + ("1", True), + ("11", True), + ("1:11", True), + ("11:11", True), + ("1:11:11", True), + ("99:59:59", True), + ("111", False), + ("11:1", False), + ("111:11", False), + ("11:1:11", False), + ("11:11:1", False), + ("111:11:11", False), + ("60", False), + ("60:00", False), + ("1:60:00", False), + ("1:00:60", False), + ], +) +def test_time_duration_is_valid(time_duration: str, expected_result: bool) -> None: + actual_result = validators.time_duration_is_valid(time_duration) + assert actual_result == expected_result + + @pytest.mark.parametrize( "email, expected_result", [ diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index acc37106..0487b347 100644 --- 
a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -68,15 +68,19 @@ def test_create_event_gather_flow(config: EventGatherPipelineConfig) -> None: @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_file_uri") @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") @pytest.mark.parametrize( - "get_file_uri_value, audio_upload_file_return", + "get_file_uri_value, audio_upload_file_return, start_time, end_time", [ ( None, f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + None, + None, ), ( f"fake://{VIDEO_CONTENT_HASH}-audio.wav", f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + "1", + "2", ), ], ) @@ -85,6 +89,8 @@ def test_split_audio( mock_get_file_uri: MagicMock, get_file_uri_value: str, audio_upload_file_return: str, + start_time: str, + end_time: str, example_video: Path, ) -> None: mock_get_file_uri.return_value = get_file_uri_value @@ -93,6 +99,8 @@ def test_split_audio( audio_uri = pipeline.split_audio.run( session_content_hash=VIDEO_CONTENT_HASH, tmp_video_filepath=str(example_video), + start_time=start_time, + end_time=end_time, bucket="bucket", credentials_file="/fake/credentials/path", ) diff --git a/cdp_backend/tests/utils/test_file_utils.py b/cdp_backend/tests/utils/test_file_utils.py index fdbe760f..9217eb9e 100644 --- a/cdp_backend/tests/utils/test_file_utils.py +++ b/cdp_backend/tests/utils/test_file_utils.py @@ -95,9 +95,20 @@ def test_hash_file_contents(tmpdir) -> None: # type: ignore ), ], ) +@pytest.mark.parametrize( + "start_time, end_time", + [ + (None, None), + ("1", "10"), + (None, "10"), + ("1", None), + ], +) def test_split_audio( # type: ignore tmpdir, example_video: str, + start_time: str, + end_time: str, audio_save_path: str, ) -> None: # Append save name to tmpdir @@ -109,6 +120,8 @@ def test_split_audio( # type: ignore try: audio_file, stdout_log, stderr_log = file_utils.split_audio( video_read_path=example_video, + start_time=start_time, + end_time=end_time, 
audio_save_path=str(tmp_dir_audio_save_path), ) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 1dbb8f4c..dfff6d36 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -249,6 +249,8 @@ def vimeo_copy(uri: str, dst: Path, overwrite: bool = False) -> str: def split_audio( video_read_path: str, + start_time: str, + end_time: str, audio_save_path: str, overwrite: bool = False, ) -> Tuple[str, str, str]: @@ -259,6 +261,10 @@ def split_audio( ---------- video_read_path: str Path to the video to split the audio from. + start_time: str + The start time of the clip in HH:MM:SS. + end_time: str + The end time of the clip in HH:MM:SS. audio_save_path: str Path to where the audio should be stored. @@ -282,7 +288,11 @@ def split_audio( raise IsADirectoryError(resolved_audio_save_path) # Construct ffmpeg dag - stream = ffmpeg.input(resolved_video_read_path) + stream = ffmpeg.input( + resolved_video_read_path, + ss=start_time or "0", + to=end_time or "99:59:59", + ) stream = ffmpeg.output( stream, filename=resolved_audio_save_path, From 81bfaf5a26404c1e74b8efa8af9bfa05f4bb8df6 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 9 Nov 2022 02:43:52 -0700 Subject: [PATCH 02/25] Add tests for edge case transcription ranges --- cdp_backend/tests/database/test_validators.py | 6 ++++++ .../tests/pipeline/test_event_gather_pipeline.py | 12 ++++++++++++ cdp_backend/tests/utils/test_file_utils.py | 2 ++ 3 files changed, 20 insertions(+) diff --git a/cdp_backend/tests/database/test_validators.py b/cdp_backend/tests/database/test_validators.py index 9adda431..26067b21 100644 --- a/cdp_backend/tests/database/test_validators.py +++ b/cdp_backend/tests/database/test_validators.py @@ -58,6 +58,12 @@ def test_router_string_is_valid(router_string: str, expected_result: bool) -> No ("11:11", True), ("1:11:11", True), ("99:59:59", True), + ("0", True), + ("00", True), + ("0:00", True), + ("00:00", True), + ("0:00:00", True), + 
("00:00:00", True), ("111", False), ("11:1", False), ("111:11", False), diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 0487b347..f6c4c3bb 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -82,6 +82,18 @@ def test_create_event_gather_flow(config: EventGatherPipelineConfig) -> None: "1", "2", ), + ( + f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + "1", + "0", + ), + ( + f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + f"fake://{VIDEO_CONTENT_HASH}-audio.wav", + "0", + "0", + ), ], ) def test_split_audio( diff --git a/cdp_backend/tests/utils/test_file_utils.py b/cdp_backend/tests/utils/test_file_utils.py index 9217eb9e..944481c6 100644 --- a/cdp_backend/tests/utils/test_file_utils.py +++ b/cdp_backend/tests/utils/test_file_utils.py @@ -102,6 +102,8 @@ def test_hash_file_contents(tmpdir) -> None: # type: ignore ("1", "10"), (None, "10"), ("1", None), + ("0", "0"), + ("1", "0"), ], ) def test_split_audio( # type: ignore From 5a525cc0d99eb6df2aeeff62fa372a3565252827 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 12 Nov 2022 09:44:40 -0700 Subject: [PATCH 03/25] Fix end_time=start_time typo, update tests to be more concise, add ffmpeg error logging --- cdp_backend/pipeline/event_gather_pipeline.py | 2 +- .../pipeline/test_event_gather_pipeline.py | 27 +++++++------------ cdp_backend/tests/utils/test_file_utils.py | 2 -- cdp_backend/utils/file_utils.py | 7 +++-- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index b7c73978..3e0beafd 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -442,7 +442,7 @@ def split_audio( ) = file_utils.split_audio( video_read_path=tmp_video_filepath, 
start_time=start_time, - end_time=start_time, + end_time=end_time, audio_save_path=tmp_audio_filepath, overwrite=True, ) diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index f6c4c3bb..bc274767 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -68,31 +68,24 @@ def test_create_event_gather_flow(config: EventGatherPipelineConfig) -> None: @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_file_uri") @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") @pytest.mark.parametrize( - "get_file_uri_value, audio_upload_file_return, start_time, end_time", + "start_time, end_time", + [ + (None, None), + ("1", "2"), + pytest.param("1", "0", marks=pytest.mark.xfail), + pytest.param("0", "0", marks=pytest.mark.xfail), + ], +) +@pytest.mark.parametrize( + "get_file_uri_value, audio_upload_file_return", [ ( None, f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - None, - None, - ), - ( - f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - "1", - "2", - ), - ( - f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - "1", - "0", ), ( f"fake://{VIDEO_CONTENT_HASH}-audio.wav", f"fake://{VIDEO_CONTENT_HASH}-audio.wav", - "0", - "0", ), ], ) diff --git a/cdp_backend/tests/utils/test_file_utils.py b/cdp_backend/tests/utils/test_file_utils.py index 944481c6..9217eb9e 100644 --- a/cdp_backend/tests/utils/test_file_utils.py +++ b/cdp_backend/tests/utils/test_file_utils.py @@ -102,8 +102,6 @@ def test_hash_file_contents(tmpdir) -> None: # type: ignore ("1", "10"), (None, "10"), ("1", None), - ("0", "0"), - ("1", "0"), ], ) def test_split_audio( # type: ignore diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index dfff6d36..5b6664bf 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -304,10 +304,13 @@ 
def split_audio( # Run dag log.debug(f"Beginning audio separation for: {video_read_path}") - out, err = ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) + try: + out, err = ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) + except ffmpeg._run.Error as e: + log.error(e.stderr) + raise e log.debug(f"Completed audio separation for: {video_read_path}") log.debug(f"Stored audio: {audio_save_path}") - # Store logs ffmpeg_stdout_path = resolved_audio_save_path.with_suffix(".out") ffmpeg_stderr_path = resolved_audio_save_path.with_suffix(".err") From 06791d730c21b7e289991b7e2777e6a367593fed Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 16 Nov 2022 11:23:44 -0700 Subject: [PATCH 04/25] Updated transcription range to video range, updated video handling to host when limiting video to a range, updated mp4 conversion to allow a range, connected mp4 to clip functionality, updated tests and tried to make testing slightly more consistent, added Session ingestion verification to test out --- cdp_backend/database/models.py | 8 +- cdp_backend/pipeline/event_gather_pipeline.py | 34 +++++-- cdp_backend/pipeline/ingestion_models.py | 19 +++- cdp_backend/pipeline/mock_get_events.py | 4 +- .../pipeline/test_event_gather_pipeline.py | 13 --- cdp_backend/tests/utils/test_file_utils.py | 72 ++++++++------ cdp_backend/utils/file_utils.py | 95 ++++++++++++------- 7 files changed, 146 insertions(+), 99 deletions(-) diff --git a/cdp_backend/database/models.py b/cdp_backend/database/models.py index 1e13a71e..a7c7380e 100644 --- a/cdp_backend/database/models.py +++ b/cdp_backend/database/models.py @@ -494,8 +494,8 @@ class Session(Model): session_index = fields.NumberField(required=True) session_content_hash = fields.TextField(required=True) video_uri = fields.TextField(required=True, validator=validators.resource_exists) - transcription_start_time = fields.TextField(validators.time_duration_is_valid) - transcription_end_time = 
fields.TextField(validators.time_duration_is_valid) + video_start_time = fields.TextField(validators.time_duration_is_valid) + video_end_time = fields.TextField(validators.time_duration_is_valid) caption_uri = fields.TextField(validator=validators.resource_exists) external_source_id = fields.TextField() @@ -517,8 +517,8 @@ def Example(cls) -> Model: session.video_uri = ( "https://video.seattle.gov/media/council/brief_072219_2011957V.mp4" ) - session.transcription_start_time = "00:00:00" - session.transcription_end_time = "99:59:59" + session.video_start_time = "01:00:00" + session.video_end_time = "99:59:59" session.session_content_hash = ( "05bd857af7f70bf51b6aac1144046973bf3325c9101a554bc27dc9607dbbd8f5" ) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 3e0beafd..a3559c66 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -152,8 +152,6 @@ def create_event_gather_flow( audio_uri = split_audio( session_content_hash=session_content_hash, tmp_video_filepath=tmp_video_filepath, - start_time=session.transcription_start_time, - end_time=session.transcription_end_time, bucket=config.validated_gcs_bucket_name, credentials_file=config.google_credentials_file, ) @@ -332,19 +330,41 @@ def convert_video_and_handle_host( # Get file extension ext = Path(video_filepath).suffix.lower() + trim_video = bool(session.video_start_time or session.video_end_time) + # Convert to mp4 if file isn't of approved web format cdp_will_host = False if ext not in [".mp4", ".webm"]: cdp_will_host = True # Convert video to mp4 - mp4_filepath = file_utils.convert_video_to_mp4(video_filepath) + mp4_filepath = file_utils.convert_video_to_mp4( + video_filepath=Path(video_filepath), + start_time=session.video_start_time, + end_time=session.video_end_time, + ) + + fs_functions.remove_local_file(video_filepath) + + # Update variable name for easier downstream typing + 
video_filepath = str(mp4_filepath) + + # host trimmed videos because it's simpler than setting + # up transcription ranges and seamless front end UX + if trim_video: + cdp_will_host = True + + # Convert video to mp4 + trimmed_filepath = file_utils.clip_and_reformat_video( + video_filepath=Path(video_filepath), + start_time=session.video_start_time, + end_time=session.video_end_time, + ) - # Remove old mkv file fs_functions.remove_local_file(video_filepath) # Update variable name for easier downstream typing - video_filepath = mp4_filepath + video_filepath = str(trimmed_filepath) # Check if original session video uri is a m3u8 # We cant follow the normal coonvert video process from above @@ -396,8 +416,6 @@ def convert_video_and_handle_host( def split_audio( session_content_hash: str, tmp_video_filepath: str, - start_time: str, - end_time: str, bucket: str, credentials_file: str, ) -> str: @@ -441,8 +459,6 @@ def split_audio( tmp_audio_log_err_filepath, ) = file_utils.split_audio( video_read_path=tmp_video_filepath, - start_time=start_time, - end_time=end_time, audio_save_path=tmp_audio_filepath, overwrite=True, ) diff --git a/cdp_backend/pipeline/ingestion_models.py b/cdp_backend/pipeline/ingestion_models.py index e1662e77..c27b063e 100644 --- a/cdp_backend/pipeline/ingestion_models.py +++ b/cdp_backend/pipeline/ingestion_models.py @@ -139,11 +139,22 @@ class Session(IngestionModel, DataClassJsonMixin): session_datetime: datetime video_uri: str session_index: int - transcription_start_time: Optional[str] = None - transcription_end_time: Optional[str] = None + video_start_time: Optional[str] = None + video_end_time: Optional[str] = None caption_uri: Optional[str] = None external_source_id: Optional[str] = None + def __post_init__(self) -> None: + if self.video_start_time and self.video_end_time: + start = list(map(int, ("00:00:0" + self.video_start_time).split(":"))) + end = list(map(int, ("00:00:0" + self.video_end_time).split(":"))) + start.reverse() + 
end.reverse() + start_seconds = start[0] + start[1] * 60 + start[2] * 3600 + end_seconds = end[0] + end[1] * 60 + end[2] * 3600 + if start_seconds >= end_seconds: + raise ValueError("start_time must be less than end_time if both exist") + @dataclass class Body(IngestionModel, DataClassJsonMixin): @@ -265,8 +276,8 @@ class EventIngestionModel(IngestionModel, DataClassJsonMixin): video_uri=( "https://video.seattle.gov/media/council/council_113020_2022091V.mp4" ), - transcription_start_time=("00:00:00"), - transcription_end_time=("99:59:59"), + video_start_time=("00:00:00"), + video_end_time=("99:59:59"), caption_uri=( "https://www.seattlechannel.org/documents/seattlechannel/closedcaption/2020/council_113020_2022091.vtt" # noqa: E501 ), diff --git a/cdp_backend/pipeline/mock_get_events.py b/cdp_backend/pipeline/mock_get_events.py index a5cdf872..190f63fb 100644 --- a/cdp_backend/pipeline/mock_get_events.py +++ b/cdp_backend/pipeline/mock_get_events.py @@ -134,8 +134,8 @@ def _get_example_event() -> EventIngestionModel: session_datetime=datetime.utcnow() + (i * timedelta(hours=3)), session_index=i, video_uri=session[0], - transcription_start_time=session[2], - transcription_end_time=session[3], + video_start_time=session[2], + video_end_time=session[3], caption_uri=session[1], ) for i, session in enumerate(random.sample(SESSIONS, random.randint(1, 3))) diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index bc274767..acc37106 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -67,15 +67,6 @@ def test_create_event_gather_flow(config: EventGatherPipelineConfig) -> None: ) @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_file_uri") @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") -@pytest.mark.parametrize( - "start_time, end_time", - [ - (None, None), - ("1", "2"), - pytest.param("1", "0", 
marks=pytest.mark.xfail), - pytest.param("0", "0", marks=pytest.mark.xfail), - ], -) @pytest.mark.parametrize( "get_file_uri_value, audio_upload_file_return", [ @@ -94,8 +85,6 @@ def test_split_audio( mock_get_file_uri: MagicMock, get_file_uri_value: str, audio_upload_file_return: str, - start_time: str, - end_time: str, example_video: Path, ) -> None: mock_get_file_uri.return_value = get_file_uri_value @@ -104,8 +93,6 @@ def test_split_audio( audio_uri = pipeline.split_audio.run( session_content_hash=VIDEO_CONTENT_HASH, tmp_video_filepath=str(example_video), - start_time=start_time, - end_time=end_time, bucket="bucket", credentials_file="/fake/credentials/path", ) diff --git a/cdp_backend/tests/utils/test_file_utils.py b/cdp_backend/tests/utils/test_file_utils.py index 9217eb9e..0346c97d 100644 --- a/cdp_backend/tests/utils/test_file_utils.py +++ b/cdp_backend/tests/utils/test_file_utils.py @@ -80,6 +80,7 @@ def test_hash_file_contents(tmpdir) -> None: # type: ignore assert hash_a != hash_b +@pytest.mark.parametrize("video_filename", [(EXAMPLE_VIDEO_FILENAME)]) # Type ignore because changing tmpdir typing @pytest.mark.parametrize( "audio_save_path", @@ -95,24 +96,15 @@ def test_hash_file_contents(tmpdir) -> None: # type: ignore ), ], ) -@pytest.mark.parametrize( - "start_time, end_time", - [ - (None, None), - ("1", "10"), - (None, "10"), - ("1", None), - ], -) def test_split_audio( # type: ignore - tmpdir, - example_video: str, - start_time: str, - end_time: str, + tmpdir: Path, + resources_dir: Path, + video_filename: str, audio_save_path: str, ) -> None: # Append save name to tmpdir tmp_dir_audio_save_path = Path(tmpdir) / Path(audio_save_path).resolve() + example_video = str(resources_dir / video_filename) # Mock split with mock.patch("ffmpeg.run") as mocked_ffmpeg: @@ -120,8 +112,6 @@ def test_split_audio( # type: ignore try: audio_file, stdout_log, stderr_log = file_utils.split_audio( video_read_path=example_video, - start_time=start_time, - 
end_time=end_time, audio_save_path=str(tmp_dir_audio_save_path), ) @@ -129,6 +119,10 @@ def test_split_audio( # type: ignore assert str(tmp_dir_audio_save_path.with_suffix(".out")) == stdout_log assert str(tmp_dir_audio_save_path.with_suffix(".err")) == stderr_log + os.remove(audio_file) + os.remove(stdout_log) + os.remove(stderr_log) + except Exception as e: raise e @@ -138,7 +132,7 @@ def test_split_audio( # type: ignore reason="File removal for test cleanup sometimes fails on Windows", ) @pytest.mark.parametrize( - "video_url, session_content_hash, seconds, expected", + "video_filename, session_content_hash, seconds, expected", [ (EXAMPLE_VIDEO_FILENAME, "example2", 45, "example2-static-thumbnail.png"), (EXAMPLE_VIDEO_FILENAME, "example3", 999999, "example3-static-thumbnail.png"), @@ -161,15 +155,15 @@ def test_split_audio( # type: ignore ) def test_static_thumbnail_generator( resources_dir: Path, - video_url: Path, + video_filename: str, session_content_hash: str, seconds: int, expected: str, ) -> None: - video_url = resources_dir / video_url + video_path = resources_dir / video_filename result = file_utils.get_static_thumbnail( - str(video_url), session_content_hash, seconds + str(video_path), session_content_hash, seconds ) assert result == expected @@ -187,7 +181,7 @@ def test_static_thumbnail_generator( reason="File removal for test cleanup sometimes fails on Windows", ) @pytest.mark.parametrize( - "video_url, session_content_hash, num_frames, expected", + "video_filename, session_content_hash, num_frames, expected", [ (EXAMPLE_VIDEO_FILENAME, "example2", 15, "example2-hover-thumbnail.gif"), (EXAMPLE_VIDEO_HD_FILENAME, "example3", 2, "example3-hover-thumbnail.gif"), @@ -209,18 +203,18 @@ def test_static_thumbnail_generator( ) def test_hover_thumbnail_generator( resources_dir: Path, - video_url: Path, + video_filename: str, session_content_hash: str, num_frames: int, expected: str, ) -> None: - video_url = resources_dir / video_url + video_path = 
resources_dir / video_filename # Set random seed to get consistent result random.seed(42) result = file_utils.get_hover_thumbnail( - str(video_url), session_content_hash, num_frames + str(video_path), session_content_hash, num_frames ) assert result == expected @@ -239,18 +233,32 @@ def test_hover_thumbnail_generator( reason="No internet connection", ) @pytest.mark.parametrize( - "video_uri, expected", + "video_filename, expected", + [ + (EXAMPLE_MKV_VIDEO_FILENAME, "test-" + EXAMPLE_VIDEO_FILENAME), + ], +) +@pytest.mark.parametrize( + "start_time, end_time", [ - (EXAMPLE_MKV_VIDEO_FILENAME, EXAMPLE_VIDEO_FILENAME), + ("1", "3"), + (None, "3"), + ("2:58", None), ], ) def test_convert_video_to_mp4( resources_dir: Path, - video_uri: str, + video_filename: str, expected: str, + start_time: Optional[str], + end_time: Optional[str], ) -> None: - filepath = str(resources_dir / video_uri) - assert file_utils.convert_video_to_mp4(filepath) == str(resources_dir / expected) + filepath = resources_dir / video_filename + outfile = resources_dir / expected + outfile = file_utils.convert_video_to_mp4(filepath, start_time, end_time, outfile) + assert outfile == resources_dir / expected + assert outfile.exists() + os.remove(outfile) @pytest.mark.skipif( @@ -294,6 +302,7 @@ def test_invalid_uri() -> None: ) +@pytest.mark.parametrize("video_filename", [EXAMPLE_VIDEO_FILENAME]) @pytest.mark.parametrize( "start_time, end_time", [ @@ -309,6 +318,7 @@ def test_invalid_uri() -> None: ) def test_clip_and_reformat_video( resources_dir: Path, + video_filename: str, start_time: str, end_time: str, output_format: str, @@ -331,7 +341,7 @@ def test_clip_and_reformat_video( @pytest.mark.parametrize( - "video_uri, caption_uri, end_time, is_resource, expected", + "video_filename, caption_uri, end_time, is_resource, expected", [ # the video is about 3 minutes and boston_captions.vtt is about 1 minute (EXAMPLE_VIDEO_FILENAME, "boston_captions.vtt", 120, True, False), @@ -349,7 +359,7 @@ def 
test_clip_and_reformat_video( ) def test_caption_is_valid( resources_dir: Path, - video_uri: str, + video_filename: str, caption_uri: str, end_time: int, is_resource: bool, @@ -357,7 +367,7 @@ def test_caption_is_valid( ) -> None: with TemporaryDirectory() as dir_path: temp_video = str(Path(dir_path) / f"caption-test-{end_time}.mp4") - ffmpeg.input(str(resources_dir / video_uri)).output( + ffmpeg.input(str(resources_dir / video_filename)).output( temp_video, codec="copy", t=end_time ).run(overwrite_output=True) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 5b6664bf..55da0d49 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -249,8 +249,6 @@ def vimeo_copy(uri: str, dst: Path, overwrite: bool = False) -> str: def split_audio( video_read_path: str, - start_time: str, - end_time: str, audio_save_path: str, overwrite: bool = False, ) -> Tuple[str, str, str]: @@ -261,10 +259,6 @@ def split_audio( ---------- video_read_path: str Path to the video to split the audio from. - start_time: str - The start time of the clip in HH:MM:SS. - end_time: str - The end time of the clip in HH:MM:SS. audio_save_path: str Path to where the audio should be stored. 
@@ -287,25 +281,21 @@ def split_audio( if resolved_audio_save_path.is_dir(): raise IsADirectoryError(resolved_audio_save_path) - # Construct ffmpeg dag - stream = ffmpeg.input( - resolved_video_read_path, - ss=start_time or "0", - to=end_time or "99:59:59", - ) - stream = ffmpeg.output( - stream, - filename=resolved_audio_save_path, - format="wav", - acodec="pcm_s16le", - ac=1, - ar="16k", - ) - # Run dag log.debug(f"Beginning audio separation for: {video_read_path}") try: - out, err = ffmpeg.run(stream, capture_stdout=True, capture_stderr=True) + # Construct ffmpeg dag + out, err = ( + ffmpeg.input(resolved_video_read_path) + .output( + filename=resolved_audio_save_path, + format="wav", + acodec="pcm_s16le", + ac=1, + ar="16k", + ) + .run(capture_stdout=True, capture_stderr=True) + ) except ffmpeg._run.Error as e: log.error(e.stderr) raise e @@ -511,7 +501,12 @@ def hash_file_contents(uri: str, buffer_size: int = 2**16) -> str: return hasher.hexdigest() -def convert_video_to_mp4(video_filepath: str) -> str: +def convert_video_to_mp4( + video_filepath: Path, + start_time: Optional[str], + end_time: Optional[str], + output_path: Path = None, +) -> Path: """ Converts a video to an equivalent MP4 file. @@ -519,19 +514,31 @@ def convert_video_to_mp4(video_filepath: str) -> str: ---------- video_filepath: str The filepath of the video to convert. + start_time: str + The start time to trim the video in HH:MM:SS. + end_time: str + The end time to trim the video in HH:MM:SS. + output_path: Path + The output path to place the clip at. + Must include a suffix to use for the reformatting. Returns ------- - mp4_filepath: str + output_path: str The filepath of the converted MP4 video. 
""" - import ffmpeg - mp4_filepath = str(Path(video_filepath).with_suffix(".mp4")) - ffmpeg.input(video_filepath).output(mp4_filepath).overwrite_output().run() + output_path = output_path or video_filepath.with_suffix(".mp4") + output_path = clip_and_reformat_video( + video_filepath=video_filepath, + start_time=start_time, + end_time=end_time, + output_path=output_path, + output_format="mp4", + ) log.info("Finished converting {} to mp4".format(video_filepath)) - return mp4_filepath + return output_path def generate_file_storage_name(file_uri: str, suffix: str) -> str: @@ -596,8 +603,8 @@ def download_video_from_session_id( def clip_and_reformat_video( video_filepath: Path, - start_time: str, - end_time: str, + start_time: Optional[str], + end_time: Optional[str], output_path: Path = Path("clipped.mp4"), output_format: str = "mp4", ) -> Path: @@ -627,12 +634,19 @@ def clip_and_reformat_video( """ import ffmpeg - video_filepath = Path(video_filepath) - ffmpeg_stdout, ffmpeg_stderr = ( - ffmpeg.input(video_filepath) - .output(str(output_path), ss=start_time, to=end_time, format=output_format) - .run(quiet=True) - ) + try: + ffmpeg_stdout, ffmpeg_stderr = ( + ffmpeg.input( + video_filepath, + ss=start_time or "0", + to=end_time or "99:59:59", + ) + .output(filename=str(output_path), format=output_format) + .run(capture_stdout=True, capture_stderr=True) + ) + except ffmpeg._run.Error as e: + log.error(e.stderr) + raise e log.info(f"Finished clipping {video_filepath} to {output_path}") log.debug(ffmpeg_stdout) @@ -694,4 +708,13 @@ def caption_is_valid(video_uri: str, caption_uri: str) -> bool: ), ffprobe.get("streams", []), ) + audio_streams = filter( + lambda s: s.get("codec_type", "") == "audio", + ffprobe.get("streams", []), + ) + for s in audio_streams: + duration = s.get("duration", "0.0") + log.warning(f"stream length {duration}") + log.warning(f"caption length {caption_length}") + return any(similar_audio_streams) From 88d619fc6c82bcc81c50ade1cd300a46d39544cb Mon 
Sep 17 00:00:00 2001 From: Chris Khan Date: Fri, 18 Nov 2022 09:42:15 -0700 Subject: [PATCH 05/25] Update session hash to reflect trimmed video --- cdp_backend/pipeline/event_gather_pipeline.py | 19 +++++++++---------- .../pipeline/test_event_gather_pipeline.py | 5 ++++- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index a3559c66..ba65ff4e 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -130,18 +130,13 @@ def create_event_gather_flow( # Download video to local copy resource_copy_filepath = resource_copy_task(uri=session.video_uri) - # Get unique session identifier - session_content_hash = get_session_content_hash( - tmp_video_filepath=resource_copy_filepath, - ) - # Handle video conversion or non-secure resource # hosting ( tmp_video_filepath, session_video_hosted_url, + session_content_hash, ) = convert_video_and_handle_host( - session_content_hash=session_content_hash, video_filepath=resource_copy_filepath, session=session, credentials_file=config.google_credentials_file, @@ -293,14 +288,13 @@ def get_session_content_hash( return file_utils.hash_file_contents(uri=tmp_video_filepath) -@task(nout=2) +@task(nout=3) def convert_video_and_handle_host( - session_content_hash: str, video_filepath: str, session: Session, credentials_file: str, bucket: str, -) -> Tuple[str, str]: +) -> Tuple[str, str, str]: """ Convert a video to MP4 (if necessary), upload it to the file store, and remove the original non-MP4 file that was resource copied. 
@@ -392,6 +386,11 @@ def convert_video_and_handle_host( else: hosted_video_media_url = session.video_uri + # Get unique session identifier + session_content_hash = get_session_content_hash( + tmp_video_filepath=video_filepath, + ) + # Upload and swap if cdp is hosting if cdp_will_host: # Upload to gcsfs @@ -409,7 +408,7 @@ def convert_video_and_handle_host( uri=hosted_video_uri, ) - return video_filepath, hosted_video_media_url + return video_filepath, hosted_video_media_url, session_content_hash @task diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index acc37106..7843c6b8 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -577,6 +577,7 @@ def test_store_event_processing_results( EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI +@mock.patch(f"{PIPELINE_PATH}.get_session_content_hash") @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_open_url_for_gcs_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.remove_local_file") @@ -621,6 +622,7 @@ def test_convert_video_and_handle_host( mock_remove_local_file: MagicMock, mock_generate_url: MagicMock, mock_upload_file: MagicMock, + mock_get_session_content_hash: MagicMock, video_filepath: str, session: Session, expected_filepath: str, @@ -629,12 +631,13 @@ def test_convert_video_and_handle_host( mock_upload_file.return_value = "file_store_uri" mock_generate_url.return_value = "hosted-video.mp4" mock_convert_video_to_mp4.return_value = expected_filepath + mock_get_session_content_hash.return_value = "abc123" ( mp4_filepath, session_video_hosted_url, + session_content_hash, ) = pipeline.convert_video_and_handle_host.run( - session_content_hash="abc123", video_filepath=video_filepath, session=session, credentials_file="fake/credentials.json", From 
217deef35cbc4441e05586a5b0fd90be8c3596b1 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Fri, 18 Nov 2022 11:35:09 -0700 Subject: [PATCH 06/25] Bypass hash task --- cdp_backend/pipeline/event_gather_pipeline.py | 4 +--- cdp_backend/tests/pipeline/test_event_gather_pipeline.py | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index ba65ff4e..fb18abf0 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -387,9 +387,7 @@ def convert_video_and_handle_host( hosted_video_media_url = session.video_uri # Get unique session identifier - session_content_hash = get_session_content_hash( - tmp_video_filepath=video_filepath, - ) + session_content_hash = file_utils.hash_file_contents(uri=video_filepath) # Upload and swap if cdp is hosting if cdp_will_host: diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 7843c6b8..50236609 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -577,10 +577,10 @@ def test_store_event_processing_results( EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI -@mock.patch(f"{PIPELINE_PATH}.get_session_content_hash") @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_open_url_for_gcs_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.remove_local_file") +@mock.patch(f"{PIPELINE_PATH}.file_utils.hash_file_contents") @mock.patch(f"{PIPELINE_PATH}.file_utils.convert_video_to_mp4") @pytest.mark.parametrize( "video_filepath, session, expected_filepath, expected_hosted_video_url", @@ -619,10 +619,10 @@ def test_store_event_processing_results( ) def test_convert_video_and_handle_host( mock_convert_video_to_mp4: MagicMock, + 
mock_hash_file_contents: MagicMock, mock_remove_local_file: MagicMock, mock_generate_url: MagicMock, mock_upload_file: MagicMock, - mock_get_session_content_hash: MagicMock, video_filepath: str, session: Session, expected_filepath: str, @@ -631,7 +631,7 @@ def test_convert_video_and_handle_host( mock_upload_file.return_value = "file_store_uri" mock_generate_url.return_value = "hosted-video.mp4" mock_convert_video_to_mp4.return_value = expected_filepath - mock_get_session_content_hash.return_value = "abc123" + mock_hash_file_contents.return_value = "abc123" ( mp4_filepath, From 78c2019b0b06df1be99eab67e8e470e02870527c Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Mon, 21 Nov 2022 21:46:53 -0700 Subject: [PATCH 07/25] Remove unnecessary logging, duration validation comments, elif typo fix in cdp_will_host control structure --- cdp_backend/pipeline/event_gather_pipeline.py | 6 +++--- cdp_backend/pipeline/ingestion_models.py | 3 +++ cdp_backend/utils/file_utils.py | 8 -------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index fb18abf0..f1b5d370 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -344,11 +344,11 @@ def convert_video_and_handle_host( video_filepath = str(mp4_filepath) # host trimmed videos because it's simpler than setting - # up transcription ranges and seamless front end UX - if trim_video: + # up transcription and playback ranges + elif trim_video: cdp_will_host = True - # Convert video to mp4 + # Trim video trimmed_filepath = file_utils.clip_and_reformat_video( video_filepath=Path(video_filepath), start_time=session.video_start_time, diff --git a/cdp_backend/pipeline/ingestion_models.py b/cdp_backend/pipeline/ingestion_models.py index c27b063e..6b64f66e 100644 --- a/cdp_backend/pipeline/ingestion_models.py +++ b/cdp_backend/pipeline/ingestion_models.py @@ -145,7 +145,10 @@ 
class Session(IngestionModel, DataClassJsonMixin): external_source_id: Optional[str] = None def __post_init__(self) -> None: + # validate start/end time pair during ingestion if self.video_start_time and self.video_end_time: + # fill in potentially missing hh:mm:s + # for flexible input format [h[h:[m[m:[s]]]]]s start = list(map(int, ("00:00:0" + self.video_start_time).split(":"))) end = list(map(int, ("00:00:0" + self.video_end_time).split(":"))) start.reverse() diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 55da0d49..63d68567 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -708,13 +708,5 @@ def caption_is_valid(video_uri: str, caption_uri: str) -> bool: ), ffprobe.get("streams", []), ) - audio_streams = filter( - lambda s: s.get("codec_type", "") == "audio", - ffprobe.get("streams", []), - ) - for s in audio_streams: - duration = s.get("duration", "0.0") - log.warning(f"stream length {duration}") - log.warning(f"caption length {caption_length}") return any(similar_audio_streams) From f6955ac247eee07ab799f86b36e2f6d7e9dc685f Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Mon, 28 Nov 2022 12:35:57 -0700 Subject: [PATCH 08/25] Reverted function parameter doc for split audio --- cdp_backend/pipeline/event_gather_pipeline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index f1b5d370..35ef6deb 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -425,10 +425,6 @@ def split_audio( The unique identifier for the session. tmp_video_filepath: str The local path for video file to generate a hash for. - start_time: str - The start time of the clip in HH:MM:SS. - end_time: str - The end time of the clip in HH:MM:SS. bucket: str The bucket to store the transcript to. 
credentials_file: str From 46aff143bdf36ae30acf924b3162946479ea5c0e Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 6 Dec 2022 11:55:18 -0700 Subject: [PATCH 09/25] Improved documentation for video_start_time in ingestion_models --- cdp_backend/pipeline/ingestion_models.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cdp_backend/pipeline/ingestion_models.py b/cdp_backend/pipeline/ingestion_models.py index 6b64f66e..55018fc5 100644 --- a/cdp_backend/pipeline/ingestion_models.py +++ b/cdp_backend/pipeline/ingestion_models.py @@ -134,6 +134,15 @@ class Session(IngestionModel, DataClassJsonMixin): """ A session is a working period for an event. For example, an event could have a morning and afternoon session. + + Notes + ----- + video_start_time is a duration relative to the beginning of the video in + HH:MM:SS format. It does not affect nor is relative to session_datetime + or any other datetime. If the portion of the video relevant to the session + begins 37m50s into the full video, video_start_time will be "37:50". + An absent start time is equivalent to the beginning of the video, and an + absent end time is equivalent to the end of the video, so either can be omitted.
""" session_datetime: datetime From a3a0861eee204755a423fb8ec47ce5dc4e5d1afa Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 27 Dec 2022 17:14:08 -0800 Subject: [PATCH 10/25] Use content hash to name videos and prevent collisions across sessions --- cdp_backend/pipeline/event_gather_pipeline.py | 7 ++++ .../pipeline/test_event_gather_pipeline.py | 38 +++++++++++-------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 35ef6deb..704a61dd 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -136,6 +136,7 @@ def create_event_gather_flow( tmp_video_filepath, session_video_hosted_url, session_content_hash, + ) = convert_video_and_handle_host( video_filepath=resource_copy_filepath, session=session, @@ -389,6 +390,12 @@ def convert_video_and_handle_host( # Get unique session identifier session_content_hash = file_utils.hash_file_contents(uri=video_filepath) + # Rename file to prevent collisions + video_file = Path(video_filepath) + video_filepath = str(video_file.rename(video_file.with_stem( + f"{session_content_hash}_temp_video" + ))) + # Upload and swap if cdp is hosting if cdp_will_host: # Upload to gcsfs diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 50236609..54066790 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import List, Optional from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from prefect import Flow @@ -577,6 +577,10 @@ def test_store_event_processing_results( EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI +def path_rename(self, newPath): + return 
newPath + + @mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.get_open_url_for_gcs_file") @mock.patch(f"{PIPELINE_PATH}.fs_functions.remove_local_file") @@ -633,20 +637,22 @@ def test_convert_video_and_handle_host( mock_convert_video_to_mp4.return_value = expected_filepath mock_hash_file_contents.return_value = "abc123" - ( - mp4_filepath, - session_video_hosted_url, - session_content_hash, - ) = pipeline.convert_video_and_handle_host.run( - video_filepath=video_filepath, - session=session, - credentials_file="fake/credentials.json", - bucket="doesnt://matter", - ) + with patch.object(Path, 'rename', path_rename): + + ( + mp4_filepath, + session_video_hosted_url, + session_content_hash, + ) = pipeline.convert_video_and_handle_host.run( + video_filepath=video_filepath, + session=session, + credentials_file="fake/credentials.json", + bucket="doesnt://matter", + ) - # Make sure mp4 files don't go through conversion - if Path(video_filepath).suffix == ".mp4": - assert not mock_convert_video_to_mp4.called + # Make sure mp4 files don't go through conversion + if Path(video_filepath).suffix == ".mp4": + assert not mock_convert_video_to_mp4.called - assert mp4_filepath == expected_filepath - assert session_video_hosted_url == expected_hosted_video_url + assert mp4_filepath == f"{session_content_hash}_temp_video.mp4" + assert session_video_hosted_url == expected_hosted_video_url From ae6b28a901e76baae92f1118e200d03aea7f6596 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 27 Dec 2022 17:48:23 -0800 Subject: [PATCH 11/25] Use pathlib functions from 3.8, add type annotations for mock function --- cdp_backend/pipeline/event_gather_pipeline.py | 4 ++-- cdp_backend/tests/pipeline/test_event_gather_pipeline.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index f781a05a..3c0b3c4a 100644 --- 
a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -391,8 +391,8 @@ def convert_video_and_handle_host( # Rename file to prevent collisions video_file = Path(video_filepath) - video_filepath = str(video_file.rename(video_file.with_stem( - f"{session_content_hash}_temp_video" + video_filepath = str(video_file.rename(video_file.with_name( + f"{session_content_hash}_temp_video{video_file.suffix}" ))) # Upload and swap if cdp is hosting diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 54066790..5fa25d44 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -577,7 +577,7 @@ def test_store_event_processing_results( EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI -def path_rename(self, newPath): +def path_rename(self: Path, newPath: Path): return newPath From b73a2e4c0bcbd54f6ed2ac50485cda88c8d3586a Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 27 Dec 2022 17:56:12 -0800 Subject: [PATCH 12/25] Add return type annotations for mock function --- cdp_backend/tests/pipeline/test_event_gather_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 5fa25d44..9c6efe74 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -577,7 +577,7 @@ def test_store_event_processing_results( EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI -def path_rename(self: Path, newPath: Path): +def path_rename(self: Path, newPath: Path) -> Path: return newPath From 6e5d707f2a1950a077db0b6da5c1fa4196af2f4e Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 27 Dec 2022 18:13:31 -0800 
Subject: [PATCH 13/25] Lint updates --- cdp_backend/pipeline/event_gather_pipeline.py | 10 +++++++--- .../tests/pipeline/test_event_gather_pipeline.py | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 3c0b3c4a..d5259271 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -391,9 +391,13 @@ def convert_video_and_handle_host( # Rename file to prevent collisions video_file = Path(video_filepath) - video_filepath = str(video_file.rename(video_file.with_name( - f"{session_content_hash}_temp_video{video_file.suffix}" - ))) + video_filepath = str( + video_file.rename( + video_file.with_name( + f"{session_content_hash}_temp_video{video_file.suffix}" + ) + ) + ) # Upload and swap if cdp is hosting if cdp_will_host: diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 9c6efe74..777d0046 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -637,7 +637,7 @@ def test_convert_video_and_handle_host( mock_convert_video_to_mp4.return_value = expected_filepath mock_hash_file_contents.return_value = "abc123" - with patch.object(Path, 'rename', path_rename): + with patch.object(Path, "rename", path_rename): ( mp4_filepath, From 0f3f223b92cff38b459a7def46e0ce631744ff7b Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Tue, 27 Dec 2022 23:47:25 -0800 Subject: [PATCH 14/25] Move file renaming to earliest point, return unique names from file conversion functions --- cdp_backend/pipeline/event_gather_pipeline.py | 20 +++-- .../pipeline/test_event_gather_pipeline.py | 2 +- cdp_backend/tests/utils/test_file_utils.py | 52 ++++++++++++ cdp_backend/utils/file_utils.py | 83 ++++++++++++++++++- 4 files changed, 145 insertions(+), 12 deletions(-) diff --git 
a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index d5259271..51ddfaf9 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -7,6 +7,7 @@ from operator import attrgetter from pathlib import Path from typing import Any, Callable, Dict, List, NamedTuple, Optional, Set, Tuple, Union +from uuid import uuid4 from aiohttp.client_exceptions import ClientResponseError from fireo.fields.errors import FieldValidationFailed, InvalidFieldType, RequiredField @@ -321,6 +322,14 @@ def convert_video_and_handle_host( hosted_video_uri: str The URI for the CDP hosted video. """ + # Prevent path collision + video_filepath = str( + file_utils.rename_with_stem( + Path(video_filepath), + f"{str(uuid4())}_temp", + ) + ) + # Get file extension ext = Path(video_filepath).suffix.lower() @@ -389,14 +398,11 @@ def convert_video_and_handle_host( # Get unique session identifier session_content_hash = file_utils.hash_file_contents(uri=video_filepath) - # Rename file to prevent collisions - video_file = Path(video_filepath) + # Rename file for deterministic output video_filepath = str( - video_file.rename( - video_file.with_name( - f"{session_content_hash}_temp_video{video_file.suffix}" - ) - ) + file_utils.rename_with_stem( + Path(video_filepath), f"{session_content_hash}_temp" + ), ) # Upload and swap if cdp is hosting diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 777d0046..7d15555c 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -654,5 +654,5 @@ def test_convert_video_and_handle_host( if Path(video_filepath).suffix == ".mp4": assert not mock_convert_video_to_mp4.called - assert mp4_filepath == f"{session_content_hash}_temp_video.mp4" + assert mp4_filepath == f"{session_content_hash}_temp.mp4" assert 
session_video_hosted_url == expected_hosted_video_url diff --git a/cdp_backend/tests/utils/test_file_utils.py b/cdp_backend/tests/utils/test_file_utils.py index 0346c97d..8b588c21 100644 --- a/cdp_backend/tests/utils/test_file_utils.py +++ b/cdp_backend/tests/utils/test_file_utils.py @@ -35,6 +35,58 @@ ############################################################################# +@pytest.mark.parametrize( + "path, stem, expected_result", + [ + (Path("file.ext"), "new", "new.ext"), + ], +) +def test_with_stem(path: Path, stem: str, expected_result: str) -> None: + new_path = file_utils.with_stem(path, stem) + assert str(new_path) == expected_result + + +@pytest.mark.parametrize( + "path, addition, expected_result", + [ + (Path("file.ext"), "-new", "file-new.ext"), + ], +) +def test_append_to_stem(path: Path, addition: str, expected_result: str) -> None: + new_path = file_utils.append_to_stem(path, addition) + assert str(new_path) == expected_result + + +@pytest.mark.parametrize( + "path, stem, expected_result", + [ + (Path("file.ext"), "new", "new.ext"), + ], +) +def test_rename_with_stem(path: Path, stem: str, expected_result: str) -> None: + file = open(path, "w") + file.close() + new_path = file_utils.rename_with_stem(path, stem) + assert str(new_path) == expected_result + assert new_path.exists() + os.remove(new_path) + + +@pytest.mark.parametrize( + "path, addition, expected_result", + [ + (Path("file.ext"), "-new", "file-new.ext"), + ], +) +def test_rename_append_to_stem(path: Path, addition: str, expected_result: str) -> None: + file = open(path, "w") + file.close() + new_path = file_utils.rename_append_to_stem(path, addition) + assert str(new_path) == expected_result + assert new_path.exists() + os.remove(new_path) + + @pytest.mark.parametrize( "uri, expected_result", [ diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 63d68567..d020185b 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ 
-28,6 +28,82 @@ MAX_THUMBNAIL_WIDTH = 960 +def with_stem(path: Path, stem: str) -> Path: + """ + Create a path with a new stem + + Parameters + ---------- + path: Path + The path to alter + stem: str + The string to be the new stem of the path + + Returns + ------- + path: Path + The new path with the replaced stem + """ + return path.with_name(f"{stem}{path.suffix}") + + +def append_to_stem(path: Path, addition: str) -> Path: + """ + Rename a file with a string appended to the path stem + + Parameters + ---------- + path: Path + The path to alter + addition: str + The string to be appended to the path stem + + Returns + ------- + path: Path + The new path with the stem addition + """ + return with_stem(path, f"{path.stem}{addition}") + + +def rename_with_stem(path: Path, stem: str) -> Path: + """ + Rename a file with a string appended to the path stem + + Parameters + ---------- + path: Path + The path to be renamed + stem: str + The string to become the new stem + + Returns + ------- + path: Path + The new path of the renamed file + """ + return path.rename(with_stem(path, stem)) + + +def rename_append_to_stem(path: Path, addition: str) -> Path: + """ + Rename a file with a string appended to the path stem + + Parameters + ---------- + path: Path + The path to be renamed + addition: str + The string to be appended to the path stem + + Returns + ------- + path: Path + The new path of the renamed file + """ + return path.rename(append_to_stem(path, addition)) + + def get_media_type(uri: str) -> Optional[str]: """ Get the IANA media type for the provided URI. @@ -520,7 +596,6 @@ def convert_video_to_mp4( The end time to trim the video in HH:MM:SS. output_path: Path The output path to place the clip at. - Must include a suffix to use for the reformatting. 
Returns ------- @@ -605,7 +680,7 @@ def clip_and_reformat_video( video_filepath: Path, start_time: Optional[str], end_time: Optional[str], - output_path: Path = Path("clipped.mp4"), + output_path: Path = None, output_format: str = "mp4", ) -> Path: """ @@ -621,8 +696,6 @@ def clip_and_reformat_video( The end time of the clip in HH:MM:SS. output_path: Path The output path to place the clip at. - Must include a suffix to use for the reformatting. - Default: "clipped.mp4" output_format: str The output format. Default: "mp4" @@ -634,6 +707,8 @@ def clip_and_reformat_video( """ import ffmpeg + output_path = output_path or rename_append_to_stem(video_filepath, "_clipped") + try: ffmpeg_stdout, ffmpeg_stderr = ( ffmpeg.input( From 1d53f1cb8702a389a298512f6244568fb6303a22 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 28 Dec 2022 09:33:13 -0800 Subject: [PATCH 15/25] UUID for original file resource copy task so to prevent collisions earlier in the process --- cdp_backend/pipeline/event_gather_pipeline.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 51ddfaf9..38d5a9ad 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -129,7 +129,9 @@ def create_event_gather_flow( session_processing_results: List[SessionProcessingResult] = [] for session in event.sessions: # Download video to local copy - resource_copy_filepath = resource_copy_task(uri=session.video_uri) + resource_copy_filepath = resource_copy_task( + uri=session.video_uri, dst=f"{str(uuid4())}_temp.mp4" + ) # Handle video conversion or non-secure resource # hosting @@ -230,7 +232,7 @@ def create_event_gather_flow( @task(max_retries=3, retry_delay=timedelta(seconds=120)) -def resource_copy_task(uri: str) -> str: +def resource_copy_task(uri: str, dst: str = None) -> str: """ Copy a file to a temporary location for 
processing. @@ -251,6 +253,7 @@ def resource_copy_task(uri: str) -> str: """ return file_utils.resource_copy( uri=uri, + dst=dst, overwrite=True, ) @@ -322,14 +325,6 @@ def convert_video_and_handle_host( hosted_video_uri: str The URI for the CDP hosted video. """ - # Prevent path collision - video_filepath = str( - file_utils.rename_with_stem( - Path(video_filepath), - f"{str(uuid4())}_temp", - ) - ) - # Get file extension ext = Path(video_filepath).suffix.lower() From dae4ad7c48defb8dd727637fd27f3b62dc2246a5 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 28 Dec 2022 09:49:02 -0800 Subject: [PATCH 16/25] Minor comment change to force CI reprocessing --- cdp_backend/pipeline/event_gather_pipeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 38d5a9ad..7549d655 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -128,7 +128,8 @@ def create_event_gather_flow( for event in events: session_processing_results: List[SessionProcessingResult] = [] for session in event.sessions: - # Download video to local copy + # Download video to local copy making + # copy unique in case of shared session video resource_copy_filepath = resource_copy_task( uri=session.video_uri, dst=f"{str(uuid4())}_temp.mp4" ) From 0d812089cb1df864360d1f49eb2835cf615cca4b Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 28 Dec 2022 22:48:57 -0800 Subject: [PATCH 17/25] Stop renaming resource within task --- cdp_backend/pipeline/event_gather_pipeline.py | 7 ------- cdp_backend/tests/pipeline/test_event_gather_pipeline.py | 2 +- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 7549d655..5ba5b0d8 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py 
@@ -394,13 +394,6 @@ def convert_video_and_handle_host( # Get unique session identifier session_content_hash = file_utils.hash_file_contents(uri=video_filepath) - # Rename file for deterministic output - video_filepath = str( - file_utils.rename_with_stem( - Path(video_filepath), f"{session_content_hash}_temp" - ), - ) - # Upload and swap if cdp is hosting if cdp_will_host: # Upload to gcsfs diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index 7d15555c..e33151ff 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -654,5 +654,5 @@ def test_convert_video_and_handle_host( if Path(video_filepath).suffix == ".mp4": assert not mock_convert_video_to_mp4.called - assert mp4_filepath == f"{session_content_hash}_temp.mp4" + assert mp4_filepath == str(Path(video_filepath).with_suffix('.mp4')) assert session_video_hosted_url == expected_hosted_video_url From 74362559829fa1b49862b649eb50076932c57d10 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Wed, 28 Dec 2022 23:18:05 -0800 Subject: [PATCH 18/25] Lint --- cdp_backend/tests/pipeline/test_event_gather_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py index e33151ff..5381e346 100644 --- a/cdp_backend/tests/pipeline/test_event_gather_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_gather_pipeline.py @@ -654,5 +654,5 @@ def test_convert_video_and_handle_host( if Path(video_filepath).suffix == ".mp4": assert not mock_convert_video_to_mp4.called - assert mp4_filepath == str(Path(video_filepath).with_suffix('.mp4')) + assert mp4_filepath == str(Path(video_filepath).with_suffix(".mp4")) assert session_video_hosted_url == expected_hosted_video_url From 6b31f4b2921de98cd41e2cf53d3d347d68daec85 Mon Sep 17 00:00:00 2001 
From: Chris Khan Date: Thu, 29 Dec 2022 20:53:59 -0800 Subject: [PATCH 19/25] Flag for adding source suffix in resource copy --- cdp_backend/pipeline/event_gather_pipeline.py | 7 +++++-- cdp_backend/utils/file_utils.py | 7 ++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 5ba5b0d8..559d3136 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -131,7 +131,9 @@ def create_event_gather_flow( # Download video to local copy making # copy unique in case of shared session video resource_copy_filepath = resource_copy_task( - uri=session.video_uri, dst=f"{str(uuid4())}_temp.mp4" + uri=session.video_uri, + dst=f"{str(uuid4())}_temp", + copy_suffix=True, ) # Handle video conversion or non-secure resource @@ -233,7 +235,7 @@ def create_event_gather_flow( @task(max_retries=3, retry_delay=timedelta(seconds=120)) -def resource_copy_task(uri: str, dst: str = None) -> str: +def resource_copy_task(uri: str, dst: str = None, copy_suffix: bool = None) -> str: """ Copy a file to a temporary location for processing. @@ -255,6 +257,7 @@ def resource_copy_task(uri: str, dst: str = None) -> str: return file_utils.resource_copy( uri=uri, dst=dst, + copy_suffix=copy_suffix, overwrite=True, ) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index d020185b..72d48657 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -145,6 +145,7 @@ def get_media_type(uri: str) -> Optional[str]: def resource_copy( uri: str, dst: Optional[Union[str, Path]] = None, + copy_suffix: Optional[bool] = False, overwrite: bool = False, ) -> str: """ @@ -166,6 +167,7 @@ def resource_copy( saved_path: str The path of where the resource ended up getting copied to. 
""" + uri_suffix = Path(uri.split("/")[-1].split("?")[0].split("#")[0]).suffix if dst is None: dst = uri.split("/")[-1] @@ -179,10 +181,13 @@ def resource_copy( # Split by the last "/" dst = dst / uri.split("/")[-1] + if copy_suffix: + dst = dst.with_suffix(uri_suffix) + # Ensure filename is less than 255 chars # Otherwise this can raise an OSError for too long of a filename if len(dst.name) > 255: - dst = Path(str(dst)[:255]) + dst = with_stem(dst, dst.stem[: (255 - len(dst.suffix))]) # Ensure dest isn't a file if dst.is_file() and not overwrite: From 21fe9c525b6aa81fa72731f54c9ec68c04eb7dac Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 2022 00:35:18 -0800 Subject: [PATCH 20/25] Log file status after copy for debugging --- cdp_backend/utils/file_utils.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 72d48657..cb4dbd7d 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -229,6 +229,7 @@ def resource_copy( # It was added because it's very common for SSL certs to be bad # See: https://github.com/CouncilDataProject/cdp-scrapers/pull/85 # And: https://github.com/CouncilDataProject/seattle/runs/5957646032 + with open(dst, "wb") as open_dst: open_dst.write( requests.get( @@ -237,6 +238,10 @@ def resource_copy( timeout=1800, ).content ) + log.info(f"File complete: {str(dst)}") + log.info(f"URI complete: {str(uri)}") + log.info(f"File exists: {dst.exists()}") + log.info(f"File stats: {dst.stat()}") else: # TODO: Add explicit use of GCS credentials until public read is fixed From 3746b2a455ea45b14654e464ba112af913e1312f Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 2022 01:42:05 -0800 Subject: [PATCH 21/25] Logging to debug file copy --- cdp_backend/pipeline/event_gather_pipeline.py | 4 ++++ cdp_backend/utils/file_utils.py | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git 
a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 559d3136..04dbfc63 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -334,6 +334,10 @@ def convert_video_and_handle_host( trim_video = bool(session.video_start_time or session.video_end_time) + log.info(f"File to trim: {str(video_filepath)}") + log.info(f"File exists: {video_filepath.exists()}") + log.info(f"File stats: {video_filepath.stat()}") + # Convert to mp4 if file isn't of approved web format cdp_will_host = False if ext not in [".mp4", ".webm"]: diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index cb4dbd7d..cecf4dcf 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -238,10 +238,6 @@ def resource_copy( timeout=1800, ).content ) - log.info(f"File complete: {str(dst)}") - log.info(f"URI complete: {str(uri)}") - log.info(f"File exists: {dst.exists()}") - log.info(f"File stats: {dst.stat()}") else: # TODO: Add explicit use of GCS credentials until public read is fixed @@ -719,6 +715,10 @@ def clip_and_reformat_video( output_path = output_path or rename_append_to_stem(video_filepath, "_clipped") + log.info(f"File to trim: {str(video_filepath)}") + log.info(f"File exists: {video_filepath.exists()}") + log.info(f"File stats: {video_filepath.stat()}") + try: ffmpeg_stdout, ffmpeg_stderr = ( ffmpeg.input( From 606a6fcffc4c8b3a556ea5fedf1089af34842bae Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 2022 01:56:45 -0800 Subject: [PATCH 22/25] Logging to debug file copy --- cdp_backend/pipeline/event_gather_pipeline.py | 7 ++++--- cdp_backend/utils/file_utils.py | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 04dbfc63..042d441c 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ 
b/cdp_backend/pipeline/event_gather_pipeline.py @@ -334,9 +334,10 @@ def convert_video_and_handle_host( trim_video = bool(session.video_start_time or session.video_end_time) - log.info(f"File to trim: {str(video_filepath)}") - log.info(f"File exists: {video_filepath.exists()}") - log.info(f"File stats: {video_filepath.stat()}") + log.info(f"File to trim: {video_filepath}") + log.info(f"File exists: {Path(video_filepath).exists()}") + if Path(video_filepath).exists(): + log.info(f"File stats: {Path(video_filepath).stat()}") # Convert to mp4 if file isn't of approved web format cdp_will_host = False diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index cecf4dcf..042a7cbb 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -717,7 +717,8 @@ def clip_and_reformat_video( log.info(f"File to trim: {str(video_filepath)}") log.info(f"File exists: {video_filepath.exists()}") - log.info(f"File stats: {video_filepath.stat()}") + if video_filepath.exists(): + log.info(f"File stats: {video_filepath.stat()}") try: ffmpeg_stdout, ffmpeg_stderr = ( From 01595c3d6653649c4fab376ac686cfd5ada91f9a Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 2022 02:41:54 -0800 Subject: [PATCH 23/25] Logging to debug file copy --- cdp_backend/pipeline/event_gather_pipeline.py | 9 ++------- cdp_backend/utils/file_utils.py | 5 +++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index 042d441c..c50be448 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -334,11 +334,6 @@ def convert_video_and_handle_host( trim_video = bool(session.video_start_time or session.video_end_time) - log.info(f"File to trim: {video_filepath}") - log.info(f"File exists: {Path(video_filepath).exists()}") - if Path(video_filepath).exists(): - log.info(f"File stats: 
{Path(video_filepath).stat()}") - # Convert to mp4 if file isn't of approved web format cdp_will_host = False if ext not in [".mp4", ".webm"]: @@ -351,7 +346,7 @@ def convert_video_and_handle_host( end_time=session.video_end_time, ) - fs_functions.remove_local_file(video_filepath) + # fs_functions.remove_local_file(video_filepath) # Update variable name for easier downstream typing video_filepath = str(mp4_filepath) @@ -368,7 +363,7 @@ def convert_video_and_handle_host( end_time=session.video_end_time, ) - fs_functions.remove_local_file(video_filepath) + # fs_functions.remove_local_file(video_filepath) # Update variable name for easier downstream typing video_filepath = str(trimmed_filepath) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 042a7cbb..c2c95f97 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -609,6 +609,11 @@ def convert_video_to_mp4( The filepath of the converted MP4 video. """ + log.info(f"File to trim: {str(video_filepath)}") + log.info(f"File exists: {video_filepath.exists()}") + if video_filepath.exists(): + log.info(f"File stats: {video_filepath.stat()}") + output_path = output_path or video_filepath.with_suffix(".mp4") output_path = clip_and_reformat_video( video_filepath=video_filepath, From ce9cf5e2f991f5d016581b956742fc695bd620d7 Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 2022 15:05:01 -0800 Subject: [PATCH 24/25] Logging to debug file copy --- cdp_backend/pipeline/event_gather_pipeline.py | 4 ++-- cdp_backend/utils/file_utils.py | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cdp_backend/pipeline/event_gather_pipeline.py b/cdp_backend/pipeline/event_gather_pipeline.py index c50be448..559d3136 100644 --- a/cdp_backend/pipeline/event_gather_pipeline.py +++ b/cdp_backend/pipeline/event_gather_pipeline.py @@ -346,7 +346,7 @@ def convert_video_and_handle_host( end_time=session.video_end_time, ) - # 
fs_functions.remove_local_file(video_filepath) + fs_functions.remove_local_file(video_filepath) # Update variable name for easier downstream typing video_filepath = str(mp4_filepath) @@ -363,7 +363,7 @@ def convert_video_and_handle_host( end_time=session.video_end_time, ) - # fs_functions.remove_local_file(video_filepath) + fs_functions.remove_local_file(video_filepath) # Update variable name for easier downstream typing video_filepath = str(trimmed_filepath) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index c2c95f97..30c7086f 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -609,11 +609,6 @@ def convert_video_to_mp4( The filepath of the converted MP4 video. """ - log.info(f"File to trim: {str(video_filepath)}") - log.info(f"File exists: {video_filepath.exists()}") - if video_filepath.exists(): - log.info(f"File stats: {video_filepath.stat()}") - output_path = output_path or video_filepath.with_suffix(".mp4") output_path = clip_and_reformat_video( video_filepath=video_filepath, @@ -720,10 +715,17 @@ def clip_and_reformat_video( output_path = output_path or rename_append_to_stem(video_filepath, "_clipped") - log.info(f"File to trim: {str(video_filepath)}") - log.info(f"File exists: {video_filepath.exists()}") + log.info(f"Input video to trim: {str(video_filepath)}") + log.info(f"Input video exists: {video_filepath.exists()}") + if video_filepath.exists(): + log.info(f"Input video stats: {video_filepath.stat()}") + log.info(f"Output video: {str(output_path)}") + log.info(f"Output video exists: {output_path.exists()}") if video_filepath.exists(): - log.info(f"File stats: {video_filepath.stat()}") + log.info(f"Output video stats: {output_path.stat()}") + log.info("Directory contents:") + for file in video_filepath.parent.iterdir(): + log.info(str(file)) try: ffmpeg_stdout, ffmpeg_stderr = ( From 4cdb0abf453cd2d9a541505d32ef8a46859c993c Mon Sep 17 00:00:00 2001 From: Chris Khan Date: Sat, 31 Dec 
2022 15:19:21 -0800 Subject: [PATCH 25/25] Remove rename --- cdp_backend/utils/file_utils.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/cdp_backend/utils/file_utils.py b/cdp_backend/utils/file_utils.py index 30c7086f..269b0c06 100644 --- a/cdp_backend/utils/file_utils.py +++ b/cdp_backend/utils/file_utils.py @@ -713,19 +713,7 @@ def clip_and_reformat_video( """ import ffmpeg - output_path = output_path or rename_append_to_stem(video_filepath, "_clipped") - - log.info(f"Input video to trim: {str(video_filepath)}") - log.info(f"Input video exists: {video_filepath.exists()}") - if video_filepath.exists(): - log.info(f"Input video stats: {video_filepath.stat()}") - log.info(f"Output video: {str(output_path)}") - log.info(f"Output video exists: {output_path.exists()}") - if video_filepath.exists(): - log.info(f"Output video stats: {output_path.stat()}") - log.info("Directory contents:") - for file in video_filepath.parent.iterdir(): - log.info(str(file)) + output_path = output_path or append_to_stem(video_filepath, "_clipped") try: ffmpeg_stdout, ffmpeg_stderr = (