From 317ba7e3776b47cf643c5956d76c1853524589be Mon Sep 17 00:00:00 2001 From: Mithronn Date: Thu, 16 May 2024 01:36:43 +0300 Subject: [PATCH] 16.05.2024 * `ffmpeg` feature implementation improved --- Cargo.toml | 22 ++--- README.md | 5 +- src/info.rs | 2 - src/stream/streams/mod.rs | 174 ++++++++++++++++++++++++++++++++- src/stream/streams/non_live.rs | 117 +++++++++++----------- src/utils.rs | 33 +------ 6 files changed, 246 insertions(+), 107 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4db5ba..a9bab39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rusty_ytdl" -version = "0.7.1" +version = "0.7.2" authors = ["Mithronn"] edition = "2021" description = "A Rust library for Youtube video searcher and downloader" @@ -22,38 +22,38 @@ members = [".", "cli"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -reqwest = { version = "0.12.3", features = [ +reqwest = { version = "0.12.4", features = [ "cookies", "gzip", ], default-features = false } scraper = "0.19.0" -serde = "1.0.197" -serde_json = "1.0.114" +serde = "1.0.202" +serde_json = "1.0.117" serde_qs = "0.13.0" regex = "1.10.3" url = "2.5.0" urlencoding = "2.1.3" -thiserror = "1.0.57" +thiserror = "1.0.60" derive_more = "0.99.17" derivative = "2.2.0" once_cell = "1.19.0" -tokio = { version = "1.36.0", default-features = false, features = ["sync"] } +tokio = { version = "1.37.0", default-features = false, features = ["sync"] } rand = "0.8.5" -reqwest-middleware = { version = "0.3.0", features = ["json"] } +reqwest-middleware = { version = "0.3.1", features = ["json"] } reqwest-retry = "0.5.0" m3u8-rs = "6.0.0" -async-trait = "0.1.77" +async-trait = "0.1.80" aes = "0.8.4" cbc = { version = "0.1.2", features = ["std"] } hex = "0.4.3" boa_engine = "0.17.3" mime = "0.3.17" -bytes = "1.5.0" +bytes = "1.6.0" flame = { version = "0.2.2", optional = true } flamer = { version = "0.5.0", optional = true } [dev-dependencies] -tokio = { version = "1.36.0", features = ["full"] } +tokio = { version = "1.37.0", features = ["full"] } [features] default = ["search", "live", "default-tls"] @@ -61,7 +61,7 @@ performance_analysis = ["flame", "flamer"] live = ["tokio/time", "tokio/process"] blocking = ["tokio/rt", "tokio/rt-multi-thread"] search = [] -ffmpeg = ["tokio/process"] +ffmpeg = ["tokio/process", "tokio/io-util"] default-tls = ["reqwest/default-tls"] native-tls = ["reqwest/native-tls"] rustls-tls = ["reqwest/rustls-tls"] diff --git a/README.md b/README.md index 5dd985f..98697cb 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,6 @@ Download videos **blazing-fast** without getting stuck on Youtube download speed ## Roadmap -- [x] ffmpeg feature - [ ] benchmarks ## Features @@ -28,7 +27,7 @@ Download videos **blazing-fast** without getting stuck on Youtube download speed - Search with query (Video, Playlist, Channel) - Blocking and asynchronous API - Proxy, IPv6, and cookie support on request -- Built-in FFmpeg audio and video filter apply support. [Example](examples/download_with_ffmpeg.rs) +- Built-in FFmpeg audio and video filter apply support (Non-live videos only) [Example](examples/download_with_ffmpeg.rs) - [CLI](https://crates.io/crates/rusty_ytdl-cli) # Usage @@ -159,5 +158,5 @@ Or add the following to your `Cargo.toml` file: ```toml [dependencies] -rusty_ytdl = "0.7.1" +rusty_ytdl = "0.7.2" ``` diff --git a/src/info.rs b/src/info.rs index 30c6e98..e6a08bf 100644 --- a/src/info.rs +++ b/src/info.rs @@ -1,5 +1,3 @@ -use std::{collections::HashMap, sync::Arc}; - use scraper::{Html, Selector}; use crate::constants::{BASE_URL, FORMATS}; diff --git a/src/stream/streams/mod.rs b/src/stream/streams/mod.rs index 5f8ce9e..66295aa 100644 --- a/src/stream/streams/mod.rs +++ b/src/stream/streams/mod.rs @@ -2,14 +2,33 @@ mod live; mod non_live; +use async_trait::async_trait; use bytes::Bytes; +#[cfg(feature = "ffmpeg")] +use bytes::BytesMut; + +#[cfg(feature = "ffmpeg")] +use std::{process::Stdio, sync::Arc}; + +#[cfg(feature = "ffmpeg")] +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + process::{Child, Command}, + sync::{ + mpsc::{channel, Receiver}, + Mutex, Notify, + }, + task::JoinHandle, +}; + #[cfg(feature = "live")] pub use live::{LiveStream, LiveStreamOptions}; pub use non_live::{NonLiveStream, NonLiveStreamOptions}; +#[cfg(feature = "ffmpeg")] +use crate::constants::DEFAULT_HEADERS; use crate::VideoError; -use async_trait::async_trait; #[async_trait] pub trait Stream { @@ -25,3 +44,156 @@ pub trait Stream { 0 } } + +#[cfg(feature = "ffmpeg")] +pub struct FFmpegStreamOptions { + pub client: reqwest_middleware::ClientWithMiddleware, + pub link: String, + pub content_length: u64, + pub dl_chunk_size: u64, + pub start: u64, + pub end: u64, + pub ffmpeg_args: Vec, +} + +#[cfg(feature = "ffmpeg")] +pub(crate) struct FFmpegStream { + pub refined_data_reciever: Option>>>, + download_notify: Arc, + ffmpeg_child: Child, + + tasks: Vec>>, +} + +#[cfg(feature = "ffmpeg")] +impl FFmpegStream { + pub fn new(options: FFmpegStreamOptions) -> Result { + let (tx, mut rx) = channel::(16384); + let (refined_tx, refined_rx) = channel::(16384); + + // Spawn FFmpeg process + let mut ffmpeg_child = Command::new("ffmpeg") + .args(&options.ffmpeg_args) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|x| VideoError::FFmpeg(x.to_string()))?; + + let mut stdin = ffmpeg_child.stdin.take().unwrap(); + let mut stdout = ffmpeg_child.stdout.take().unwrap(); + + let read_stdout_task = tokio::spawn(async move { + let mut buffer = vec![0u8; 16384]; + while let Ok(line) = stdout.read(&mut buffer).await { + match line { + 0 => { + break; + } + n => { + if let Err(_err) = refined_tx.send(Bytes::from(buffer[..n].to_vec())).await + { + return Err(VideoError::FFmpeg("channel closed".to_string())); + // Error or channel closed + }; + } + } + } + + Ok(()) + }); + + let write_stdin_task = tokio::spawn(async move { + while let Some(data) = rx.recv().await { + if let Err(err) = stdin.write_all(&data).await { + return Err(VideoError::FFmpeg(err.to_string())); // Error or channel closed + } + } + Ok(()) + }); + + let download_notify = Arc::new(Notify::new()); + let download_notify_task = download_notify.clone(); + + let download_task = tokio::spawn(async move { + let mut end = options.end; + let mut start = options.start; + let content_length = options.content_length; + let client = options.client; + let link = options.link; + let dl_chunk_size = options.dl_chunk_size; + + download_notify_task.notified().await; + + loop { + // Nothing else remain send break to finish + if end == 0 { + break; + } + + if end >= content_length { + end = 0; + } + + let mut headers = DEFAULT_HEADERS.clone(); + + let range_end = if end == 0 { + "".to_string() + } else { + end.to_string() + }; + + headers.insert( + reqwest::header::RANGE, + format!("bytes={}-{}", start, range_end).parse().unwrap(), + ); + + let mut response = client + .get(&link) + .headers(headers) + .send() + .await + .map_err(VideoError::ReqwestMiddleware)?; + + let mut buf: BytesMut = BytesMut::new(); + + while let Some(chunk) = response.chunk().await.map_err(VideoError::Reqwest)? { + buf.extend(chunk); + } + + if end != 0 { + start = end + 1; + + end += dl_chunk_size; + } + + tx.send(buf.into()) + .await + .map_err(|x| VideoError::FFmpeg(x.to_string()))?; + } + + Ok(()) + }); + + Ok(Self { + refined_data_reciever: Some(Arc::new(Mutex::new(refined_rx))), + download_notify, + ffmpeg_child, + tasks: vec![download_task, write_stdin_task, read_stdout_task], + }) + } + + pub fn start_download(&self) { + self.download_notify.notify_one(); + } +} + +#[cfg(feature = "ffmpeg")] +impl Drop for FFmpegStream { + fn drop(&mut self) { + // kill tasks if they are still running + for handle in &self.tasks { + handle.abort(); + } + } +} diff --git a/src/stream/streams/non_live.rs b/src/stream/streams/non_live.rs index 3034f33..f35065d 100644 --- a/src/stream/streams/non_live.rs +++ b/src/stream/streams/non_live.rs @@ -1,5 +1,11 @@ use async_trait::async_trait; use bytes::{Bytes, BytesMut}; + +#[cfg(feature = "ffmpeg")] +use std::sync::Arc; + +#[cfg(feature = "ffmpeg")] +use tokio::sync::Mutex; use tokio::sync::RwLock; use crate::constants::DEFAULT_HEADERS; @@ -7,7 +13,10 @@ use crate::stream::streams::Stream; use crate::structs::VideoError; #[cfg(feature = "ffmpeg")] -use crate::{structs::FFmpegArgs, utils::ffmpeg_cmd_run}; +use crate::structs::FFmpegArgs; + +#[cfg(feature = "ffmpeg")] +use super::{FFmpegStream, FFmpegStreamOptions}; pub struct NonLiveStreamOptions { pub client: Option, @@ -34,10 +43,9 @@ pub struct NonLiveStream { #[cfg(feature = "ffmpeg")] ffmpeg_args: Vec, + #[cfg(feature = "ffmpeg")] - ffmpeg_start_byte: RwLock, - #[cfg(feature = "ffmpeg")] - ffmpeg_end_byte: RwLock, + ffmpeg_stream: Arc>>, } impl NonLiveStream { @@ -70,6 +78,20 @@ impl NonLiveStream { .map(|x| x.build()) .unwrap_or_default(); + let ffmpeg_stream = if !ffmpeg_args.is_empty() { + Arc::new(Mutex::new(Some(FFmpegStream::new(FFmpegStreamOptions { + client: client.clone(), + link: options.link.clone(), + content_length: options.content_length, + dl_chunk_size: options.dl_chunk_size, + start: options.start, + end: options.end, + ffmpeg_args: ffmpeg_args.clone(), + })?))) + } else { + Arc::new(Mutex::new(None)) + }; + Ok(Self { client, link: options.link, @@ -79,9 +101,8 @@ impl NonLiveStream { end: RwLock::new(options.end), start_static: options.start, end_static: options.end, - ffmpeg_args: ffmpeg_args.clone(), - ffmpeg_end_byte: RwLock::new(0), - ffmpeg_start_byte: RwLock::new(Bytes::new()), + ffmpeg_args, + ffmpeg_stream, }) } @@ -111,21 +132,42 @@ impl NonLiveStream { async fn start_index(&self) -> u64 { *self.start.read().await } - - #[cfg(feature = "ffmpeg")] - async fn ffmpeg_end_byte_index(&self) -> usize { - *self.ffmpeg_end_byte.read().await - } - - #[cfg(feature = "ffmpeg")] - async fn ffmpeg_start_byte_index(&self) -> Bytes { - self.ffmpeg_start_byte.read().await.to_vec().into() - } } #[async_trait] impl Stream for NonLiveStream { async fn chunk(&self) -> Result, VideoError> { + #[cfg(feature = "ffmpeg")] + { + if !self.ffmpeg_args.is_empty() { + if let Some(ffmpeg_stream) = &mut *self.ffmpeg_stream.lock().await { + // notify to start download task + ffmpeg_stream.start_download(); + + if let Some(reciever) = ffmpeg_stream.refined_data_reciever.clone() { + let mut reciever = reciever.lock().await; + + let byte_value = reciever.recv().await; + + // reset ffmpeg_stream for reuse + if byte_value.is_none() { + *ffmpeg_stream = FFmpegStream::new(FFmpegStreamOptions { + client: self.client.clone(), + link: self.link.clone(), + content_length: self.content_length, + dl_chunk_size: self.dl_chunk_size, + start: self.start_static, + end: self.end_static, + ffmpeg_args: self.ffmpeg_args.clone(), + })?; + } + + return Ok(byte_value); + } + } + } + } + let end = self.end_index().await; // Nothing else remain set controllers to the beginning state and send None to finish @@ -135,14 +177,6 @@ impl Stream for NonLiveStream { *end = self.end_static; *start = self.start_static; - #[cfg(feature = "ffmpeg")] - { - let mut ffmpeg_end_byte = self.ffmpeg_end_byte.write().await; - let mut ffmpeg_start_byte = self.ffmpeg_start_byte.write().await; - *ffmpeg_end_byte = 0; - *ffmpeg_start_byte = Bytes::new(); - } - // Send None to close return Ok(None); } @@ -182,39 +216,6 @@ impl Stream for NonLiveStream { buf.extend(chunk); } - #[cfg(feature = "ffmpeg")] - { - if !self.ffmpeg_args.is_empty() { - let ffmpeg_start_byte_index = self.ffmpeg_start_byte_index().await; - - let cmd_output = ffmpeg_cmd_run( - &self.ffmpeg_args, - Bytes::from( - [ - BytesMut::from_iter(ffmpeg_start_byte_index.clone()), - buf.clone(), - ] - .concat(), - ), - ) - .await?; - - let end_index = self.ffmpeg_end_byte_index().await; - - let mut first_buffer_trim = if cmd_output.is_empty() { 0 } else { 1 }; - if ffmpeg_start_byte_index.is_empty() { - let mut start_byte = self.ffmpeg_start_byte.write().await; - *start_byte = buf.into(); - let mut end_byte = self.ffmpeg_end_byte.write().await; - *end_byte = cmd_output.len(); - - first_buffer_trim = 0; - } - - buf = BytesMut::from(&cmd_output[(end_index + first_buffer_trim)..]); - } - } - if end != 0 { let mut start = self.start.write().await; *start = end + 1; diff --git a/src/utils.rs b/src/utils.rs index 9f8fef6..3dba7bc 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,13 +1,9 @@ -use boa_engine::{optimizer::OptimizerOptions, Context, JsValue, Source}; -use bytes::Bytes; +use boa_engine::{Context, JsValue, Source}; use once_cell::sync::Lazy; use rand::Rng; use regex::Regex; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::process::Stdio; -use std::sync::Mutex; -use std::time::Instant; use tokio::sync::RwLock; use urlencoding::decode; @@ -21,33 +17,6 @@ use crate::structs::{ VideoQuality, VideoSearchOptions, }; -#[cfg(feature = "ffmpeg")] -pub async fn ffmpeg_cmd_run(args: &Vec, data: Bytes) -> Result { - use tokio::{io::AsyncWriteExt, process::Command}; - - let mut cmd = Command::new("ffmpeg"); - cmd.args(args) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .kill_on_drop(true); - - let mut process = cmd.spawn().map_err(|x| VideoError::FFmpeg(x.to_string()))?; - let mut stdin = process - .stdin - .take() - .ok_or(VideoError::FFmpeg("Failed to open stdin".to_string()))?; - - tokio::spawn(async move { stdin.write_all(&data).await }); - - let output = process - .wait_with_output() - .await - .map_err(|x| VideoError::FFmpeg(x.to_string()))?; - println!("{}", output.stdout.len()); - - Ok(Bytes::from(output.stdout)) -} - #[cfg_attr(feature = "performance_analysis", flamer::flame)] pub fn get_html5player(body: &str) -> Option { // Lazy gain ~0.2ms per req on Ryzen 9 7950XT (probably a lot more on slower CPUs)