Queue video processing job on upload (#15)
* Change queue to a lib

* Add forge=debug to default tracer

* Change video ids to use nanoid

* Add configurable ffmpeg location and uploads dir

* Add posting a queue job after video upload

* Add videos directory to gitignore

* Fix start-queue to use forge package
sneakycrow authored Nov 9, 2024
1 parent 826fc9b commit 033a3fb
Showing 20 changed files with 109 additions and 56 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,3 +4,4 @@ target/
 .env*
 !.env.example
 uploads/
+videos/
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -2,6 +2,7 @@
 members = [
     "packages/db",
     "packages/vod",
+    "packages/queue",
     "services/forge-queue",
     "services/silo-api",
 ]
1 change: 1 addition & 0 deletions config/api.Dockerfile
@@ -17,6 +17,7 @@ COPY Cargo.toml Cargo.lock ./
 COPY packages/db ./packages/db
 COPY packages/vod ./packages/vod
 COPY services/forge-queue ./services/forge-queue
+COPY packages/queue ./packages/queue
 COPY services/silo-api ./services/silo-api

 # Build the project for release
1 change: 1 addition & 0 deletions config/queue.Dockerfile
@@ -16,6 +16,7 @@ COPY Cargo.toml Cargo.lock ./
 # Copy all workspace members
 COPY packages/db ./packages/db
 COPY packages/vod ./packages/vod
+COPY packages/queue ./packages/queue
 COPY services/forge-queue ./services/forge-queue
 COPY services/silo-api ./services/silo-api
4 changes: 2 additions & 2 deletions justfile
@@ -8,7 +8,7 @@ start-web:
     yarn start

 start-queue:
-    cargo run -p queue
+    cargo run -p forge

 # Development commands
 dev-api: start-api
@@ -47,7 +47,7 @@ build-api:
     cargo build -p api --release

 build-queue:
-    cargo build -p queue --release
+    cargo build -p forge --release

 # Docker build commands - Queue Service (forge-queue)
 biq: build-image-queue # Shorthand for building queue image
1 change: 1 addition & 0 deletions packages/db/Cargo.toml
@@ -11,3 +11,4 @@ sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "chrono"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 uuid = { version = "1.6", features = ["serde"] }
+nanoid = "0.4"
6 changes: 3 additions & 3 deletions packages/db/migrations/20241101164200_add_videos.up.sql
@@ -1,14 +1,14 @@
 CREATE TYPE processing_status AS ENUM ('pending', 'processing', 'completed', 'failed');

 CREATE TABLE videos (
-    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    id TEXT PRIMARY KEY,
     user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
     title VARCHAR(100) NOT NULL,
     raw_video_path VARCHAR(255) NOT NULL,
     processed_video_path VARCHAR(255),
     processing_status processing_status NOT NULL DEFAULT 'pending',
-    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
 );

 CREATE INDEX videos_user_id_idx ON videos(user_id);
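A note on the timestamp change: TIMESTAMP WITH TIME ZONE stores an absolute instant, so sqlx's chrono integration (already enabled in packages/db) decodes it as a timezone-aware type. A minimal sketch of the Rust-side mapping — the struct below is illustrative, not the actual Video model:

    use chrono::{DateTime, Utc};

    // With TIMESTAMP WITH TIME ZONE, sqlx decodes into timezone-aware
    // DateTime<Utc> rather than the naive NaiveDateTime that plain
    // TIMESTAMP maps to.
    struct VideoTimestamps {
        created_at: Option<DateTime<Utc>>, // Option because the columns are nullable with defaults
        updated_at: Option<DateTime<Utc>>,
    }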
27 changes: 9 additions & 18 deletions packages/db/src/videos.rs
@@ -1,10 +1,11 @@
+use nanoid::nanoid;
 use serde::{Deserialize, Serialize};
-use sqlx::{postgres::PgRow, prelude::FromRow, PgPool, Row};
+use sqlx::{prelude::FromRow, PgPool};
 use uuid::Uuid;

 #[derive(Debug, Serialize, Deserialize, FromRow)]
 pub struct Video {
-    pub id: Uuid,
+    pub id: String,
     pub user_id: Uuid,
     pub title: String,
     pub raw_video_path: String,
@@ -30,30 +31,20 @@ impl Video {
         title: String,
         raw_video_path: String,
     ) -> Result<Self, sqlx::Error> {
-        let row = sqlx::query(
+        let video_id = nanoid!(10);
+        sqlx::query_as::<_, Video>(
             r#"
-            INSERT INTO videos (user_id, title, raw_video_path, processing_status)
-            VALUES ($1, $2, $3, 'pending')
+            INSERT INTO videos (id, user_id, title, raw_video_path, processing_status)
+            VALUES ($1, $2, $3, $4, 'pending')
             RETURNING id, user_id, title, raw_video_path, processed_video_path,
                       processing_status, created_at, updated_at
             "#,
         )
+        .bind(video_id)
         .bind(user_id)
         .bind(title)
        .bind(raw_video_path)
-        .map(|row: PgRow| Video {
-            id: row.get("id"),
-            user_id: row.get("user_id"),
-            title: row.get("title"),
-            raw_video_path: row.get("raw_video_path"),
-            processed_video_path: row.get("processed_video_path"),
-            processing_status: row.get("processing_status"),
-            created_at: row.get("created_at"),
-            updated_at: row.get("updated_at"),
-        })
         .fetch_one(pool)
-        .await?;
-
-        Ok(row)
+        .await
     }
 }
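For context, a minimal sketch of what the new ids look like, assuming only the nanoid crate (v0.4) added to packages/db above:

    use nanoid::nanoid;

    fn main() {
        // nanoid!(10) yields a 10-character URL-safe id such as "V1StGXR8_Z",
        // which replaces the old UUID primary key on videos.id.
        let video_id = nanoid!(10);
        assert_eq!(video_id.len(), 10);
        println!("{video_id}");
    }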
24 changes: 24 additions & 0 deletions packages/queue/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "queue"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+async-trait = "0.1"
+chrono = "0.4"
+db = { path = "../../packages/db" }
+futures = "0.3"
+serde = { version = "1", features = ["derive"] }
+thiserror = "1"
+sqlx = { version = "0.7", features = [
+    "runtime-tokio-rustls",
+    "postgres",
+    "chrono",
+    "uuid",
+    "json",
+] }
+tokio = { version = "1", features = ["full"] }
+tracing = "0.1"
+ulid = { version = "1", features = ["uuid"] }
+uuid = { version = "1", features = ["serde", "v4"] }
+vod = { path = "../../packages/vod" }
File renamed without changes.
File renamed without changes.
6 changes: 6 additions & 0 deletions packages/queue/src/lib.rs
@@ -0,0 +1,6 @@
+pub mod error;
+pub mod job;
+pub mod queue;
+pub mod runner;
+
+pub use queue::{Job, PostgresQueue, Queue};
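With these re-exports, a consumer needs only the queue crate itself. A minimal sketch of pushing a job through the extracted API, mirroring the silo-api change further down — the video id here is a made-up nanoid:

    use queue::{PostgresQueue, Queue};
    use std::sync::Arc;

    async fn enqueue(db: sqlx::PgPool) {
        let queue: Arc<dyn Queue> = Arc::new(PostgresQueue::new(db));
        let message = queue::queue::Message::ProcessRawVideoIntoStream {
            video_id: "V1StGXR8_Z".to_string(), // hypothetical nanoid
        };
        // None schedules the job for immediate processing.
        if let Err(e) = queue.push(message, None).await {
            tracing::error!("Failed to queue video processing job: {}", e);
        }
    }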
File renamed without changes.
@@ -51,6 +51,7 @@ pub async fn run_worker(queue: Arc<dyn Queue>, concurrency: usize, db_conn: &Pool<Postgres>)
 async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
     match job.message {
         Message::ProcessRawVideoIntoStream { video_id } => {
+            tracing::info!("Start video processing for video_id {video_id}");
             // Update video status to Processing
             sqlx::query(
                 "UPDATE videos SET processing_status = 'processing', updated_at = NOW() WHERE id = $1"
@@ -66,12 +67,12 @@ async fn handle_job(job: Job, db: &Pool<Postgres>) -> Result<(), Error> {
             .await?;

             // Create output directory
-            let output_dir = PathBuf::from("processed_videos").join(&video_id.to_string());
-
+            let output_dir = PathBuf::from(get_videos_dir()).join(&video_id.to_string());
+            let ffmpeg_location = get_ffmpeg_location();
             // Initialize HLS converter
             let converter = HLSConverter::new(
-                "/usr/bin/ffmpeg",
-                &output_dir
+                ffmpeg_location.as_str(),
+                output_dir
                     .to_str()
                     .expect("Could not convert output dir to path string"),
             )
@@ -125,3 +126,13 @@

     Ok(())
 }
+
+/// Get the path to ffmpeg
+fn get_ffmpeg_location() -> String {
+    std::env::var("FFMPEG_LOCATION").unwrap_or_else(|_| "/usr/bin/ffmpeg".to_string())
+}
+
+/// Get the directory for where to store videos
+fn get_videos_dir() -> String {
+    std::env::var("VIDEOS_DIR").unwrap_or_else(|_| "videos".to_string())
+}
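The two new helpers make the previously hardcoded ffmpeg path and output directory configurable via environment variables. A minimal sketch of the fallback behavior — the override path below is hypothetical:

    use std::env;

    fn get_ffmpeg_location() -> String {
        env::var("FFMPEG_LOCATION").unwrap_or_else(|_| "/usr/bin/ffmpeg".to_string())
    }

    fn main() {
        // Without FFMPEG_LOCATION set, the worker falls back to the old default.
        env::remove_var("FFMPEG_LOCATION");
        assert_eq!(get_ffmpeg_location(), "/usr/bin/ffmpeg");

        // Hypothetical override, e.g. set in queue.Dockerfile or a local .env:
        env::set_var("FFMPEG_LOCATION", "/opt/ffmpeg/bin/ffmpeg");
        assert_eq!(get_ffmpeg_location(), "/opt/ffmpeg/bin/ffmpeg");
    }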
18 changes: 2 additions & 16 deletions services/forge-queue/Cargo.toml
@@ -1,26 +1,12 @@
 [package]
-name = "queue"
+name = "forge"
 version = "0.1.0"
 edition = "2021"

 [dependencies]
 anyhow = "1"
-async-trait = "0.1"
-chrono = "0.4"
-db = { path = "../../packages/db" }
-futures = "0.3"
-serde = { version = "1", features = ["derive"] }
-thiserror = "1"
-sqlx = { version = "0.7", features = [
-    "runtime-tokio-rustls",
-    "postgres",
-    "chrono",
-    "uuid",
-    "json",
-] }
+queue = { path = "../../packages/queue" }
 tokio = { version = "1", features = ["full"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-ulid = { version = "1", features = ["uuid"] }
-uuid = { version = "1", features = ["serde", "v4"] }
-vod = { path = "../../packages/vod" }
9 changes: 2 additions & 7 deletions services/forge-queue/src/main.rs
@@ -1,20 +1,15 @@
 use db::connect_to_database;
-use queue::{PostgresQueue, Queue};
+use queue::{runner, PostgresQueue, Queue};
 use std::sync::Arc;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

-mod error;
-mod job;
-mod queue;
-mod runner;
-
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     // Start the tracer
     tracing_subscriber::registry()
         .with(
             tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "queue=debug,db=debug".into()),
+                .unwrap_or_else(|_| "forge=debug,queue=debug,db=debug".into()),
         )
         .with(tracing_subscriber::fmt::layer())
         .init();
1 change: 1 addition & 0 deletions services/silo-api/Cargo.toml
@@ -14,6 +14,7 @@ sha2 = "0.10"
 jsonwebtoken = "8.1"
 lazy_static = "1.4"
 nanoid = "0.4.0"
+queue = { path = "../../packages/queue" }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sqlx = { version = "0.7", features = [
6 changes: 5 additions & 1 deletion services/silo-api/src/main.rs
@@ -11,6 +11,7 @@ use axum::{
     Router,
 };
 use config::Config;
+use queue::{PostgresQueue, Queue};
 use routes::upload::UPLOAD_CHUNK_SIZE;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -26,6 +27,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 pub struct AppState {
     db: PgPool,
     config: Config,
+    queue: Arc<dyn Queue>,
 }

 #[tokio::main]
@@ -49,12 +51,14 @@ async fn main() {
     let db = db::connect_to_database()
         .await
         .expect("Could not connect to database");
+    // Initialize the queue
+    let queue = Arc::new(PostgresQueue::new(db.clone()));
     // Run migrations
     let _mig = db::run_migrations(&db)
         .await
         .expect("Could not run database migrations");
     // Store shared data as state between routes
-    let state = Arc::new(AppState { db, config });
+    let state = Arc::new(AppState { db, config, queue });
     routes::upload::init_cleanup().await;
     // Initialize our router with the shared state and required routes
     let app = Router::new()
24 changes: 21 additions & 3 deletions services/silo-api/src/routes/upload.rs
@@ -212,6 +212,7 @@ pub async fn upload_video(
         upload_state.chunks_received.load(Ordering::SeqCst) >= upload_state.total_chunks;

     if is_complete {
+        tracing::debug!("Video upload complete, finalizing processing");
         // Ensure all data is written and flushed
         upload_state.file.flush().await.map_err(|e| {
             tracing::error!("Error flushing file: {}", e);
@@ -339,15 +340,32 @@
         match Video::create(&state.db, user.id, video_title, final_path_str.clone()).await {
             Ok(video) => {
                 tracing::debug!("Saved video metadata to database: {:?}", video);
+
+                // Create and push video processing job to queue
+                let process_video_message =
+                    queue::queue::Message::ProcessRawVideoIntoStream {
+                        video_id: video.id.to_string(),
+                    };
+
+                if let Err(e) = state
+                    .queue
+                    .push(
+                        process_video_message,
+                        None, // Schedule for immediate processing
+                    )
+                    .await
+                {
+                    tracing::error!("Failed to queue video processing job: {}", e);
+                    return Err(format!("Failed to queue video processing: {}", e));
+                }
+
+                tracing::debug!("Successfully queued video processing job");
             }
             Err(e) => {
                 tracing::error!("Failed to save video metadata: {}", e);
                 // Consider whether to return an error here or just log it
                 return Err(format!("Failed to save video metadata: {}", e));
             }
         }

         tracing::debug!("Successfully completed file upload and rename");
     }
