From dc4d0885e03ecaf76fa25af58afb3ff1cb036c8d Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 10:23:53 +0200 Subject: [PATCH 1/8] fix link download --- module/move/unitore/Cargo.toml | 5 +- .../unitore/src/executor/actions/config.rs | 10 +- .../unitore/src/executor/actions/frames.rs | 3 +- .../unitore/src/executor/actions/table.rs | 53 ++++---- module/move/unitore/src/executor/mod.rs | 24 ++-- module/move/unitore/src/feed_config.rs | 2 +- module/move/unitore/src/retriever.rs | 6 +- module/move/unitore/src/storage/config.rs | 1 - module/move/unitore/src/storage/frame.rs | 116 ------------------ module/move/unitore/src/storage/mod.rs | 2 +- module/move/unitore/tests/frame.rs | 6 - module/move/unitore/tests/save_feed.rs | 6 +- .../move/unitore/tests/update_newer_feed.rs | 4 +- 13 files changed, 61 insertions(+), 177 deletions(-) diff --git a/module/move/unitore/Cargo.toml b/module/move/unitore/Cargo.toml index 9ed15eafbd..9c9b7771b4 100644 --- a/module/move/unitore/Cargo.toml +++ b/module/move/unitore/Cargo.toml @@ -14,8 +14,8 @@ Feed reader with the ability to set updates frequency. categories = [ "development-tools" ] keywords = [ "rss-feed", "atom-feed" ] -[lints] -workspace = true +# [lints] +# workspace = true [package.metadata.docs.rs] features = [ "full" ] @@ -39,6 +39,7 @@ http-body-util = "0.1" feed-rs = "1.4.0" toml = "0.8.10" serde = "1.0.196" +url = { version = "2.0", features = ["serde"] } humantime-serde = "1.1.1" gluesql = "0.15.0" async-trait = "0.1.41" diff --git a/module/move/unitore/src/executor/actions/config.rs b/module/move/unitore/src/executor/actions/config.rs index f0989f300a..d17b2440b3 100644 --- a/module/move/unitore/src/executor/actions/config.rs +++ b/module/move/unitore/src/executor/actions/config.rs @@ -4,7 +4,13 @@ use crate::*; use super::*; use error_tools::{ err, for_app::Context, BasicError, Result }; use executor::FeedManager; -use storage::{ FeedStorage, FeedStore, config::{ ConfigStore, Config } }; +use storage:: +{ + FeedStorage, + FeedStore, + config::{ ConfigStore, Config }, + model::FeedRow, +}; use gluesql::{ prelude::Payload, sled_storage::SledStorage }; /// Add configuration file with subscriptions to storage. @@ -28,7 +34,7 @@ pub async fn add_config( storage : FeedStorage< SledStorage >, args : &wca::Args let feeds = feed_config::read( config.path() )? .into_iter() - .map( | feed | crate::storage::model::FeedRow::new( feed.link, feed.update_period ) ) + .map( | feed | FeedRow::new( feed.link.to_string(), feed.update_period ) ) .collect::< Vec< _ > >() ; diff --git a/module/move/unitore/src/executor/actions/frames.rs b/module/move/unitore/src/executor/actions/frames.rs index 49faa79031..55bf2fe3fd 100644 --- a/module/move/unitore/src/executor/actions/frames.rs +++ b/module/move/unitore/src/executor/actions/frames.rs @@ -59,7 +59,8 @@ pub async fn download_frames if subscriptions.is_empty() { - return Err( err!( format!( + return Err( err!( format! + ( "Failed to download frames.\n Config files {} contain no feed subscriptions!", configs.join( ", " ) ) ) ) diff --git a/module/move/unitore/src/executor/actions/table.rs b/module/move/unitore/src/executor/actions/table.rs index 6eac1131cf..6650ae313f 100644 --- a/module/move/unitore/src/executor/actions/table.rs +++ b/module/move/unitore/src/executor/actions/table.rs @@ -153,42 +153,41 @@ impl std::fmt::Display for ColumnsReport write!( f, "Table name: {}", self.table_name )?; writeln!( f, "{}", self.table_description )?; - if !self.columns.is_empty() + if !self.columns.is_empty() + { + let mut rows = Vec::new(); + for ( label, desc ) in &self.columns { - let mut rows = Vec::new(); - for ( label, desc ) in &self.columns - { - rows.push - ( - vec! - [ - EMPTY_CELL.to_owned(), - label.clone(), - desc.clone(), - ] - ); - } - let table = table_display::table_with_headers + rows.push ( vec! [ EMPTY_CELL.to_owned(), - "label".to_owned(), - "description".to_owned(), - ], - rows, + label.clone(), + desc.clone(), + ] ); - - if let Some( table ) = table - { - writeln!( f, "{}", table )?; - } } - else + let table = table_display::table_with_headers + ( + vec! + [ + EMPTY_CELL.to_owned(), + "label".to_owned(), + "description".to_owned(), + ], + rows, + ); + + if let Some( table ) = table { - writeln!( f, "No columns" ); + writeln!( f, "{}", table )?; } - + } + else + { + writeln!( f, "No columns" )?; + } Ok( () ) } diff --git a/module/move/unitore/src/executor/mod.rs b/module/move/unitore/src/executor/mod.rs index 889b0560b8..41084ce9ba 100644 --- a/module/move/unitore/src/executor/mod.rs +++ b/module/move/unitore/src/executor/mod.rs @@ -19,12 +19,12 @@ use actions:: feeds::list_feeds, config::{ add_config, delete_config, list_configs }, query::execute_query, - table::{ list_columns, list_tables, FieldsReport }, + table::{ list_columns, list_tables }, }; use std::future::Future; -fn endpoint< 'a, F, Fut, R >( async_endpoint : F, args : &'a Args ) -> Result< R > +fn action< 'a, F, Fut, R >( async_endpoint : F, args : &'a Args ) -> Result< R > where F : FnOnce( FeedStorage< SledStorage >, &'a Args ) -> Fut, Fut : Future< Output = Result< R > >, @@ -59,7 +59,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > )) .routine( | args | { - match endpoint( download_frames, &args ) + match action( download_frames, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -75,7 +75,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > )) .routine( | args | { - match endpoint( list_feeds, &args ) + match action( list_feeds, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -91,7 +91,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > )) .routine( | args | { - match endpoint( list_frames, &args ) + match action( list_frames, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -115,7 +115,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > .subject().hint( "Path" ).kind( Type::Path ).optional( false ).end() .routine( | args : Args | { - match endpoint( add_config, &args ) + match action( add_config, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -132,7 +132,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > .subject().hint( "Path" ).kind( Type::Path ).optional( false ).end() .routine( | args : Args | { - match endpoint( delete_config, &args ) + match action( delete_config, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -148,7 +148,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > )) .routine( | args | { - match endpoint( list_configs, &args ) + match action( list_configs, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -164,7 +164,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > )) .routine( | args | { - match endpoint( list_tables, &args ) + match action( list_tables, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -182,7 +182,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > .subject().hint( "Name" ).kind( wca::Type::String ).optional( false ).end() .routine( | args : Args | { - match endpoint( list_columns, &args ) + match action( list_columns, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -205,7 +205,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > .subject().hint( "Query" ).kind( Type::List( Type::String.into(), ' ' ) ).optional( false ).end() .routine( | args : Args | { - match endpoint( execute_query, &args ) + match action( execute_query, &args ) { Ok( report ) => report.report(), Err( err ) => println!( "{:?}", err ), @@ -267,7 +267,7 @@ impl< C : FeedFetch, S : FeedStore + ConfigStore + FrameStore + TableStore + Sen for i in 0..subscriptions.len() { let feed = self.client.fetch( subscriptions[ i ].link.clone() ).await?; - feeds.push( ( feed, subscriptions[ i ].update_period.clone(), subscriptions[ i ].link.clone() ) ); + feeds.push( ( feed, subscriptions[ i ].update_period.clone(), subscriptions[ i ].link.to_string() ) ); } self.storage.process_feeds( feeds ).await } diff --git a/module/move/unitore/src/feed_config.rs b/module/move/unitore/src/feed_config.rs index 37e910136a..221288e94d 100644 --- a/module/move/unitore/src/feed_config.rs +++ b/module/move/unitore/src/feed_config.rs @@ -12,7 +12,7 @@ pub struct SubscriptionConfig #[serde(with = "humantime_serde")] pub update_period : std::time::Duration, /// Resource link. - pub link : String, + pub link : url::Url, } /// All subscriptions read from config file. diff --git a/module/move/unitore/src/retriever.rs b/module/move/unitore/src/retriever.rs index ad6270ff5b..98e6672934 100644 --- a/module/move/unitore/src/retriever.rs +++ b/module/move/unitore/src/retriever.rs @@ -16,7 +16,7 @@ use error_tools::{ Result, for_app::Context }; pub trait FeedFetch { /// Get feed from source specified by its link. - async fn fetch( &self, source : String ) -> Result< feed_rs::model::Feed >; + async fn fetch( &self, source : url::Url ) -> Result< feed_rs::model::Feed >; } /// Feed client for fetching feed. @@ -26,11 +26,11 @@ pub struct FeedClient; #[ async_trait::async_trait ] impl FeedFetch for FeedClient { - async fn fetch( &self, source : String ) -> Result< feed_rs::model::Feed > + async fn fetch( &self, source : url::Url ) -> Result< feed_rs::model::Feed > { let https = HttpsConnector::new(); let client = Client::builder( TokioExecutor::new() ).build::< _, Empty< Bytes > >( https ); - let link = source.parse().context( format!( "Failed to parse source link {}", source ) )?; + let link = source.to_string().parse().context( format!( "Failed to parse source link {}", source ) )?; let mut res = client .get( link ) .await diff --git a/module/move/unitore/src/storage/config.rs b/module/move/unitore/src/storage/config.rs index 8eb2b286b9..f5b812e475 100644 --- a/module/move/unitore/src/storage/config.rs +++ b/module/move/unitore/src/storage/config.rs @@ -1,6 +1,5 @@ //! Functionality for storing and retrieving config files. -use crate::*; use super::*; use error_tools::{ err, Result }; use gluesql:: diff --git a/module/move/unitore/src/storage/frame.rs b/module/move/unitore/src/storage/frame.rs index 8a31837b6e..102533c3a6 100644 --- a/module/move/unitore/src/storage/frame.rs +++ b/module/move/unitore/src/storage/frame.rs @@ -213,121 +213,6 @@ impl FrameStore for FeedStorage< SledStorage > #[ derive( Debug ) ] pub struct FrameRow( pub Vec< ExprNode< 'static > > ); -// /// Create row for QlueSQL storage from Feed Entry type. -// impl From< ( feed_rs::model::Entry, String ) > for FrameRow -// { -// fn from( entry : ( feed_rs::model::Entry, String ) ) -> Self -// { -// let feed_link = text( entry.1.clone() ); -// let entry = &entry.0; - -// let id = text( entry.id.clone() ); -// let title = entry.title -// .clone() -// .map( | title | text( title.content ) ) -// .unwrap_or( null() ) -// ; - -// let updated = entry.updated -// .map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ) -// .unwrap_or( null() ) -// ; - -// let authors = text -// ( -// entry.authors -// .iter() -// .map( | p | p.name.clone() ) -// .fold( String::new(), | acc, val | format!( "{}, {}", acc, val ) ) -// ) -// .to_owned(); - -// let content = entry.content -// .clone() -// .map( | c | -// text -// ( -// c.body.unwrap_or( c.src.map( | link | link.href ).unwrap_or_default() ) -// ) -// ) -// .unwrap_or( null() ) -// ; - -// let links = if entry.links.len() != 0 -// { -// text -// ( -// entry.links -// .clone() -// .iter() -// .map( | link | link.href.clone() ) -// .fold( String::new(), | acc, val | format!( "{} {}", acc, val ) ) -// ) -// } -// else -// { -// null() -// }; -// let summary = entry.summary.clone().map( | c | text( c.content ) ).unwrap_or( null() ); -// let categories = if entry.categories.len() != 0 -// { -// text -// ( -// entry.categories -// .clone() -// .iter() -// .map( | cat | cat.term.clone() ) -// .fold( String::new(), | acc, val | format!( "{} {}", acc, val ) ) -// ) -// } -// else -// { -// null() -// }; -// let published = entry.published -// .map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ) -// .unwrap_or( null() ) -// ; - -// let source = entry.source.clone().map( | s | text( s ) ).unwrap_or( null() ); -// let rights = entry.rights.clone().map( | r | text( r.content ) ).unwrap_or( null() ); -// let media = if entry.media.len() != 0 -// { -// text -// ( -// entry.media -// .clone() -// .iter() -// .map( | m | m.title.clone().map( | t | t.content ).unwrap_or_default() ) -// .fold( String::new(), | acc, val | format!( "{} {}", acc, val ) ) -// ) -// } -// else -// { -// null() -// }; -// let language = entry.language.clone().map( | l | text( l ) ).unwrap_or( null() ); - -// FrameRow( vec! -// [ -// id, -// title, -// updated, -// authors, -// content, -// links, -// summary, -// categories, -// published, -// source, -// rights, -// media, -// language, -// feed_link -// ] ) -// } -// } - impl From< Frame > for FrameRow { fn from( entry : Frame ) -> Self @@ -449,4 +334,3 @@ impl From< RowValue< '_ > > for String } } } - diff --git a/module/move/unitore/src/storage/mod.rs b/module/move/unitore/src/storage/mod.rs index 20e4200c16..0fca1946bb 100644 --- a/module/move/unitore/src/storage/mod.rs +++ b/module/move/unitore/src/storage/mod.rs @@ -208,7 +208,7 @@ impl FeedStore for FeedStorage< SledStorage > } ) .collect::< Vec< _ > >() .get( 0 ) - .unwrap_or( &feed.2 ) + .unwrap_or( &format!( "'{}'", feed.2 ) ) .clone() ) .join( "," ) diff --git a/module/move/unitore/tests/frame.rs b/module/move/unitore/tests/frame.rs index 02d07ad50d..248f40330b 100644 --- a/module/move/unitore/tests/frame.rs +++ b/module/move/unitore/tests/frame.rs @@ -1,10 +1,4 @@ -use std::path::PathBuf; use feed_rs::parser as feed_parser; -use gluesql::sled_storage::sled::Config; -use unitore::{ - executor::FeedManager, - storage::{ FeedStorage, FeedStore }, -}; use error_tools::Result; #[ tokio::test ] diff --git a/module/move/unitore/tests/save_feed.rs b/module/move/unitore/tests/save_feed.rs index a0a0042f08..4b51962b97 100644 --- a/module/move/unitore/tests/save_feed.rs +++ b/module/move/unitore/tests/save_feed.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use feed_rs::parser as feed_parser; use unitore:: { - executor::{ FeedManager, actions }, + executor::FeedManager, feed_config::SubscriptionConfig, retriever::FeedFetch, storage::{ FeedStorage, MockFeedStore, frame::FrameStore }, @@ -16,7 +16,7 @@ pub struct TestClient; #[ async_trait ] impl FeedFetch for TestClient { - async fn fetch( &self, _ : String ) -> Result< feed_rs::model::Feed > + async fn fetch( &self, _ : url::Url ) -> Result< feed_rs::model::Feed > { let feed = feed_parser::parse( include_str!( "./fixtures/plain_feed.xml" ).as_bytes() )?; @@ -53,7 +53,7 @@ async fn test_save_feed_plain() -> Result< () > let feed_config = SubscriptionConfig { update_period : std::time::Duration::from_secs( 1000 ), - link : String::from( "test" ), + link : url::Url::parse( "https://test" )?, }; let mut manager = FeedManager diff --git a/module/move/unitore/tests/update_newer_feed.rs b/module/move/unitore/tests/update_newer_feed.rs index fb26be3d3c..5b87dc3858 100644 --- a/module/move/unitore/tests/update_newer_feed.rs +++ b/module/move/unitore/tests/update_newer_feed.rs @@ -26,7 +26,7 @@ pub struct TestClient ( String ); #[ async_trait ] impl FeedFetch for TestClient { - async fn fetch( &self, _ : String ) -> Result< feed_rs::model::Feed > + async fn fetch( &self, _ : url::Url ) -> Result< feed_rs::model::Feed > { let feed = feed_parser::parse( std::fs::read_to_string( &self.0 )?.as_bytes() )?; Ok( feed ) @@ -46,7 +46,7 @@ async fn test_update() -> Result< () > let feed_config = SubscriptionConfig { update_period : std::time::Duration::from_secs( 1000 ), - link : String::from( "test" ), + link : url::Url::parse( "https://test" )?, }; let mut manager = FeedManager From 8be5a6df7998bada51cc7340c5011a883077bd2d Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 11:15:36 +0200 Subject: [PATCH 2/8] fix wrong config path message --- .../unitore/src/executor/actions/config.rs | 21 +++++++++++++++++-- .../unitore/src/executor/actions/frames.rs | 2 +- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/module/move/unitore/src/executor/actions/config.rs b/module/move/unitore/src/executor/actions/config.rs index d17b2440b3..599e0bf9e6 100644 --- a/module/move/unitore/src/executor/actions/config.rs +++ b/module/move/unitore/src/executor/actions/config.rs @@ -21,9 +21,26 @@ pub async fn add_config( storage : FeedStorage< SledStorage >, args : &wca::Args .ok_or_else::< BasicError, _ >( || err!( "Cannot get path argument for command .config.add" ) )? .into() ; - let path = path.canonicalize().context( format!( "Invalid path for config file {:?}", path ) )?; - let config = Config::new( path.to_string_lossy().to_string() ); + let mut err_str = format!( "Invalid path for config file {:?}", path ); + + let start = path.components().next(); + if let Some( start ) = start + { + let abs_path : &std::path::Path = start.as_ref(); + let abs_path = abs_path.canonicalize(); + if let Ok( mut abs_path ) = abs_path + { + for component in path.components().skip( 1 ) + { + abs_path.push( component ); + } + err_str = format!( "Invalid path for config file {:?}", abs_path ); + } + } + let path = path.canonicalize().context( err_str )?; + + let config = Config::new( path.to_string_lossy().to_string() ); let mut manager = FeedManager::new( storage ); let config_report = manager.storage diff --git a/module/move/unitore/src/executor/actions/frames.rs b/module/move/unitore/src/executor/actions/frames.rs index 55bf2fe3fd..a18168ee08 100644 --- a/module/move/unitore/src/executor/actions/frames.rs +++ b/module/move/unitore/src/executor/actions/frames.rs @@ -61,7 +61,7 @@ pub async fn download_frames { return Err( err!( format! ( - "Failed to download frames.\n Config files {} contain no feed subscriptions!", + "Failed to download frames.\n Config file(s) {} contain no feed subscriptions!", configs.join( ", " ) ) ) ) } From 7b69b640a3780570f991321105ce2da58e2950a2 Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 12:07:12 +0200 Subject: [PATCH 3/8] add tables description --- .../unitore/src/executor/actions/table.rs | 116 ++++++++---------- module/move/unitore/src/storage/tables.rs | 13 +- 2 files changed, 50 insertions(+), 79 deletions(-) diff --git a/module/move/unitore/src/executor/actions/table.rs b/module/move/unitore/src/executor/actions/table.rs index 6650ae313f..22708cef63 100644 --- a/module/move/unitore/src/executor/actions/table.rs +++ b/module/move/unitore/src/executor/actions/table.rs @@ -46,25 +46,37 @@ pub async fn list_columns { "feed" => { - table_description = String::from( "Table contains information about feed." ); + table_description = String::from( "Contains feed items." ); for label in columns.get( "feed" ).unwrap() { match label.as_str() { - "id" => { columns_desc.insert( label.clone(), String::from( "A unique identifier for this feed" ) ); } + "link" => { columns_desc.insert + ( + label.clone(), + String::from( "Link to feed source, unique identifier for the feed" ), + ); } "title" => { columns_desc.insert( label.clone(), String::from( "The title of the feed" ) ); } "updated" => { columns_desc.insert( label.clone(), String::from ( - "The time at which the feed was last modified. If not provided in the source, or invalid, it is None." + "The time at which the feed was last modified. If not provided in the source, or invalid, is Null." ) ); }, "type" => { columns_desc.insert( label.clone(), String::from( "Type of this feed (e.g. RSS2, Atom etc)" ) ); } - "authors" => { columns_desc.insert( label.clone(), String::from( "Collection of authors defined at the feed level" ) ); } + "authors" => { columns_desc.insert + ( + label.clone(), + String::from( "Collection of authors defined at the feed level" ) + ); } "description" => { columns_desc.insert( label.clone(), String::from( "Description of the feed" ) ); } - "published" => { columns_desc.insert( label.clone(), String::from( "The publication date for the content in the channel" ) ); } + "published" => { columns_desc.insert + ( + label.clone(), + String::from( "The publication date for the content in the channel" ), + ); } "update_period" => { columns_desc.insert( label.clone(), String::from( "How often this feed must be updated" ) ); } _ => { columns_desc.insert( label.clone(), String::from( "Desciption for this column hasn't been added yet!" ) ); } } @@ -72,30 +84,32 @@ pub async fn list_columns }, "frame" => { + table_description = String::from( "Contains frame items." ); for label in columns.get( "frame" ).unwrap() { match label.as_str() { "id" => { columns_desc.insert( label.clone(), String::from( "A unique identifier for this frame in the feed. " ) ); }, - "title" => { columns_desc.insert( label.clone(), String::from("Title of the frame" ) ); }, - "updated" => { columns_desc.insert( label.clone(), String::from("Time at which this item was fetched from source." ) ); }, - "authors" => { columns_desc.insert( label.clone(), String::from("List of authors of the frame, optional." ) ); }, - "content" => { columns_desc.insert( label.clone(), String::from("The content of the frame in html or plain text, optional." ) ); }, - "links" => { columns_desc.insert( label.clone(), String::from("List of links associated with this item of related Web page and attachments." ) ); }, - "summary" => { columns_desc.insert( label.clone(), String::from("Short summary, abstract, or excerpt of the frame item, optional." ) ); }, - "categories" => { columns_desc.insert( label.clone(), String::from("Specifies a list of categories that the item belongs to." ) ); }, - "published" => { columns_desc.insert( label.clone(), String::from("Time at which this item was first published or updated." ) ); }, - "source" => { columns_desc.insert( label.clone(), String::from("Specifies the source feed if the frame was copied from one feed into another feed, optional." ) ); }, + "title" => { columns_desc.insert( label.clone(), String::from( "Title of the frame" ) ); }, + "updated" => { columns_desc.insert( label.clone(), String::from( "Time at which this item was fetched from source." ) ); }, + "authors" => { columns_desc.insert( label.clone(), String::from( "List of authors of the frame, optional." ) ); }, + "content" => { columns_desc.insert( label.clone(), String::from( "The content of the frame in html or plain text, optional." ) ); }, + "links" => { columns_desc.insert( label.clone(), String::from( "List of links associated with this item of related Web page and attachments." ) ); }, + "summary" => { columns_desc.insert( label.clone(), String::from( "Short summary, abstract, or excerpt of the frame item, optional." ) ); }, + "categories" => { columns_desc.insert( label.clone(), String::from( "Specifies a list of categories that the item belongs to." ) ); }, + "published" => { columns_desc.insert( label.clone(), String::from( "Time at which this item was first published or updated." ) ); }, + "source" => { columns_desc.insert( label.clone(), String::from( "Specifies the source feed if the frame was copied from one feed into another feed, optional." ) ); }, "rights" => { columns_desc.insert( label.clone(), String::from( "Conveys information about copyrights over the feed, optional." ) ); }, - "media" => { columns_desc.insert( label.clone(), String::from("List of media oblects, encountered in the frame, optional." ) ); }, - "language" => { columns_desc.insert( label.clone(), String::from("The language specified on the item, optional." ) ); }, - "feed_link" => { columns_desc.insert( label.clone(), String::from("Link of feed that contains this frame." ) ); }, + "media" => { columns_desc.insert( label.clone(), String::from( "List of media oblects, encountered in the frame, optional." ) ); }, + "language" => { columns_desc.insert( label.clone(), String::from( "The language specified on the item, optional." ) ); }, + "feed_link" => { columns_desc.insert( label.clone(), String::from( "Link of feed that contains this frame." ) ); }, _ => { columns_desc.insert( label.clone(), String::from( "Desciption for this column hasn't been added yet!" ) ); } } } } "config" => { + table_description = String::from( "Contains paths to feed config files." ); for label in columns.get( "config" ).unwrap() { match label.as_str() @@ -150,11 +164,12 @@ impl std::fmt::Display for ColumnsReport { fn fmt( &self, f : &mut std::fmt::Formatter<'_> ) -> std::fmt::Result { - write!( f, "Table name: {}", self.table_name )?; - writeln!( f, "{}", self.table_description )?; + writeln!( f, "Table name: {}", self.table_name )?; + writeln!( f, "Description: {}", self.table_description )?; if !self.columns.is_empty() { + writeln!( f, "Columns:" )?; let mut rows = Vec::new(); for ( label, desc ) in &self.columns { @@ -199,7 +214,7 @@ impl Report for ColumnsReport {} #[ derive( Debug ) ] pub struct TablesReport { - tables : std::collections::HashMap< String, Vec< String > > + tables : std::collections::HashMap< String, ( String, Vec< String > ) > } impl TablesReport @@ -215,9 +230,16 @@ impl TablesReport for row in rows_vec { let table = String::from( row[ 0 ].clone() ); + let table_description = match table.as_str() + { + "feed" => String::from( "Contains feed items." ), + "frame" => String::from( "Contains frame items." ), + "config" => String::from( "Contains paths to feed config files." ), + _ => String::new(), + }; result.entry( table ) - .and_modify( | vec : &mut Vec< String > | vec.push( String::from( row[ 1 ].clone() ) ) ) - .or_insert( vec![ String::from( row[ 1 ].clone() ) ] ) + .and_modify( | ( _, vec ) : &mut ( String, Vec< String > ) | vec.push( String::from( row[ 1 ].clone() ) ) ) + .or_insert( ( table_description, vec![ String::from( row[ 1 ].clone() ) ] ) ) ; } }, @@ -233,12 +255,11 @@ impl std::fmt::Display for TablesReport { writeln!( f, "Storage tables:" )?; let mut rows = Vec::new(); - for ( table_name, columns ) in &self.tables + for ( table_name, ( desc, columns ) ) in &self.tables { let columns_str = if !columns.is_empty() { - let first = columns[ 0 ].clone(); - columns.iter().skip( 1 ).fold( first, | acc, val | format!( "{}, {}", acc, val ) ) + format!( "{};", columns.join( ", " ) ) } else { @@ -251,7 +272,8 @@ impl std::fmt::Display for TablesReport [ EMPTY_CELL.to_owned(), table_name.to_owned(), - columns_str, + textwrap::fill( desc, 80 ), + textwrap::fill( &columns_str, 80 ), ] ); } @@ -262,6 +284,7 @@ impl std::fmt::Display for TablesReport [ EMPTY_CELL.to_owned(), "name".to_owned(), + "description".to_owned(), "columns".to_owned(), ], rows, @@ -276,44 +299,3 @@ impl std::fmt::Display for TablesReport } impl Report for TablesReport {} - -#[ derive( Debug ) ] -pub struct FieldsReport -{ - pub fields_list : Vec< [ &'static str; 3 ] >, -} - -impl std::fmt::Display for FieldsReport -{ - - fn fmt( &self, f : &mut std::fmt::Formatter<'_> ) -> std::fmt::Result - { - let mut rows = Vec::new(); - for field in &self.fields_list - { - rows.push( vec![ EMPTY_CELL.to_owned(), field[ 0 ].to_owned(), field[ 1 ].to_owned(), field[ 2 ].to_owned() ] ); - } - - let table = table_display::table_with_headers - ( - vec! - [ - EMPTY_CELL.to_owned(), - "name".to_owned(), - "type".to_owned(), - "explanation".to_owned(), - ], - rows - ); - - if let Some( table ) = table - { - writeln!( f, "Frames fields:" )?; - writeln!( f, "{}", table )?; - } - - Ok( () ) - } -} - -impl Report for FieldsReport {} \ No newline at end of file diff --git a/module/move/unitore/src/storage/tables.rs b/module/move/unitore/src/storage/tables.rs index 7fb5f8d25d..08038df8ee 100644 --- a/module/move/unitore/src/storage/tables.rs +++ b/module/move/unitore/src/storage/tables.rs @@ -8,16 +8,13 @@ use gluesql:: prelude::Payload, }; -use executor::actions::table::{ TablesReport, FieldsReport }; +use executor::actions::table::TablesReport; use storage::FeedStorage; /// Functions for tables informantion. #[ async_trait::async_trait( ?Send ) ] pub trait TableStore { - /// Get list of column titles of feed table. - fn columns_titles( &mut self ) -> FieldsReport; - /// List tables in storage. async fn list_tables( &mut self ) -> Result< TablesReport >; @@ -28,14 +25,6 @@ pub trait TableStore #[ async_trait::async_trait( ?Send ) ] impl TableStore for FeedStorage< SledStorage > { - fn columns_titles( &mut self ) -> FieldsReport - { - FieldsReport - { - fields_list : self.frame_fields.clone() - } - } - async fn list_tables( &mut self ) -> Result< TablesReport > { let glue = &mut *self.storage.lock().await; From f90e064ecc5932e8f9c163f58ab9b13bc44c5f0b Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 15:07:51 +0200 Subject: [PATCH 4/8] add feed entity --- .../unitore/src/executor/actions/config.rs | 5 +- .../executor/actions/{feeds.rs => feed.rs} | 4 +- .../executor/actions/{frames.rs => frame.rs} | 11 +- .../move/unitore/src/executor/actions/mod.rs | 4 +- .../unitore/src/executor/actions/query.rs | 2 +- .../unitore/src/executor/actions/table.rs | 5 +- module/move/unitore/src/executor/mod.rs | 24 +- module/move/unitore/src/storage/feed.rs | 389 ++++++++++++++++++ module/move/unitore/src/storage/frame.rs | 17 +- module/move/unitore/src/storage/mod.rs | 265 +----------- module/move/unitore/src/storage/model.rs | 67 --- .../src/storage/{tables.rs => table.rs} | 0 module/move/unitore/tests/add_config.rs | 5 +- module/move/unitore/tests/save_feed.rs | 21 +- .../move/unitore/tests/update_newer_feed.rs | 28 +- 15 files changed, 452 insertions(+), 395 deletions(-) rename module/move/unitore/src/executor/actions/{feeds.rs => feed.rs} (94%) rename module/move/unitore/src/executor/actions/{frames.rs => frame.rs} (94%) create mode 100644 module/move/unitore/src/storage/feed.rs delete mode 100644 module/move/unitore/src/storage/model.rs rename module/move/unitore/src/storage/{tables.rs => table.rs} (100%) diff --git a/module/move/unitore/src/executor/actions/config.rs b/module/move/unitore/src/executor/actions/config.rs index 599e0bf9e6..b3450c934b 100644 --- a/module/move/unitore/src/executor/actions/config.rs +++ b/module/move/unitore/src/executor/actions/config.rs @@ -7,9 +7,8 @@ use executor::FeedManager; use storage:: { FeedStorage, - FeedStore, + feed::{ FeedStore, Feed }, config::{ ConfigStore, Config }, - model::FeedRow, }; use gluesql::{ prelude::Payload, sled_storage::SledStorage }; @@ -51,7 +50,7 @@ pub async fn add_config( storage : FeedStorage< SledStorage >, args : &wca::Args let feeds = feed_config::read( config.path() )? .into_iter() - .map( | feed | FeedRow::new( feed.link.to_string(), feed.update_period ) ) + .map( | feed | Feed::new( feed.link, feed.update_period ) ) .collect::< Vec< _ > >() ; diff --git a/module/move/unitore/src/executor/actions/feeds.rs b/module/move/unitore/src/executor/actions/feed.rs similarity index 94% rename from module/move/unitore/src/executor/actions/feeds.rs rename to module/move/unitore/src/executor/actions/feed.rs index 82eb0d78c7..850384c846 100644 --- a/module/move/unitore/src/executor/actions/feeds.rs +++ b/module/move/unitore/src/executor/actions/feed.rs @@ -4,9 +4,9 @@ use crate::*; use executor:: { FeedManager, - actions::{ Report, frames::SelectedEntries }, + actions::{ Report, frame::SelectedEntries }, }; -use storage::{ FeedStorage, FeedStore }; +use storage::{ FeedStorage, feed::FeedStore }; use error_tools::Result; /// List all feeds. diff --git a/module/move/unitore/src/executor/actions/frames.rs b/module/move/unitore/src/executor/actions/frame.rs similarity index 94% rename from module/move/unitore/src/executor/actions/frames.rs rename to module/move/unitore/src/executor/actions/frame.rs index a18168ee08..fdd35ed871 100644 --- a/module/move/unitore/src/executor/actions/frames.rs +++ b/module/move/unitore/src/executor/actions/frame.rs @@ -1,6 +1,8 @@ //! Frames commands actions. use crate::*; +use self::storage::feed::FeedStore; + use super::*; use executor::FeedManager; use storage:: @@ -66,7 +68,14 @@ pub async fn download_frames ) ) ) } - manager.update_feed( subscriptions ).await + let mut feeds = Vec::new(); + let client = retriever::FeedClient; + for i in 0..subscriptions.len() + { + let feed = retriever::FeedFetch::fetch(&client, subscriptions[ i ].link.clone()).await?; + feeds.push( ( feed, subscriptions[ i ].update_period.clone(), subscriptions[ i ].link.clone() ) ); + } + manager.storage.process_feeds( feeds ).await } diff --git a/module/move/unitore/src/executor/actions/mod.rs b/module/move/unitore/src/executor/actions/mod.rs index 80c264f88d..2c40c7e470 100644 --- a/module/move/unitore/src/executor/actions/mod.rs +++ b/module/move/unitore/src/executor/actions/mod.rs @@ -1,7 +1,7 @@ //! Endpoint for command execution. -pub mod frames; -pub mod feeds; +pub mod frame; +pub mod feed; pub mod config; pub mod query; pub mod table; diff --git a/module/move/unitore/src/executor/actions/query.rs b/module/move/unitore/src/executor/actions/query.rs index 05dd6cab96..192d3d9a7f 100644 --- a/module/move/unitore/src/executor/actions/query.rs +++ b/module/move/unitore/src/executor/actions/query.rs @@ -3,7 +3,7 @@ use crate::*; use gluesql::core::executor::Payload; use super::Report; -use storage::{ FeedStorage, FeedStore }; +use storage::{ FeedStorage, Store }; use executor::FeedManager; use error_tools::{ err, BasicError, Result }; diff --git a/module/move/unitore/src/executor/actions/table.rs b/module/move/unitore/src/executor/actions/table.rs index 22708cef63..6f8b3cae27 100644 --- a/module/move/unitore/src/executor/actions/table.rs +++ b/module/move/unitore/src/executor/actions/table.rs @@ -1,11 +1,10 @@ //! Tables metadata commands actions and reports. use crate::*; -use executor::FeedManager; use gluesql::prelude::Payload; use std::collections::HashMap; -use executor::Report; -use storage::{ FeedStorage, tables::TableStore }; +use executor::{ FeedManager, Report }; +use storage::{ FeedStorage, table::TableStore }; use error_tools::{ err, BasicError, Result }; /// Get labels of column for specified table. diff --git a/module/move/unitore/src/executor/mod.rs b/module/move/unitore/src/executor/mod.rs index 41084ce9ba..b7b79a5f9b 100644 --- a/module/move/unitore/src/executor/mod.rs +++ b/module/move/unitore/src/executor/mod.rs @@ -6,7 +6,7 @@ use super::*; use feed_config::SubscriptionConfig; use gluesql::sled_storage::{ sled::Config, SledStorage }; use retriever::{ FeedClient, FeedFetch }; -use storage::{ FeedStorage, FeedStore, config::ConfigStore, tables::TableStore }; +use storage::{ Store, FeedStorage, feed::FeedStore, config::ConfigStore, table::TableStore }; use wca::{ Args, Type }; use executor::actions::Report; use error_tools::Result; @@ -15,8 +15,8 @@ use error_tools::Result; pub mod actions; use actions:: { - frames::{ list_frames, download_frames }, - feeds::list_feeds, + frame::{ list_frames, download_frames }, + feed::list_feeds, config::{ add_config, delete_config, list_configs }, query::execute_query, table::{ list_columns, list_tables }, @@ -222,7 +222,7 @@ pub fn execute() -> Result< (), Box< dyn std::error::Error + Send + Sync > > } /// Manages feed subsriptions and updates. -pub struct FeedManager< C, S : FeedStore + ConfigStore + FrameStore + Send > +pub struct FeedManager< C, S : FeedStore + ConfigStore + FrameStore + Store + Send > { /// Subscription configuration with link and update period. pub config : Vec< SubscriptionConfig >, @@ -232,7 +232,7 @@ pub struct FeedManager< C, S : FeedStore + ConfigStore + FrameStore + Send > pub client : C, } -impl< S : FeedStore + ConfigStore + FrameStore + TableStore + Send > FeedManager< FeedClient, S > +impl< S : FeedStore + ConfigStore + FrameStore + TableStore + Store + Send > FeedManager< FeedClient, S > { /// Create new instance of FeedManager. pub fn new( storage : S ) -> FeedManager< FeedClient, S > @@ -246,7 +246,7 @@ impl< S : FeedStore + ConfigStore + FrameStore + TableStore + Send > FeedManager } } -impl< C : FeedFetch, S : FeedStore + ConfigStore + FrameStore + TableStore + Send > FeedManager< C, S > +impl< C : FeedFetch, S : FeedStore + ConfigStore + FrameStore + TableStore + Store + Send > FeedManager< C, S > { /// Set configurations for subscriptions. pub fn set_config( &mut self, configs : Vec< SubscriptionConfig > ) @@ -260,18 +260,6 @@ impl< C : FeedFetch, S : FeedStore + ConfigStore + FrameStore + TableStore + Sen self.client = client; } - /// Update modified frames and save new items. - pub async fn update_feed( &mut self, subscriptions : Vec< SubscriptionConfig > ) -> Result< impl actions::Report > - { - let mut feeds = Vec::new(); - for i in 0..subscriptions.len() - { - let feed = self.client.fetch( subscriptions[ i ].link.clone() ).await?; - feeds.push( ( feed, subscriptions[ i ].update_period.clone(), subscriptions[ i ].link.to_string() ) ); - } - self.storage.process_feeds( feeds ).await - } - /// Execute custom query, print result. pub async fn execute_custom_query( &mut self, query : String ) -> Result< impl actions::Report > { diff --git a/module/move/unitore/src/storage/feed.rs b/module/move/unitore/src/storage/feed.rs new file mode 100644 index 0000000000..5ce5cf3fc6 --- /dev/null +++ b/module/move/unitore/src/storage/feed.rs @@ -0,0 +1,389 @@ +use crate::*; +use std::time::Duration; +use error_tools::{ for_app::Context, Result }; +use gluesql:: +{ + core:: + { + ast_builder::{ null, col, table, text, Execute, timestamp, ExprNode }, + data::Value, + executor::Payload, + chrono::{ Utc, DateTime, SecondsFormat }, + }, + sled_storage::SledStorage, +}; + +use executor::actions:: +{ + feed::FeedsReport, + frame::{ UpdateReport, SelectedEntries, FramesReport }, +}; +use storage::{ FeedStorage, frame::{ FrameStore, RowValue } }; +use wca::wtools::Itertools; + +#[ derive( Debug ) ] +pub struct Feed +{ + pub link : url::Url, + pub title : Option< String >, + pub updated : Option< DateTime< Utc > >, + pub authors : Option< String >, + pub description : Option< String >, + pub published : Option< DateTime< Utc > >, + pub update_period : Duration, +} + +impl Feed +{ + pub fn new( link : url::Url, update_period : Duration ) -> Self + { + Self + { + link, + title : None, + updated : None, + authors : None, + description : None, + published : None, + update_period, + } + } +} + +/// Functionality of feed storage. +#[ mockall::automock ] +#[ async_trait::async_trait( ?Send ) ] +pub trait FeedStore +{ + + /// Insert items from list into feed table. + async fn update_feed( &mut self, feed : Vec< Feed > ) -> Result< () >; + + /// Process fetched feed, new items will be saved, modified items will be updated. + async fn process_feeds( &mut self, feeds : Vec< ( feed_rs::model::Feed, Duration, url::Url ) > ) -> Result< UpdateReport >; + + /// Get all feeds from storage. + async fn get_all_feeds( &mut self ) -> Result< FeedsReport >; + + /// Add feeds entries. + async fn add_feeds( &mut self, feeds : Vec< Feed > ) -> Result< Payload >; +} + +#[ async_trait::async_trait( ?Send ) ] +impl FeedStore for FeedStorage< SledStorage > +{ + async fn get_all_feeds( &mut self ) -> Result< FeedsReport > + { + let res = table( "feed" ).select().project( "title, link, update_period" ).execute( &mut *self.storage.lock().await ).await?; + let mut report = FeedsReport::new(); + match res + { + Payload::Select { labels: label_vec, rows: rows_vec } => + { + report.0 = SelectedEntries + { + selected_rows : rows_vec, + selected_columns : label_vec, + } + }, + _ => {}, + } + + Ok( report ) + } + + async fn update_feed( &mut self, feed : Vec< Feed > ) -> Result< () > + { + //let feeds_rows = feed.into_iter().map( | feed | FeedRow::from( feed ).0 ).collect_vec(); + + for feed in feed + { + let _update = table( "feed" ) + .update() + .set( "title", feed.title.map( text ).unwrap_or( null() ) ) + .set( "updated", feed.updated.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ) ) + .set( "authors", feed.authors.map( text ).unwrap_or( null() ) ) + .set( "description", feed.description.map( text ).unwrap_or( null() ) ) + .set( "published", feed.published.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ) ) + .filter( col( "link" ).eq( feed.link.to_string() ) ) + .execute( &mut *self.storage.lock().await ) + .await + .context( "Failed to insert feed" )? + ; + } + + Ok( () ) + } + + async fn process_feeds + ( + &mut self, + feeds : Vec< ( feed_rs::model::Feed, Duration, url::Url ) >, + ) -> Result< UpdateReport > + { + let new_feed_links = feeds + .iter() + .map( | feed | + feed.0.links.iter().filter_map( | link | + { + if let Some( media_type ) = &link.media_type + { + if media_type == &String::from( "application/rss+xml" ) + { + return Some( format!( "'{}'", link.href.clone() ) ); + } + } + None + } ) + .collect::< Vec< _ > >() + .get( 0 ) + .unwrap_or( &format!( "'{}'", feed.2 ) ) + .clone() + ) + .join( "," ) + ; + + let existing_feeds = table( "feed" ) + .select() + .filter( format!( "link IN ({})", new_feed_links ).as_str() ) + .project( "link" ) + .execute( &mut *self.storage.lock().await ) + .await + .context( "Failed to select links of existing feeds while saving new frames" )? + ; + + let mut new_entries = Vec::new(); + let mut modified_entries = Vec::new(); + let mut reports = Vec::new(); + + for feed in &feeds + { + let mut frames_report = FramesReport::new( feed.0.title.clone().unwrap().content ); + // check if feed is new + if let Some( existing_feeds ) = existing_feeds.select() + { + + let existing_feeds = existing_feeds + .filter_map( | feed | feed.get( "link" ).map( | link | String::from( RowValue( link ) ) )) + .collect_vec() + ; + + let link = &feed.2.to_string(); + + if !existing_feeds.contains( link ) + { + self.add_feeds( vec![ feed.clone().into() ] ).await?; + frames_report.new_frames = feed.0.entries.len(); + frames_report.is_new_feed = true; + + new_entries.extend + ( + feed.0.entries + .clone() + .into_iter() + .zip( std::iter::repeat( feed.0.id.clone() ).take( feed.0.entries.len() ) ) + .map( | entry | entry.into() ) + ); + reports.push( frames_report ); + continue; + } + } + + let existing_frames = table( "frame" ) + .select() + .filter(col( "feed_link" ).eq( text( feed.0.id.clone() ) ) ) + .project( "id, published" ) + .execute( &mut *self.storage.lock().await ) + .await + .context( "Failed to get existing frames while saving new frames" )? + ; + + if let Some( rows ) = existing_frames.select() + { + let rows = rows.collect::< Vec< _ > >(); + frames_report.existing_frames = rows.len(); + let existing_entries = rows.iter() + .map( | r | ( r.get( "id" ).map( | &val | val.clone() ), r.get( "published" ).map( | &val | val.clone() ) ) ) + .flat_map( | ( id, published ) | + id.map( | id | + ( + id, + published.map( | date | + { + match date + { + Value::Timestamp( date_time ) => Some( date_time ), + _ => None, + } + } ) + .flatten() + ) + ) + ) + .flat_map( | ( id, published ) | match id { Value::Str( id ) => Some( ( id, published ) ), _ => None } ) + .collect_vec() + ; + + let existing_ids = existing_entries.iter().map( | ( id, _ ) | id ).collect_vec(); + for entry in &feed.0.entries + { + // if extry with same id is already in db, check if it is updated + if let Some( position ) = existing_ids.iter().position( | &id | id == &entry.id ) + { + if let Some( date ) = existing_entries[ position ].1 + { + if date.and_utc() != entry.published.unwrap() + { + frames_report.updated_frames += 1; + modified_entries.push( ( entry.clone(), feed.2.to_string() ).into() ); + } + } + } + else + { + frames_report.new_frames += 1; + new_entries.push( ( entry.clone(), feed.2.to_string() ).into() ); + } + } + } + reports.push( frames_report ); + } + + if new_entries.len() > 0 + { + let _saved_report = self.save_frames( new_entries ).await?; + } + if modified_entries.len() > 0 + { + let _updated_report = self.update_frames( modified_entries ).await?; + } + + Ok( UpdateReport( reports ) ) + } + + async fn add_feeds( &mut self, feed : Vec< Feed > ) -> Result< Payload > + { + let feeds_rows : Vec< Vec< ExprNode< 'static > > > = feed.into_iter().map( | feed | feed.into() ).collect_vec(); + + let insert = table( "feed" ) + .insert() + .columns + ( + "link, + title, + updated, + authors, + description, + published, + update_period", + ) + .values( feeds_rows ) + .execute( &mut *self.storage.lock().await ) + .await + .context( "Failed to insert feeds" )? + ; + + Ok( insert ) + } +} + +/// Feed in format convenient for saving in storage. +#[ derive( Debug ) ] +pub struct FeedRow( pub Vec< ExprNode< 'static > > ); + +impl FeedRow +{ + /// Create new feed row for storage. + pub fn new( feed_link : String, update_period : Duration ) -> Self + { + FeedRow( vec! + [ + text( feed_link ), + null(), + null(), + null(), + null(), + null(), + text( update_period.as_secs().to_string() ), + ] ) + } +} + +impl From< ( feed_rs::model::Feed, Duration, String ) > for FeedRow +{ + fn from( value : ( feed_rs::model::Feed, Duration, String ) ) -> Self + { + let duration = value.1; + let link = value.2; + let value = value.0; + + let row = vec! + [ + value.links.iter().filter_map( | link | + { + if let Some( media_type ) = &link.media_type + { + if media_type == &String::from( "application/rss+xml" ) + { + return Some( text( link.href.clone() ) ); + } + } + None + } ) + .collect::< Vec< _ > >() + .get( 0 ) + .unwrap_or( &text( link ) ) + .clone(), + value.title.clone().map( | title | text( title.content ) ).unwrap_or( null() ), + value.updated.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), + text( value.authors.iter().map( | p | p.name.clone() ).fold( String::new(), | acc, val | format!( "{}, {}", acc, val ) ) ), + value.description.clone().map( | desc | text( desc.content ) ).unwrap_or( null() ), + value.published.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), + text( duration.as_secs().to_string() ), + ]; + FeedRow( row ) + } +} + +impl From< ( feed_rs::model::Feed, Duration, url::Url ) > for Feed +{ + fn from( val : ( feed_rs::model::Feed, Duration, url::Url ) ) -> Self + { + let duration = val.1; + let link = val.2; + let value = val.0; + + let authors = value.authors.into_iter().map( | p | p.name ).collect::< Vec< _ > >(); + let description = value.description.map( | desc | desc.content ); + + Self + { + link, + title : value.title.map( | title | title.content ), + updated : value.updated, + published : value.published, + description, + authors : ( !authors.is_empty() ).then( || authors.join( ", " ) ), + update_period : duration, + } + + + } +} + +impl From< Feed > for Vec< ExprNode< 'static > > +{ + fn from( value : Feed ) -> Self + { + vec! + [ + text( value.link.to_string() ), + value.title.map( text ).unwrap_or( null() ), + value.updated.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), + value.authors.map( text ).unwrap_or( null() ), + value.description.map( text ).unwrap_or( null() ), + value.published.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), + text( value.update_period.as_secs().to_string() ), + ] + } +} diff --git a/module/move/unitore/src/storage/frame.rs b/module/move/unitore/src/storage/frame.rs index 102533c3a6..3367f8df4f 100644 --- a/module/move/unitore/src/storage/frame.rs +++ b/module/move/unitore/src/storage/frame.rs @@ -7,21 +7,15 @@ use gluesql:: { core:: { - ast_builder::{ col, table, text, Execute }, + ast_builder::{ null, col, table, text, Execute, timestamp, ExprNode }, data::Value, executor::Payload, - chrono::{ Utc, DateTime }, + chrono::{ Utc, DateTime, SecondsFormat }, }, sled_storage::SledStorage, }; -use gluesql::core:: -{ - ast_builder::{ null, timestamp, ExprNode }, - chrono::SecondsFormat, -}; - -use executor::actions::frames::{ FramesReport, ListReport, SelectedEntries }; +use executor::actions::frame::{ FramesReport, ListReport, SelectedEntries }; use storage::FeedStorage; use wca::wtools::Itertools; @@ -111,7 +105,7 @@ pub trait FrameStore async fn save_frames( &mut self, feed : Vec< Frame > ) -> Result< Payload >; /// Update items from list in feed table. - async fn update_feed( &mut self, feed : Vec< Frame > ) -> Result< () >; + async fn update_frames( &mut self, feed : Vec< Frame > ) -> Result< () >; /// Get all feed frames from storage. async fn list_frames( &mut self ) -> Result< ListReport >; @@ -184,7 +178,7 @@ impl FrameStore for FeedStorage< SledStorage > Ok( insert ) } - async fn update_feed( &mut self, feed : Vec< Frame > ) -> Result< () > + async fn update_frames( &mut self, feed : Vec< Frame > ) -> Result< () > { let entries_rows = feed.into_iter().map( | entry | FrameRow::from( entry ).0 ).collect_vec(); @@ -206,7 +200,6 @@ impl FrameStore for FeedStorage< SledStorage > } Ok( () ) } - } /// Frame row format for saving in storage. diff --git a/module/move/unitore/src/storage/mod.rs b/module/move/unitore/src/storage/mod.rs index 0fca1946bb..1bb8fbc209 100644 --- a/module/move/unitore/src/storage/mod.rs +++ b/module/move/unitore/src/storage/mod.rs @@ -1,35 +1,24 @@ use crate::*; -use std::{ sync::Arc, time::Duration }; +use std::sync::Arc; use error_tools::{ for_app::Context, Result }; use tokio::sync::Mutex; -use feed_rs::model::Feed; use gluesql:: { core:: { - ast_builder::{ col, table, text, Build, Execute }, - data::Value, - executor::Payload, + ast_builder::{ table, Build, Execute }, store::{ GStore, GStoreMut }, }, prelude::Glue, sled_storage::{ sled::Config, SledStorage }, }; -use executor::actions:: -{ - feeds::FeedsReport, - query::QueryReport, - frames::UpdateReport, -}; -use storage::frame::{ FrameStore, RowValue }; -use wca::wtools::Itertools; +use executor::actions::query::QueryReport; -pub mod model; -use model::FeedRow; pub mod config; pub mod frame; -pub mod tables; +pub mod table; +pub mod feed; /// Storage for feed frames. #[ derive( Clone ) ] @@ -111,27 +100,14 @@ impl FeedStorage< SledStorage > /// Functionality of feed storage. #[ mockall::automock ] #[ async_trait::async_trait( ?Send ) ] -pub trait FeedStore +pub trait Store { - - /// Insert items from list into feed table. - async fn save_feed( &mut self, feed : Vec< ( Feed, Duration, String ) > ) -> Result< () >; - - /// Process fetched feed, new items will be saved, modified items will be updated. - async fn process_feeds( &mut self, feeds : Vec< ( Feed, Duration, String ) > ) -> Result< UpdateReport >; - - /// Get all feeds from storage. - async fn get_all_feeds( &mut self ) -> Result< FeedsReport >; - /// Execute custom query passed as String. async fn execute_query( &mut self, query : String ) -> Result< QueryReport >; - - /// Add feeds entries. - async fn add_feeds( &mut self, feeds : Vec< FeedRow > ) -> Result< Payload >; } #[ async_trait::async_trait( ?Send ) ] -impl FeedStore for FeedStorage< SledStorage > +impl< S : GStore + GStoreMut + Send > Store for FeedStorage< S > { async fn execute_query( &mut self, query : String ) -> Result< QueryReport > { @@ -142,231 +118,4 @@ impl FeedStore for FeedStorage< SledStorage > Ok( report ) } - - async fn get_all_feeds( &mut self ) -> Result< FeedsReport > - { - let res = table( "feed" ).select().project( "title, link, update_period" ).execute( &mut *self.storage.lock().await ).await?; - let mut report = FeedsReport::new(); - match res - { - Payload::Select { labels: label_vec, rows: rows_vec } => - { - report.0 = crate::executor::actions::frames::SelectedEntries - { - selected_rows : rows_vec, - selected_columns : label_vec, - } - }, - _ => {}, - } - - Ok( report ) - } - - async fn save_feed( &mut self, feed : Vec< ( Feed, Duration, String ) > ) -> Result< () > - { - let feeds_rows = feed.into_iter().map( | feed | FeedRow::from( feed ).0 ).collect_vec(); - - for entry in feeds_rows - { - let _update = table( "feed" ) - .update() - .set( "title", entry[ 1 ].to_owned() ) - .set( "updated", entry[ 2 ].to_owned() ) - .set( "authors", entry[ 3 ].to_owned() ) - .set( "description", entry[ 4 ].to_owned() ) - .set( "published", entry[ 5 ].to_owned() ) - .filter( col( "link" ).eq( entry[ 0 ].to_owned() ) ) - .execute( &mut *self.storage.lock().await ) - .await - .context( "Failed to insert feed" )? - ; - } - - Ok( () ) - } - - async fn process_feeds - ( - &mut self, - feeds : Vec< ( Feed, Duration, String ) >, - ) -> Result< UpdateReport > - { - let new_feed_links = feeds - .iter() - .map( | feed | - feed.0.links.iter().filter_map( | link | - { - if let Some( media_type ) = &link.media_type - { - if media_type == &String::from( "application/rss+xml" ) - { - return Some( format!( "'{}'", link.href.clone() ) ); - } - } - None - } ) - .collect::< Vec< _ > >() - .get( 0 ) - .unwrap_or( &format!( "'{}'", feed.2 ) ) - .clone() - ) - .join( "," ) - ; - - let existing_feeds = table( "feed" ) - .select() - .filter( format!( "link IN ({})", new_feed_links ).as_str() ) - .project( "link" ) - .execute( &mut *self.storage.lock().await ) - .await - .context( "Failed to select links of existing feeds while saving new frames" )? - ; - - let mut new_entries = Vec::new(); - let mut modified_entries = Vec::new(); - let mut reports = Vec::new(); - - for feed in &feeds - { - let mut frames_report = crate::executor::actions::frames::FramesReport::new( feed.0.title.clone().unwrap().content ); - // check if feed is new - if let Some( existing_feeds ) = existing_feeds.select() - { - - let existing_feeds = existing_feeds - .filter_map( | feed | feed.get( "link" ).map( | link | String::from( RowValue( link ) ) )) - .collect_vec() - ; - - let links = &feed.0.links.iter().filter_map( | link | - { - if let Some( media_type ) = &link.media_type - { - if media_type == &String::from( "application/rss+xml" ) - { - return Some( link.href.clone() ); - } - } - None - } ) - .collect::< Vec< _ > >(); - - let link = links.get( 0 ).unwrap_or( &feed.2 ); - - if !existing_feeds.contains( link ) - { - self.add_feeds( vec![ FeedRow::from( feed.clone() ) ] ).await?; - frames_report.new_frames = feed.0.entries.len(); - frames_report.is_new_feed = true; - - new_entries.extend - ( - feed.0.entries - .clone() - .into_iter() - .zip( std::iter::repeat( feed.0.id.clone() ).take( feed.0.entries.len() ) ) - .map( | entry | entry.into() ) - ); - reports.push( frames_report ); - continue; - } - } - - let existing_frames = table( "frame" ) - .select() - .filter(col( "feed_link" ).eq( text( feed.0.id.clone() ) ) ) - .project( "id, published" ) - .execute( &mut *self.storage.lock().await ) - .await - .context( "Failed to get existing frames while saving new frames" )? - ; - - if let Some( rows ) = existing_frames.select() - { - let rows = rows.collect::< Vec< _ > >(); - frames_report.existing_frames = rows.len(); - let existing_entries = rows.iter() - .map( | r | ( r.get( "id" ).map( | &val | val.clone() ), r.get( "published" ).map( | &val | val.clone() ) ) ) - .flat_map( | ( id, published ) | - id.map( | id | - ( - id, - published.map( | date | - { - match date - { - Value::Timestamp( date_time ) => Some( date_time ), - _ => None, - } - } ) - .flatten() - ) - ) - ) - .flat_map( | ( id, published ) | match id { Value::Str( id ) => Some( ( id, published ) ), _ => None } ) - .collect_vec() - ; - - let existing_ids = existing_entries.iter().map( | ( id, _ ) | id ).collect_vec(); - for entry in &feed.0.entries - { - // if extry with same id is already in db, check if it is updated - if let Some( position ) = existing_ids.iter().position( | &id | id == &entry.id ) - { - if let Some( date ) = existing_entries[ position ].1 - { - if date.and_utc() != entry.published.unwrap() - { - frames_report.updated_frames += 1; - modified_entries.push( ( entry.clone(), feed.0.id.clone() ).into() ); - } - } - } - else - { - frames_report.new_frames += 1; - new_entries.push( ( entry.clone(), feed.0.id.clone() ).into() ); - } - } - } - reports.push( frames_report ); - } - - if new_entries.len() > 0 - { - let _saved_report = self.save_frames( new_entries ).await?; - } - if modified_entries.len() > 0 - { - let _updated_report = self.update_feed( modified_entries ).await?; - } - - Ok( UpdateReport( reports ) ) - } - - async fn add_feeds( &mut self, feed : Vec< FeedRow > ) -> Result< Payload > - { - let feeds_rows = feed.into_iter().map( | feed | feed.0 ).collect_vec(); - - let insert = table( "feed" ) - .insert() - .columns - ( - "link, - title, - updated, - authors, - description, - published, - update_period", - ) - .values( feeds_rows ) - .execute( &mut *self.storage.lock().await ) - .await - .context( "Failed to insert feeds" )? - ; - - Ok( insert ) - } } diff --git a/module/move/unitore/src/storage/model.rs b/module/move/unitore/src/storage/model.rs deleted file mode 100644 index 79da429630..0000000000 --- a/module/move/unitore/src/storage/model.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::time::Duration; - -use feed_rs::model::Feed; -use gluesql::core:: -{ - ast_builder::{ null, text, timestamp, ExprNode }, - chrono::SecondsFormat, -}; - -/// Feed in format convenient for saving in storage. -#[ derive( Debug ) ] -pub struct FeedRow( pub Vec< ExprNode< 'static > > ); - -impl FeedRow -{ - /// Create new feed row for storage. - pub fn new( feed_link : String, update_period : Duration ) -> Self - { - FeedRow( vec! - [ - text( feed_link ), - null(), - null(), - null(), - null(), - null(), - text( update_period.as_secs().to_string() ), - ] ) - } -} - -impl From< ( Feed, Duration, String ) > for FeedRow -{ - fn from( value : ( Feed, Duration, String ) ) -> Self - { - let duration = value.1; - let link = value.2; - let value = value.0; - - let row = vec! - [ - value.links.iter().filter_map( | link | - { - if let Some( media_type ) = &link.media_type - { - if media_type == &String::from( "application/rss+xml" ) - { - return Some( text( link.href.clone() ) ); - } - } - None - } ) - .collect::< Vec< _ > >() - .get( 0 ) - .unwrap_or( &text( link ) ) - .clone(), - value.title.clone().map( | title | text( title.content ) ).unwrap_or( null() ), - value.updated.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), - text( value.authors.iter().map( | p | p.name.clone() ).fold( String::new(), | acc, val | format!( "{}, {}", acc, val ) ) ), - value.description.clone().map( | desc | text( desc.content ) ).unwrap_or( null() ), - value.published.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), - text( duration.as_secs().to_string() ), - ]; - FeedRow( row ) - } -} - diff --git a/module/move/unitore/src/storage/tables.rs b/module/move/unitore/src/storage/table.rs similarity index 100% rename from module/move/unitore/src/storage/tables.rs rename to module/move/unitore/src/storage/table.rs diff --git a/module/move/unitore/tests/add_config.rs b/module/move/unitore/tests/add_config.rs index 8d0e45389d..24e83d0d8a 100644 --- a/module/move/unitore/tests/add_config.rs +++ b/module/move/unitore/tests/add_config.rs @@ -1,9 +1,10 @@ use std::path::PathBuf; use gluesql::sled_storage::sled::Config; -use unitore::{ +use unitore:: +{ executor::FeedManager, - storage::{ FeedStorage, FeedStore }, + storage::{ FeedStorage, feed::FeedStore }, }; use error_tools::Result; diff --git a/module/move/unitore/tests/save_feed.rs b/module/move/unitore/tests/save_feed.rs index 4b51962b97..eefac47b6b 100644 --- a/module/move/unitore/tests/save_feed.rs +++ b/module/move/unitore/tests/save_feed.rs @@ -2,10 +2,9 @@ use async_trait::async_trait; use feed_rs::parser as feed_parser; use unitore:: { - executor::FeedManager, feed_config::SubscriptionConfig, retriever::FeedFetch, - storage::{ FeedStorage, MockFeedStore, frame::FrameStore }, + storage::{ FeedStorage, MockStore, frame::FrameStore, feed::FeedStore }, }; use error_tools::Result; @@ -48,24 +47,22 @@ async fn test_save_feed_plain() -> Result< () > .temporary( true ) ; - let feed_storage = FeedStorage::init_storage( config ).await?; + let mut feed_storage = FeedStorage::init_storage( config ).await?; let feed_config = SubscriptionConfig { update_period : std::time::Duration::from_secs( 1000 ), - link : url::Url::parse( "https://test" )?, + link : url::Url::parse( "https://www.nasa.gov/feed/" )?, }; - let mut manager = FeedManager - { - storage : feed_storage.clone(), - client : TestClient, - config : vec![], - }; + let mut feeds = Vec::new(); + let client = TestClient; - manager.update_feed( vec![ feed_config ] ).await?; + let feed = FeedFetch::fetch( &client, feed_config.link.clone()).await?; + feeds.push( ( feed, feed_config.update_period.clone(), feed_config.link.clone() ) ); + feed_storage.process_feeds( feeds ).await?; - let entries = manager.storage.list_frames().await?; + let entries = feed_storage.list_frames().await?; let number_of_frames = entries.0[ 0 ].selected_frames.selected_rows.len(); diff --git a/module/move/unitore/tests/update_newer_feed.rs b/module/move/unitore/tests/update_newer_feed.rs index 5b87dc3858..324ed68556 100644 --- a/module/move/unitore/tests/update_newer_feed.rs +++ b/module/move/unitore/tests/update_newer_feed.rs @@ -11,10 +11,9 @@ use gluesql:: }; use unitore:: { - executor::FeedManager, feed_config::SubscriptionConfig, retriever::FeedFetch, - storage::{ FeedStorage, frame::FrameStore }, + storage::{ feed::FeedStore, frame::FrameStore, FeedStorage }, }; use wca::wtools::Itertools; use error_tools::Result; @@ -41,29 +40,30 @@ async fn test_update() -> Result< () > .temporary( true ) ; - let feed_storage = FeedStorage::init_storage( config ).await?; + let mut feed_storage = FeedStorage::init_storage( config ).await?; let feed_config = SubscriptionConfig { update_period : std::time::Duration::from_secs( 1000 ), - link : url::Url::parse( "https://test" )?, + link : url::Url::parse( "https://www.nasa.gov/feed/" )?, }; - let mut manager = FeedManager - { - storage : feed_storage, - client : TestClient( "./tests/fixtures/plain_feed.xml".to_owned() ), - config : vec![], - }; // initial fetch - manager.update_feed( vec![ feed_config.clone() ] ).await?; + let client = TestClient( "./tests/fixtures/plain_feed.xml".to_owned() ); - manager.set_client( TestClient( "./tests/fixtures/updated_one_frame.xml".to_owned() ) ); + let feed = FeedFetch::fetch( &client, feed_config.link.clone()).await?; + let feeds = vec![ ( feed, feed_config.update_period.clone(), feed_config.link.clone() ) ]; + feed_storage.process_feeds( feeds ).await?; // updated fetch - manager.update_feed( vec![ feed_config ] ).await?; + let client = TestClient( "./tests/fixtures/updated_one_frame.xml".to_owned() ); + + let feed = FeedFetch::fetch( &client, feed_config.link.clone()).await?; + let feeds = vec![ ( feed, feed_config.update_period.clone(), feed_config.link.clone() ) ]; + feed_storage.process_feeds( feeds ).await?; + // check - let payload = manager.storage.list_frames().await?; + let payload = feed_storage.list_frames().await?; let entries = payload.0.iter().map( | val | val.selected_frames.selected_rows.clone() ).flatten().collect::< Vec< _ > >(); From ba86f6adb5918953dff4c9bb1195651a592d63d2 Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 15:40:21 +0200 Subject: [PATCH 5/8] add documentation --- .../unitore/src/executor/actions/table.rs | 1 + module/move/unitore/src/executor/mod.rs | 8 +++++ module/move/unitore/src/storage/feed.rs | 30 ++++++++----------- module/move/unitore/src/storage/frame.rs | 2 +- module/move/unitore/src/storage/mod.rs | 10 +++++++ module/move/unitore/tests/save_feed.rs | 2 +- 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/module/move/unitore/src/executor/actions/table.rs b/module/move/unitore/src/executor/actions/table.rs index 6f8b3cae27..d22ad6eeff 100644 --- a/module/move/unitore/src/executor/actions/table.rs +++ b/module/move/unitore/src/executor/actions/table.rs @@ -148,6 +148,7 @@ pub struct ColumnsReport impl ColumnsReport { + /// Create new table columns report. pub fn new( table_name : String, table_description : String, columns : HashMap< String, String > ) -> Self { Self diff --git a/module/move/unitore/src/executor/mod.rs b/module/move/unitore/src/executor/mod.rs index b7b79a5f9b..e49382447b 100644 --- a/module/move/unitore/src/executor/mod.rs +++ b/module/move/unitore/src/executor/mod.rs @@ -232,6 +232,14 @@ pub struct FeedManager< C, S : FeedStore + ConfigStore + FrameStore + Store + Se pub client : C, } +impl< C, S : FeedStore + ConfigStore + FrameStore + Store + Send > std::fmt::Debug for FeedManager< C, S > +{ + fn fmt( &self, f: &mut std::fmt::Formatter<'_> ) -> std::fmt::Result + { + writeln!(f, "Feed manager with storage and client" ) + } +} + impl< S : FeedStore + ConfigStore + FrameStore + TableStore + Store + Send > FeedManager< FeedClient, S > { /// Create new instance of FeedManager. diff --git a/module/move/unitore/src/storage/feed.rs b/module/move/unitore/src/storage/feed.rs index 5ce5cf3fc6..067c9f3a34 100644 --- a/module/move/unitore/src/storage/feed.rs +++ b/module/move/unitore/src/storage/feed.rs @@ -1,3 +1,5 @@ +//! Feed storage entity and storage functions. + use crate::*; use std::time::Duration; use error_tools::{ for_app::Context, Result }; @@ -21,20 +23,29 @@ use executor::actions:: use storage::{ FeedStorage, frame::{ FrameStore, RowValue } }; use wca::wtools::Itertools; +/// Feed item. #[ derive( Debug ) ] pub struct Feed { + /// Link to feed source. pub link : url::Url, + /// Ttitle of feed. pub title : Option< String >, + /// Last time the feed was fetched. pub updated : Option< DateTime< Utc > >, + /// Authors of feed. pub authors : Option< String >, + /// Short description of feed content. pub description : Option< String >, + /// Date and time when feed was published. pub published : Option< DateTime< Utc > >, + /// How often the feed frames must be fetched. pub update_period : Duration, } impl Feed { + /// Create new feed item from source url and update period. pub fn new( link : url::Url, update_period : Duration ) -> Self { Self @@ -123,23 +134,8 @@ impl FeedStore for FeedStorage< SledStorage > { let new_feed_links = feeds .iter() - .map( | feed | - feed.0.links.iter().filter_map( | link | - { - if let Some( media_type ) = &link.media_type - { - if media_type == &String::from( "application/rss+xml" ) - { - return Some( format!( "'{}'", link.href.clone() ) ); - } - } - None - } ) - .collect::< Vec< _ > >() - .get( 0 ) - .unwrap_or( &format!( "'{}'", feed.2 ) ) - .clone() - ) + .map( | feed | format!( "'{}'", feed.2.clone() ) ) + .collect::< Vec< _ > >() .join( "," ) ; diff --git a/module/move/unitore/src/storage/frame.rs b/module/move/unitore/src/storage/frame.rs index 3367f8df4f..2d957ea003 100644 --- a/module/move/unitore/src/storage/frame.rs +++ b/module/move/unitore/src/storage/frame.rs @@ -254,7 +254,7 @@ impl From< Frame > for FrameRow let source = entry.source.clone().map( | s | text( s ) ).unwrap_or( null() ); let rights = entry.rights.clone().map( | r | text( r ) ).unwrap_or( null() ); - let media = entry.categories + let media = entry.media .map( | media | text ( media ) ) .unwrap_or( null() ) ; diff --git a/module/move/unitore/src/storage/mod.rs b/module/move/unitore/src/storage/mod.rs index 1bb8fbc209..1eedc29afd 100644 --- a/module/move/unitore/src/storage/mod.rs +++ b/module/move/unitore/src/storage/mod.rs @@ -1,3 +1,5 @@ +//! Storage for frames, feeds and config files. + use crate::*; use std::sync::Arc; use error_tools::{ for_app::Context, Result }; @@ -29,6 +31,14 @@ pub struct FeedStorage< S : GStore + GStoreMut + Send > frame_fields : Vec< [ &'static str; 3 ] >, } +impl< S : GStore + GStoreMut + Send > std::fmt::Debug for FeedStorage< S > +{ + fn fmt( &self, f: &mut std::fmt::Formatter<'_> ) -> std::fmt::Result + { + writeln!(f, "GlueSQL storage" ) + } +} + impl FeedStorage< SledStorage > { /// Initialize new storage from configuration, create feed table. diff --git a/module/move/unitore/tests/save_feed.rs b/module/move/unitore/tests/save_feed.rs index eefac47b6b..e6b20c18b6 100644 --- a/module/move/unitore/tests/save_feed.rs +++ b/module/move/unitore/tests/save_feed.rs @@ -4,7 +4,7 @@ use unitore:: { feed_config::SubscriptionConfig, retriever::FeedFetch, - storage::{ FeedStorage, MockStore, frame::FrameStore, feed::FeedStore }, + storage::{ FeedStorage, frame::FrameStore, feed::FeedStore }, }; use error_tools::Result; From fe6ba0c0289f4f85192633328a9a09a7e3af034b Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 15:56:03 +0200 Subject: [PATCH 6/8] remove not needed row struct --- .../unitore/src/executor/actions/config.rs | 2 +- module/move/unitore/src/storage/feed.rs | 64 +------------------ module/move/unitore/src/storage/frame.rs | 44 ++++++------- 3 files changed, 24 insertions(+), 86 deletions(-) diff --git a/module/move/unitore/src/executor/actions/config.rs b/module/move/unitore/src/executor/actions/config.rs index b3450c934b..9b01caf173 100644 --- a/module/move/unitore/src/executor/actions/config.rs +++ b/module/move/unitore/src/executor/actions/config.rs @@ -54,7 +54,7 @@ pub async fn add_config( storage : FeedStorage< SledStorage >, args : &wca::Args .collect::< Vec< _ > >() ; - let new_feeds = manager.storage.add_feeds( feeds ).await?; + let new_feeds = manager.storage.save_feeds( feeds ).await?; Ok( ConfigReport{ payload : config_report, new_feeds : Some( new_feeds ) } ) } diff --git a/module/move/unitore/src/storage/feed.rs b/module/move/unitore/src/storage/feed.rs index 067c9f3a34..664696306d 100644 --- a/module/move/unitore/src/storage/feed.rs +++ b/module/move/unitore/src/storage/feed.rs @@ -77,7 +77,7 @@ pub trait FeedStore async fn get_all_feeds( &mut self ) -> Result< FeedsReport >; /// Add feeds entries. - async fn add_feeds( &mut self, feeds : Vec< Feed > ) -> Result< Payload >; + async fn save_feeds( &mut self, feeds : Vec< Feed > ) -> Result< Payload >; } #[ async_trait::async_trait( ?Send ) ] @@ -168,7 +168,7 @@ impl FeedStore for FeedStorage< SledStorage > if !existing_feeds.contains( link ) { - self.add_feeds( vec![ feed.clone().into() ] ).await?; + self.save_feeds( vec![ feed.clone().into() ] ).await?; frames_report.new_frames = feed.0.entries.len(); frames_report.is_new_feed = true; @@ -257,7 +257,7 @@ impl FeedStore for FeedStorage< SledStorage > Ok( UpdateReport( reports ) ) } - async fn add_feeds( &mut self, feed : Vec< Feed > ) -> Result< Payload > + async fn save_feeds( &mut self, feed : Vec< Feed > ) -> Result< Payload > { let feeds_rows : Vec< Vec< ExprNode< 'static > > > = feed.into_iter().map( | feed | feed.into() ).collect_vec(); @@ -283,64 +283,6 @@ impl FeedStore for FeedStorage< SledStorage > } } -/// Feed in format convenient for saving in storage. -#[ derive( Debug ) ] -pub struct FeedRow( pub Vec< ExprNode< 'static > > ); - -impl FeedRow -{ - /// Create new feed row for storage. - pub fn new( feed_link : String, update_period : Duration ) -> Self - { - FeedRow( vec! - [ - text( feed_link ), - null(), - null(), - null(), - null(), - null(), - text( update_period.as_secs().to_string() ), - ] ) - } -} - -impl From< ( feed_rs::model::Feed, Duration, String ) > for FeedRow -{ - fn from( value : ( feed_rs::model::Feed, Duration, String ) ) -> Self - { - let duration = value.1; - let link = value.2; - let value = value.0; - - let row = vec! - [ - value.links.iter().filter_map( | link | - { - if let Some( media_type ) = &link.media_type - { - if media_type == &String::from( "application/rss+xml" ) - { - return Some( text( link.href.clone() ) ); - } - } - None - } ) - .collect::< Vec< _ > >() - .get( 0 ) - .unwrap_or( &text( link ) ) - .clone(), - value.title.clone().map( | title | text( title.content ) ).unwrap_or( null() ), - value.updated.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), - text( value.authors.iter().map( | p | p.name.clone() ).fold( String::new(), | acc, val | format!( "{}, {}", acc, val ) ) ), - value.description.clone().map( | desc | text( desc.content ) ).unwrap_or( null() ), - value.published.map( | d | timestamp( d.to_rfc3339_opts( SecondsFormat::Millis, true ) ) ).unwrap_or( null() ), - text( duration.as_secs().to_string() ), - ]; - FeedRow( row ) - } -} - impl From< ( feed_rs::model::Feed, Duration, url::Url ) > for Feed { fn from( val : ( feed_rs::model::Feed, Duration, url::Url ) ) -> Self diff --git a/module/move/unitore/src/storage/frame.rs b/module/move/unitore/src/storage/frame.rs index 2d957ea003..871556c00f 100644 --- a/module/move/unitore/src/storage/frame.rs +++ b/module/move/unitore/src/storage/frame.rs @@ -161,7 +161,7 @@ impl FrameStore for FeedStorage< SledStorage > async fn save_frames( &mut self, frames : Vec< Frame > ) -> Result< Payload > { - let entries_rows = frames.into_iter().map( | entry | FrameRow::from( entry ).0 ).collect_vec(); + let entries_rows : Vec< Vec< ExprNode< 'static > > > = frames.into_iter().map( | entry | entry.into() ).collect_vec(); let insert = table( "frame" ) .insert() @@ -180,7 +180,7 @@ impl FrameStore for FeedStorage< SledStorage > async fn update_frames( &mut self, feed : Vec< Frame > ) -> Result< () > { - let entries_rows = feed.into_iter().map( | entry | FrameRow::from( entry ).0 ).collect_vec(); + let entries_rows : Vec< Vec< ExprNode< 'static > > > = feed.into_iter().map( | entry | entry.into() ).collect_vec(); for entry in entries_rows { @@ -202,11 +202,7 @@ impl FrameStore for FeedStorage< SledStorage > } } -/// Frame row format for saving in storage. -#[ derive( Debug ) ] -pub struct FrameRow( pub Vec< ExprNode< 'static > > ); - -impl From< Frame > for FrameRow +impl From< Frame > for Vec< ExprNode< 'static > > { fn from( entry : Frame ) -> Self { @@ -261,23 +257,23 @@ impl From< Frame > for FrameRow let language = entry.language.clone().map( | l | text( l ) ).unwrap_or( null() ); - FrameRow( vec! - [ - text( entry.id ), - title, - updated, - authors, - content, - links, - summary, - categories, - published, - source, - rights, - media, - language, - text( entry.feed_link ) - ] ) + vec! + [ + text( entry.id ), + title, + updated, + authors, + content, + links, + summary, + categories, + published, + source, + rights, + media, + language, + text( entry.feed_link ) + ] } } From 690190ef0c3c53c455d9ce1a4dd5e854ba7ebbf5 Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 16:22:08 +0200 Subject: [PATCH 7/8] remove unnecessary check --- module/move/unitore/src/storage/feed.rs | 55 ++---------------------- module/move/unitore/src/storage/frame.rs | 6 +-- 2 files changed, 5 insertions(+), 56 deletions(-) diff --git a/module/move/unitore/src/storage/feed.rs b/module/move/unitore/src/storage/feed.rs index 664696306d..59d612bb6d 100644 --- a/module/move/unitore/src/storage/feed.rs +++ b/module/move/unitore/src/storage/feed.rs @@ -20,7 +20,7 @@ use executor::actions:: feed::FeedsReport, frame::{ UpdateReport, SelectedEntries, FramesReport }, }; -use storage::{ FeedStorage, frame::{ FrameStore, RowValue } }; +use storage::{ FeedStorage, frame::FrameStore }; use wca::wtools::Itertools; /// Feed item. @@ -29,7 +29,7 @@ pub struct Feed { /// Link to feed source. pub link : url::Url, - /// Ttitle of feed. + /// Title of feed. pub title : Option< String >, /// Last time the feed was fetched. pub updated : Option< DateTime< Utc > >, @@ -105,8 +105,6 @@ impl FeedStore for FeedStorage< SledStorage > async fn update_feed( &mut self, feed : Vec< Feed > ) -> Result< () > { - //let feeds_rows = feed.into_iter().map( | feed | FeedRow::from( feed ).0 ).collect_vec(); - for feed in feed { let _update = table( "feed" ) @@ -132,22 +130,6 @@ impl FeedStore for FeedStorage< SledStorage > feeds : Vec< ( feed_rs::model::Feed, Duration, url::Url ) >, ) -> Result< UpdateReport > { - let new_feed_links = feeds - .iter() - .map( | feed | format!( "'{}'", feed.2.clone() ) ) - .collect::< Vec< _ > >() - .join( "," ) - ; - - let existing_feeds = table( "feed" ) - .select() - .filter( format!( "link IN ({})", new_feed_links ).as_str() ) - .project( "link" ) - .execute( &mut *self.storage.lock().await ) - .await - .context( "Failed to select links of existing feeds while saving new frames" )? - ; - let mut new_entries = Vec::new(); let mut modified_entries = Vec::new(); let mut reports = Vec::new(); @@ -155,39 +137,10 @@ impl FeedStore for FeedStorage< SledStorage > for feed in &feeds { let mut frames_report = FramesReport::new( feed.0.title.clone().unwrap().content ); - // check if feed is new - if let Some( existing_feeds ) = existing_feeds.select() - { - - let existing_feeds = existing_feeds - .filter_map( | feed | feed.get( "link" ).map( | link | String::from( RowValue( link ) ) )) - .collect_vec() - ; - - let link = &feed.2.to_string(); - - if !existing_feeds.contains( link ) - { - self.save_feeds( vec![ feed.clone().into() ] ).await?; - frames_report.new_frames = feed.0.entries.len(); - frames_report.is_new_feed = true; - - new_entries.extend - ( - feed.0.entries - .clone() - .into_iter() - .zip( std::iter::repeat( feed.0.id.clone() ).take( feed.0.entries.len() ) ) - .map( | entry | entry.into() ) - ); - reports.push( frames_report ); - continue; - } - } let existing_frames = table( "frame" ) .select() - .filter(col( "feed_link" ).eq( text( feed.0.id.clone() ) ) ) + .filter( col( "feed_link" ).eq( text( feed.2.to_string() ) ) ) .project( "id, published" ) .execute( &mut *self.storage.lock().await ) .await @@ -304,8 +257,6 @@ impl From< ( feed_rs::model::Feed, Duration, url::Url ) > for Feed authors : ( !authors.is_empty() ).then( || authors.join( ", " ) ), update_period : duration, } - - } } diff --git a/module/move/unitore/src/storage/frame.rs b/module/move/unitore/src/storage/frame.rs index 871556c00f..cb4d736b35 100644 --- a/module/move/unitore/src/storage/frame.rs +++ b/module/move/unitore/src/storage/frame.rs @@ -207,7 +207,6 @@ impl From< Frame > for Vec< ExprNode< 'static > > fn from( entry : Frame ) -> Self { let title = entry.title - .clone() .map( | title | text( title ) ) .unwrap_or( null() ) ; @@ -238,7 +237,6 @@ impl From< Frame > for Vec< ExprNode< 'static > > ; let categories = entry.categories - .clone() .map( | categories | text ( categories ) ) .unwrap_or( null() ) ; @@ -248,8 +246,8 @@ impl From< Frame > for Vec< ExprNode< 'static > > .unwrap_or( null() ) ; - let source = entry.source.clone().map( | s | text( s ) ).unwrap_or( null() ); - let rights = entry.rights.clone().map( | r | text( r ) ).unwrap_or( null() ); + let source = entry.source.map( | s | text( s ) ).unwrap_or( null() ); + let rights = entry.rights.map( | r | text( r ) ).unwrap_or( null() ); let media = entry.media .map( | media | text ( media ) ) .unwrap_or( null() ) From 4539324c5f8394c68d46097b3f6eb3695982f95f Mon Sep 17 00:00:00 2001 From: YuliaProkopovych Date: Tue, 19 Mar 2024 16:43:38 +0200 Subject: [PATCH 8/8] fix --- module/move/unitore/src/executor/actions/frame.rs | 3 +-- module/move/unitore/src/executor/actions/query.rs | 5 +++-- module/move/unitore/src/executor/mod.rs | 9 ++------- module/move/unitore/src/storage/config.rs | 1 - 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/module/move/unitore/src/executor/actions/frame.rs b/module/move/unitore/src/executor/actions/frame.rs index fdd35ed871..8ce982c1cc 100644 --- a/module/move/unitore/src/executor/actions/frame.rs +++ b/module/move/unitore/src/executor/actions/frame.rs @@ -1,13 +1,12 @@ //! Frames commands actions. use crate::*; -use self::storage::feed::FeedStore; - use super::*; use executor::FeedManager; use storage:: { FeedStorage, + feed::FeedStore, config::ConfigStore, frame::{ FrameStore, RowValue } }; diff --git a/module/move/unitore/src/executor/actions/query.rs b/module/move/unitore/src/executor/actions/query.rs index 192d3d9a7f..4b49bc9c37 100644 --- a/module/move/unitore/src/executor/actions/query.rs +++ b/module/move/unitore/src/executor/actions/query.rs @@ -1,14 +1,15 @@ //! Query command endpoint and report. use crate::*; +use super::*; use gluesql::core::executor::Payload; -use super::Report; use storage::{ FeedStorage, Store }; use executor::FeedManager; use error_tools::{ err, BasicError, Result }; /// Execute query specified in query string. -pub async fn execute_query( +pub async fn execute_query +( storage : FeedStorage< gluesql::sled_storage::SledStorage >, args : &wca::Args, ) -> Result< impl Report > diff --git a/module/move/unitore/src/executor/mod.rs b/module/move/unitore/src/executor/mod.rs index e49382447b..e83d8c859e 100644 --- a/module/move/unitore/src/executor/mod.rs +++ b/module/move/unitore/src/executor/mod.rs @@ -1,16 +1,13 @@ //! Execute plan. -use self::storage::frame::FrameStore; - use super::*; use feed_config::SubscriptionConfig; use gluesql::sled_storage::{ sled::Config, SledStorage }; use retriever::{ FeedClient, FeedFetch }; -use storage::{ Store, FeedStorage, feed::FeedStore, config::ConfigStore, table::TableStore }; +use storage::{ Store, FeedStorage, feed::FeedStore, config::ConfigStore, table::TableStore, frame::FrameStore }; use wca::{ Args, Type }; use executor::actions::Report; use error_tools::Result; -// use wca::prelude::*; pub mod actions; use actions:: @@ -22,12 +19,10 @@ use actions:: table::{ list_columns, list_tables }, }; -use std::future::Future; - fn action< 'a, F, Fut, R >( async_endpoint : F, args : &'a Args ) -> Result< R > where F : FnOnce( FeedStorage< SledStorage >, &'a Args ) -> Fut, - Fut : Future< Output = Result< R > >, + Fut : std::future::Future< Output = Result< R > >, R : actions::Report, { let path_to_storage = std::env::var( "UNITORE_STORAGE_PATH" ) diff --git a/module/move/unitore/src/storage/config.rs b/module/move/unitore/src/storage/config.rs index f5b812e475..39a4d3c1fd 100644 --- a/module/move/unitore/src/storage/config.rs +++ b/module/move/unitore/src/storage/config.rs @@ -11,7 +11,6 @@ use gluesql:: }, sled_storage::SledStorage, }; -use FeedStorage; /// Config file path. #[ derive( Debug ) ]