diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index 27adf8c..4cbf7d6 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -57,7 +57,13 @@ impl VideoListModel for collection::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Pubtime], connection).await + helper::video_keys( + video::Column::CollectionId.eq(self.id), + videos_info, + [video::Column::Bvid, video::Column::Pubtime], + connection, + ) + .await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index 18281a6..5166c91 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -57,7 +57,13 @@ impl VideoListModel for favorite::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Favtime], connection).await + helper::video_keys( + video::Column::FavoriteId.eq(self.id), + videos_info, + [video::Column::Bvid, video::Column::Favtime], + connection, + ) + .await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { diff --git a/crates/bili_sync/src/adapter/helper/mod.rs b/crates/bili_sync/src/adapter/helper/mod.rs index 2c39e3a..178710e 100644 --- a/crates/bili_sync/src/adapter/helper/mod.rs +++ b/crates/bili_sync/src/adapter/helper/mod.rs @@ -5,7 +5,7 @@ use anyhow::Result; use bili_sync_entity::*; use filenamify::filenamify; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::OnConflict; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{Condition, QuerySelect}; @@ -37,12 +37,17 @@ pub(super) async fn filter_videos_with_pages( /// 返回 videos_info 存在于视频表里那部分对应的 key pub(super) async fn video_keys( + expr: SimpleExpr, videos_info: &[VideoInfo], columns: [video::Column; 2], conn: &DatabaseConnection, ) -> Result> { Ok(video::Entity::find() - .filter(video::Column::Bvid.is_in(videos_info.iter().map(|v| v.bvid().to_string()))) + .filter( + video::Column::Bvid + .is_in(videos_info.iter().map(|v| v.bvid().to_string())) + .and(expr), + ) .select_only() .columns(columns) .into_tuple() diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index 42ef454..60db867 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -1,6 +1,7 @@ mod collection; mod favorite; mod helper; +mod submission; mod watch_later; use std::collections::HashSet; @@ -9,19 +10,21 @@ use std::pin::Pin; use anyhow::Result; use async_trait::async_trait; -use collection::collection_from; -use favorite::favorite_from; use futures::Stream; use sea_orm::entity::prelude::*; use sea_orm::DatabaseConnection; -use watch_later::watch_later_from; +use crate::adapter::collection::collection_from; +use crate::adapter::favorite::favorite_from; +use crate::adapter::submission::submission_from; +use crate::adapter::watch_later::watch_later_from; use crate::bilibili::{self, BiliClient, CollectionItem, VideoInfo}; pub enum Args<'a> { Favorite { fid: &'a str }, Collection { collection_item: &'a CollectionItem }, WatchLater, + Submission { upper_id: &'a str }, } pub async fn video_list_from<'a>( @@ -34,6 +37,7 @@ pub async fn video_list_from<'a>( Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await, Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await, Args::WatchLater => watch_later_from(path, bili_client, connection).await, + Args::Submission { upper_id } => submission_from(upper_id, path, bili_client, connection).await, } } diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs new file mode 100644 index 0000000..9cc39eb --- /dev/null +++ b/crates/bili_sync/src/adapter/submission.rs @@ -0,0 +1,177 @@ +use std::collections::HashSet; +use std::path::Path; +use std::pin::Pin; + +use anyhow::Result; +use async_trait::async_trait; +use bili_sync_entity::*; +use futures::Stream; +use sea_orm::entity::prelude::*; +use sea_orm::sea_query::{IntoCondition, OnConflict}; +use sea_orm::ActiveValue::Set; +use sea_orm::{DatabaseConnection, TransactionTrait}; + +use crate::adapter::helper::video_with_path; +use crate::adapter::{helper, VideoListModel}; +use crate::bilibili::{self, BiliClient, Submission, VideoInfo}; +use crate::utils::status::Status; + +#[async_trait] +impl VideoListModel for submission::Model { + async fn video_count(&self, connection: &DatabaseConnection) -> Result { + helper::count_videos(video::Column::SubmissionId.eq(self.id).into_condition(), connection).await + } + + async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { + helper::filter_videos( + video::Column::SubmissionId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()) + .into_condition(), + connection, + ) + .await + } + + async fn unhandled_video_pages( + &self, + connection: &DatabaseConnection, + ) -> Result)>> { + helper::filter_videos_with_pages( + video::Column::SubmissionId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_not_null()) + .into_condition(), + connection, + ) + .await + } + + async fn exist_labels( + &self, + videos_info: &[VideoInfo], + connection: &DatabaseConnection, + ) -> Result> { + helper::video_keys( + video::Column::SubmissionId.eq(self.id), + videos_info, + [video::Column::Bvid, video::Column::Ctime], + connection, + ) + .await + } + + fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { + let mut video_model = video_info.to_model(base_model); + video_model.submission_id = Set(Some(self.id)); + video_with_path(video_model, &self.path, video_info) + } + + async fn fetch_videos_detail( + &self, + video: bilibili::Video<'_>, + video_model: video::Model, + connection: &DatabaseConnection, + ) -> Result<()> { + let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; + match info { + Ok((tags, view_info)) => { + let VideoInfo::View { pages, .. } = &view_info else { + unreachable!("view_info must be VideoInfo::View") + }; + let txn = connection.begin().await?; + // 将分页信息写入数据库 + helper::create_video_pages(pages, &video_model, &txn).await?; + // 将页标记和 tag 写入数据库 + let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model)); + video_active_model.single_page = Set(Some(pages.len() == 1)); + video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap())); + video_active_model.save(&txn).await?; + txn.commit().await?; + } + Err(e) => { + helper::error_fetch_video_detail(e, video_model, connection).await?; + } + }; + Ok(()) + } + + fn log_fetch_video_start(&self) { + info!( + "开始获取 UP 主 {} - {} 投稿的视频与分页信息...", + self.upper_id, self.upper_name + ); + } + + fn log_fetch_video_end(&self) { + info!("获取稍后再看的视频与分页信息完成"); + info!( + "获取 UP 主 {} - {} 投稿的视频与分页信息完成", + self.upper_id, self.upper_name + ); + } + + fn log_download_video_start(&self) { + info!( + "开始下载 UP 主 {} - {} 投稿的所有未处理过的视频...", + self.upper_id, self.upper_name + ); + } + + fn log_download_video_end(&self) { + info!( + "下载 UP 主 {} - {} 投稿的所有未处理过的视频完成", + self.upper_id, self.upper_name + ); + } + + fn log_refresh_video_start(&self) { + info!("开始扫描 UP 主 {} - {} 投稿的新视频...", self.upper_id, self.upper_name); + } + + fn log_refresh_video_end(&self, got_count: usize, new_count: u64) { + info!( + "扫描 UP 主 {} - {} 投稿的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频", + self.upper_id, self.upper_name, got_count, new_count, + ); + } +} + +pub(super) async fn submission_from<'a>( + upper_id: &str, + path: &Path, + bili_client: &'a BiliClient, + connection: &DatabaseConnection, +) -> Result<(Box, Pin + 'a>>)> { + let submission = Submission::new(bili_client, upper_id.to_owned()); + let upper = submission.get_info().await?; + submission::Entity::insert(submission::ActiveModel { + upper_id: Set(upper.mid), + upper_name: Set(upper.name), + path: Set(path.to_string_lossy().to_string()), + ..Default::default() + }) + .on_conflict( + OnConflict::column(submission::Column::UpperId) + .update_columns([submission::Column::UpperName, submission::Column::Path]) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + Box::new( + submission::Entity::find() + .filter(submission::Column::UpperId.eq(upper.mid)) + .one(connection) + .await? + .unwrap(), + ), + Box::pin(submission.into_video_stream()), + )) +} diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 1189c52..b9eb084 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -58,7 +58,13 @@ impl VideoListModel for watch_later::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Favtime], connection).await + helper::video_keys( + video::Column::WatchLaterId.eq(self.id), + videos_info, + [video::Column::Bvid, video::Column::Favtime], + connection, + ) + .await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index 16c0138..f92d490 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -13,6 +13,7 @@ pub use error::BiliError; pub use favorite_list::FavoriteList; use favorite_list::Upper; use once_cell::sync::Lazy; +pub use submission::Submission; pub use video::{Dimension, PageInfo, Video}; pub use watch_later::WatchLater; @@ -23,6 +24,7 @@ mod credential; mod danmaku; mod error; mod favorite_list; +mod submission; mod video; mod watch_later; @@ -121,6 +123,16 @@ pub enum VideoInfo { #[serde(rename = "pubdate", with = "ts_seconds")] pubtime: DateTime, }, + Submission { + title: String, + bvid: String, + #[serde(rename = "description")] + intro: String, + #[serde(rename = "pic")] + cover: String, + #[serde(rename = "created", with = "ts_seconds")] + ctime: DateTime, + }, } #[cfg(test)] @@ -160,5 +172,9 @@ mod tests { let stream = watch_later.into_video_stream(); pin_mut!(stream); assert!(matches!(stream.next().await, Some(VideoInfo::WatchLater { .. }))); + let submission = Submission::new(&bili_client, "956761".to_string()); + let stream = submission.into_video_stream(); + pin_mut!(stream); + assert!(matches!(stream.next().await, Some(VideoInfo::Submission { .. }))); } } diff --git a/crates/bili_sync/src/bilibili/submission.rs b/crates/bili_sync/src/bilibili/submission.rs new file mode 100644 index 0000000..3a59e66 --- /dev/null +++ b/crates/bili_sync/src/bilibili/submission.rs @@ -0,0 +1,91 @@ +use anyhow::Result; +use arc_swap::access::Access; +use async_stream::stream; +use futures::Stream; +use reqwest::Method; +use serde_json::Value; + +use crate::bilibili::credential::encoded_query; +use crate::bilibili::favorite_list::Upper; +use crate::bilibili::{BiliClient, Validate, VideoInfo, MIXIN_KEY}; +pub struct Submission<'a> { + client: &'a BiliClient, + upper_id: String, +} + +impl<'a> Submission<'a> { + pub fn new(client: &'a BiliClient, upper_id: String) -> Self { + Self { client, upper_id } + } + + pub async fn get_info(&self) -> Result { + let mut res = self + .client + .request(Method::GET, "https://api.bilibili.com/x/web-interface/card") + .query(&[("mid", self.upper_id.as_str())]) + .send() + .await? + .error_for_status()? + .json::() + .await? + .validate()?; + Ok(serde_json::from_value(res["data"]["card"].take())?) + } + + async fn get_videos(&self, page: i32) -> Result { + self.client + .request(Method::GET, "https://api.bilibili.com/x/space/wbi/arc/search") + .query(&encoded_query( + vec![ + ("mid", self.upper_id.clone()), + ("order", "pubdate".to_string()), + ("order_avoided", "true".to_string()), + ("platform", "web".to_string()), + ("web_location", "1550101".to_string()), + ("pn", page.to_string()), + ("ps", "30".to_string()), + ], + MIXIN_KEY.load().as_ref().unwrap(), + )) + .send() + .await? + .error_for_status()? + .json::() + .await? + .validate() + } + + pub fn into_video_stream(self) -> impl Stream + 'a { + stream! { + let mut page = 1; + loop { + let mut videos = match self.get_videos(page).await { + Ok(v) => v, + Err(e) => { + error!("failed to get videos of upper {} page {}: {}", self.upper_id, page, e); + break; + }, + }; + if !videos["data"]["list"]["vlist"].is_array() { + warn!("no medias found in upper {} page {}", self.upper_id, page); + break; + } + let videos_info = match serde_json::from_value::>(videos["data"]["list"]["vlist"].take()) { + Ok(v) => v, + Err(e) => { + error!("failed to parse videos of upper {} page {}: {}", self.upper_id, page, e); + break; + }, + }; + for video_info in videos_info{ + yield video_info; + } + if videos["data"]["page"]["count"].is_i64() && videos["data"]["page"]["count"].as_i64().unwrap() > (page * 30) as i64 { + page += 1; + continue; + } + break; + } + } + } +} diff --git a/crates/bili_sync/src/config.rs b/crates/bili_sync/src/config.rs index 9529a88..4606015 100644 --- a/crates/bili_sync/src/config.rs +++ b/crates/bili_sync/src/config.rs @@ -69,6 +69,8 @@ pub struct Config { )] pub collection_list: HashMap, #[serde(default)] + pub submission_list: HashMap, + #[serde(default)] pub watch_later: WatchLaterConfig, pub video_name: Cow<'static, str>, pub page_name: Cow<'static, str>, @@ -123,6 +125,7 @@ impl Default for Config { danmaku_option: DanmakuOption::default(), favorite_list: HashMap::new(), collection_list: HashMap::new(), + submission_list: HashMap::new(), watch_later: Default::default(), video_name: Cow::Borrowed("{{title}}"), page_name: Cow::Borrowed("{{bvid}}"), diff --git a/crates/bili_sync/src/main.rs b/crates/bili_sync/src/main.rs index 551620f..bbb7359 100644 --- a/crates/bili_sync/src/main.rs +++ b/crates/bili_sync/src/main.rs @@ -71,6 +71,12 @@ async fn main() { } } info!("稍后再看处理完毕"); + for (upper_id, path) in &CONFIG.submission_list { + if let Err(e) = process_video_list(Args::Submission { upper_id }, &bili_client, path, &connection).await + { + error!("处理 UP 主 {upper_id} 投稿时遇到非预期的错误:{e}"); + } + } info!("本轮任务执行完毕,等待下一轮执行"); } time::sleep(time::Duration::from_secs(CONFIG.interval)).await; diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index b01782d..404fe2d 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -119,12 +119,28 @@ impl VideoInfo { upper_face: Set(upper.face.clone()), ..base_model }, + VideoInfo::Submission { + title, + bvid, + intro, + cover, + ctime, + } => bili_sync_entity::video::ActiveModel { + bvid: Set(bvid.clone()), + name: Set(title.clone()), + intro: Set(intro.clone()), + cover: Set(cover.clone()), + ctime: Set(ctime.naive_utc()), + category: Set(2), // 投稿视频的内容类型肯定是视频 + valid: Set(true), + ..base_model + }, } } pub fn to_fmt_args(&self) -> Option { match self { - VideoInfo::Simple { .. } => None, // 不能从简单的视频信息中构造格式化参数 + VideoInfo::Simple { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数 VideoInfo::Detail { title, bvid, @@ -171,9 +187,16 @@ impl VideoInfo { pub fn video_key(&self) -> String { match self { // 对于合集没有 fav_time,只能用 pubtime 代替 - VideoInfo::Simple { bvid, pubtime, .. } => id_time_key(bvid, pubtime), - VideoInfo::Detail { bvid, fav_time, .. } => id_time_key(bvid, fav_time), - VideoInfo::WatchLater { bvid, fav_time, .. } => id_time_key(bvid, fav_time), + VideoInfo::Simple { + bvid, pubtime: time, .. + } + | VideoInfo::Detail { + bvid, fav_time: time, .. + } + | VideoInfo::WatchLater { + bvid, fav_time: time, .. + } + | VideoInfo::Submission { bvid, ctime: time, .. } => id_time_key(bvid, time), // 详情接口返回的数据仅用于填充详情,不会被作为 video_key _ => unreachable!(), } @@ -181,9 +204,10 @@ impl VideoInfo { pub fn bvid(&self) -> &str { match self { - VideoInfo::Simple { bvid, .. } => bvid, - VideoInfo::Detail { bvid, .. } => bvid, - VideoInfo::WatchLater { bvid, .. } => bvid, + VideoInfo::Simple { bvid, .. } + | VideoInfo::Detail { bvid, .. } + | VideoInfo::WatchLater { bvid, .. } + | VideoInfo::Submission { bvid, .. } => bvid, // 同上 _ => unreachable!(), } diff --git a/crates/bili_sync_entity/src/entities/mod.rs b/crates/bili_sync_entity/src/entities/mod.rs index 908557e..4721356 100644 --- a/crates/bili_sync_entity/src/entities/mod.rs +++ b/crates/bili_sync_entity/src/entities/mod.rs @@ -5,5 +5,6 @@ pub mod prelude; pub mod collection; pub mod favorite; pub mod page; +pub mod submission; pub mod video; pub mod watch_later; diff --git a/crates/bili_sync_entity/src/entities/submission.rs b/crates/bili_sync_entity/src/entities/submission.rs new file mode 100644 index 0000000..f456f5a --- /dev/null +++ b/crates/bili_sync_entity/src/entities/submission.rs @@ -0,0 +1,19 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "submission")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub upper_id: i64, + pub upper_name: String, + pub path: String, + pub created_at: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/bili_sync_entity/src/entities/video.rs b/crates/bili_sync_entity/src/entities/video.rs index db74c60..cc40502 100644 --- a/crates/bili_sync_entity/src/entities/video.rs +++ b/crates/bili_sync_entity/src/entities/video.rs @@ -10,6 +10,7 @@ pub struct Model { pub collection_id: Option, pub favorite_id: Option, pub watch_later_id: Option, + pub submission_id: Option, pub upper_id: i64, pub upper_name: String, pub upper_face: String, diff --git a/crates/bili_sync_migration/src/lib.rs b/crates/bili_sync_migration/src/lib.rs index 6439a80..59e70ef 100644 --- a/crates/bili_sync_migration/src/lib.rs +++ b/crates/bili_sync_migration/src/lib.rs @@ -3,6 +3,7 @@ pub use sea_orm_migration::prelude::*; mod m20240322_000001_create_table; mod m20240505_130850_add_collection; mod m20240709_130914_watch_later; +mod m20240724_161008_submission; pub struct Migrator; @@ -13,6 +14,7 @@ impl MigratorTrait for Migrator { Box::new(m20240322_000001_create_table::Migration), Box::new(m20240505_130850_add_collection::Migration), Box::new(m20240709_130914_watch_later::Migration), + Box::new(m20240724_161008_submission::Migration), ] } } diff --git a/crates/bili_sync_migration/src/m20240724_161008_submission.rs b/crates/bili_sync_migration/src/m20240724_161008_submission.rs new file mode 100644 index 0000000..8ea92ee --- /dev/null +++ b/crates/bili_sync_migration/src/m20240724_161008_submission.rs @@ -0,0 +1,87 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + manager + .create_table( + Table::create() + .table(Submission::Table) + .if_not_exists() + .col( + ColumnDef::new(Submission::Id) + .unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(Submission::UpperId).unique_key().unsigned().not_null()) + .col(ColumnDef::new(Submission::UpperName).string().not_null()) + .col(ColumnDef::new(Submission::Path).string().not_null()) + .col( + ColumnDef::new(Submission::CreatedAt) + .timestamp() + .default(Expr::current_timestamp()) + .not_null(), + ) + .to_owned(), + ) + .await?; + manager + .drop_index(Index::drop().table(Video::Table).name("idx_video_unique").to_owned()) + .await?; + manager + .alter_table( + Table::alter() + .table(Video::Table) + .add_column(ColumnDef::new(Video::SubmissionId).unsigned().null()) + .to_owned(), + ) + .await?; + db.execute_unprepared("CREATE UNIQUE INDEX `idx_video_unique` ON `video` (ifnull(`collection_id`, -1), ifnull(`favorite_id`, -1), ifnull(`watch_later_id`, -1), ifnull(`submission_id`, -1), `bvid`)") + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); + manager + .drop_index(Index::drop().table(Video::Table).name("idx_video_unique").to_owned()) + .await?; + db.execute_unprepared("DELETE FROM video WHERE submission_id IS NOT NULL") + .await?; + manager + .alter_table( + Table::alter() + .table(Video::Table) + .drop_column(Video::SubmissionId) + .to_owned(), + ) + .await?; + db.execute_unprepared("CREATE UNIQUE INDEX `idx_video_unique` ON `video` (ifnull(`collection_id`, -1), ifnull(`favorite_id`, -1), ifnull(`watch_later_id`, -1), `bvid`)") + .await?; + manager + .drop_table(Table::drop().table(Submission::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum Submission { + Table, + Id, + UpperId, + UpperName, + Path, + CreatedAt, +} + +#[derive(DeriveIden)] +enum Video { + Table, + SubmissionId, +}