Skip to content

Commit

Permalink
Refactor sync (#21)
Browse files Browse the repository at this point in the history
* Replace in-memory comparison with database queries where appropriate.
  • Loading branch information
ddboline authored Nov 16, 2023
1 parent 995c554 commit 6fb9d13
Show file tree
Hide file tree
Showing 11 changed files with 591 additions and 607 deletions.
4 changes: 2 additions & 2 deletions gdrive_lib/src/gdrive_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,8 +911,8 @@ impl GDriveInfo {
let urlname = format_sstr!("gdrive://{}/", gdrive.session_name);
let urlname = Url::parse(&urlname)?;
let urlname = export_path.iter().try_fold(urlname, |u, e| {
if e.contains('#') {
u.join(&e.replace('#', "%35"))
if e.contains('#') || e.contains('?') {
u.join(&e.replace('#', "%23").replace('?', "%3F"))
} else {
u.join(e)
}
Expand Down
15 changes: 6 additions & 9 deletions sync_app_lib/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ impl Default for FileInfoInner {
/// Newtype over `Arc<FileInfoInner>`; the derived `Deref` exposes the inner
/// fields directly, and `Clone` is a cheap refcount bump on the `Arc`.
#[derive(Clone, Debug, PartialEq, Eq, Default, Deref)]
pub struct FileInfo(Arc<FileInfoInner>);

// impl Deref for FileInfo {
// type Target = FileInfoInner;
// fn deref(&self) -> &Self::Target {
// &self.0
// }
// }

pub enum FileInfoKeyType {
FileName,
FilePath,
Expand Down Expand Up @@ -267,8 +260,12 @@ impl TryFrom<FileInfoCache> for FileInfo {
impl FileInfo {
/// # Errors
/// Return error if db query fails
pub async fn from_database(pool: &PgPool, url: &Url) -> Result<Option<Self>, Error> {
FileInfoCache::get_by_urlname(url, pool)
pub async fn from_database(
pool: &PgPool,
url: &Url,
servicesession: &str,
) -> Result<Option<Self>, Error> {
FileInfoCache::get_by_urlname(url, servicesession, pool)
.await?
.map(TryInto::try_into)
.transpose()
Expand Down
143 changes: 4 additions & 139 deletions sync_app_lib/src/file_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ use crate::{
#[derive(Clone, Debug)]
pub struct FileList {
baseurl: Url,
filemap: Arc<HashMap<StackString, FileInfo>>,
inner: Arc<FileListInner>,
min_mtime: Option<u32>,
max_mtime: Option<u32>,
}

impl Deref for FileList {
Expand All @@ -55,21 +52,17 @@ impl FileList {
config: Config,
servicetype: FileService,
servicesession: ServiceSession,
filemap: HashMap<StackString, FileInfo>,
pool: PgPool,
) -> Self {
Self {
baseurl,
filemap: Arc::new(filemap),
inner: Arc::new(FileListInner {
basepath,
config,
servicetype,
servicesession,
pool,
}),
min_mtime: None,
max_mtime: None,
}
}

Expand Down Expand Up @@ -125,11 +118,6 @@ pub trait FileListTrait: Send + Sync + Debug {
fn get_config(&self) -> &Config;

fn get_pool(&self) -> &PgPool;
fn get_filemap(&self) -> &HashMap<StackString, FileInfo>;
fn get_min_mtime(&self) -> Option<u32>;
fn get_max_mtime(&self) -> Option<u32>;

fn with_list(&mut self, filelist: Vec<FileInfo>);

// Copy operation where the origin (finfo0) has the same servicetype as self
async fn copy_from(
Expand Down Expand Up @@ -162,7 +150,8 @@ pub trait FileListTrait: Send + Sync + Debug {
panic!("not implemented for {:?}", finfo);
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error>;
/// Return updated FileInfo entries
async fn update_file_cache(&self) -> Result<usize, Error>;

/// Print the file listing to the supplied channel.
/// Default implementation panics via `unimplemented!()`; concrete services
/// are expected to override it.
async fn print_list(&self, _: &StdoutChannel<StackString>) -> Result<(), Error> {
    unimplemented!()
}
}
}

/// Synchronise the in-memory `filemap` with the persisted file cache.
///
/// Three ordered passes:
///   1. delete cache rows whose key no longer appears in the filemap,
///   2. rewrite rows whose checksums / mtime / size have drifted,
///   3. insert rows present in the filemap but not yet cached.
///
/// Returns the number of newly inserted rows only — updates and deletions
/// are logged but not included in the count.
async fn cache_file_list(&self) -> Result<usize, Error> {
    let pool = self.get_pool();

    // Load existing file_list, create hashmap
    let current_cache: HashMap<_, _> = self
        .load_file_list(false)
        .await?
        .into_iter()
        .filter_map(|item| {
            // Entries without a derivable key cannot be compared; drop them.
            let key = item.get_key();
            key.map(|k| (k, item))
        })
        .collect();

    // Load and convert current filemap
    let flist_cache_map: HashMap<_, _> = self
        .get_filemap()
        .iter()
        .filter_map(|(_, f)| {
            let item: FileInfoCache = f.into();

            let key = item.get_key();
            key.map(|k| (k, item))
        })
        .collect();

    // Delete entries from current_cache not in filemap
    let mut deleted_entries = 0;
    for k in current_cache.keys() {
        if flist_cache_map.contains_key(k) {
            continue;
        }
        info!("remove {:?}", k);
        k.delete_cache_entry(pool).await?;
        deleted_entries += 1;
    }

    info!(
        "flist_cache_map {} {} {} {} {}",
        self.get_servicesession().as_str(),
        self.get_servicetype(),
        flist_cache_map.len(),
        current_cache.len(),
        deleted_entries
    );

    // Update pass: rewrite cache rows whose checksums or stat fields changed.
    for (k, v) in &flist_cache_map {
        if let Some(item) = current_cache.get(k) {
            if v.md5sum != item.md5sum
                || v.sha1sum != item.sha1sum
                || v.filestat_st_mtime != item.filestat_st_mtime
                || v.filestat_st_size != item.filestat_st_size
            {
                // Re-fetch the persisted row so the update is applied to the
                // stored entry rather than the freshly converted one.
                let mut cache = v
                    .get_cache(pool)
                    .await?
                    .ok_or_else(|| format_err!("Cache doesn't exist"))?;
                // Only overwrite checksums the new entry actually has; keep
                // existing values when the new entry lacks them.
                if let Some(md5sum) = &v.md5sum {
                    cache.md5sum = Some(md5sum.clone());
                }
                if let Some(sha1sum) = &v.sha1sum {
                    cache.sha1sum = Some(sha1sum.clone());
                }
                cache.filestat_st_mtime = v.filestat_st_mtime;
                cache.filestat_st_size = v.filestat_st_size;

                // NOTE(review): "GOT HERE" reads like a debug leftover —
                // consider a descriptive message or demoting to debug!.
                info!("GOT HERE {:?}", cache);
                cache.insert(pool).await?;
            }
        }
    }

    // Insert pass: rows that exist locally but not yet in the cache.
    let mut inserted = 0;
    for (k, v) in flist_cache_map {
        if current_cache.contains_key(&k) {
            continue;
        }
        v.insert(pool).await?;
        inserted += 1;
    }
    Ok(inserted)
}

async fn load_file_list(&self, get_deleted: bool) -> Result<Vec<FileInfoCache>, Error> {
let session = self.get_servicesession();
let stype = self.get_servicetype();
Expand Down Expand Up @@ -439,49 +345,8 @@ impl FileListTrait for FileList {
&self.pool
}

/// Borrow the in-memory map of relative path -> `FileInfo`.
fn get_filemap(&self) -> &HashMap<StackString, FileInfo> {
    &self.filemap
}

/// Smallest `st_mtime` recorded by `with_list`; `None` until a list is loaded.
fn get_min_mtime(&self) -> Option<u32> {
    self.min_mtime
}

/// Largest `st_mtime` recorded by `with_list`; `None` until a list is loaded.
fn get_max_mtime(&self) -> Option<u32> {
    self.max_mtime
}

fn with_list(&mut self, filelist: Vec<FileInfo>) {
let mut min_mtime: Option<u32> = None;
let mut max_mtime: Option<u32> = None;
let filemap = filelist
.into_iter()
.map(|f| {
let path = f.filepath.to_string_lossy();
let key = remove_basepath(&path, &self.get_basepath().to_string_lossy());
let mut inner = f.inner().clone();
inner.servicesession = self.get_servicesession().clone();
if min_mtime.is_none() || min_mtime > Some(inner.filestat.st_mtime) {
min_mtime.replace(inner.filestat.st_mtime);
}
if max_mtime.is_none() || max_mtime < Some(inner.filestat.st_mtime) {
max_mtime.replace(inner.filestat.st_mtime);
}
let f = FileInfo::from_inner(inner);
(key, f)
})
.collect();
self.filemap = Arc::new(filemap);
self.min_mtime = min_mtime;
self.max_mtime = max_mtime;
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error> {
self.load_file_list(false)
.await?
.into_iter()
.map(TryInto::try_into)
.collect()
/// Default implementation: performs no cache refresh and reports zero
/// updated entries; service-specific lists override this.
async fn update_file_cache(&self) -> Result<usize, Error> {
    Ok(0)
}
}

Expand Down
74 changes: 39 additions & 35 deletions sync_app_lib/src/file_list_gcs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{format_err, Error};
use async_trait::async_trait;
use checksums::{hash_file, Algorithm};
use log::info;
use futures::TryStreamExt;
use log::{debug, info};
use stack_string::{format_sstr, StackString};
use std::{
collections::HashMap,
Expand All @@ -15,10 +16,11 @@ use gdrive_lib::gcs_instance::GcsInstance;

use crate::{
config::Config,
file_info::{FileInfo, FileInfoTrait, ServiceSession},
file_info::{FileInfoTrait, ServiceSession},
file_info_gcs::FileInfoGcs,
file_list::{FileList, FileListTrait},
file_service::FileService,
models::FileInfoCache,
pgpool::PgPool,
};

Expand All @@ -42,7 +44,6 @@ impl FileListGcs {
config.clone(),
FileService::GCS,
bucket.parse()?,
HashMap::new(),
pool.clone(),
);
let gcs = GcsInstance::new(&config.gcs_token_path, &config.gcs_secret_file, bucket).await?;
Expand All @@ -62,7 +63,6 @@ impl FileListGcs {
config.clone(),
FileService::GCS,
bucket.parse()?,
HashMap::new(),
pool.clone(),
);
let config = config.clone();
Expand Down Expand Up @@ -101,35 +101,41 @@ impl FileListTrait for FileListGcs {
&self.flist.pool
}

/// Delegate to the wrapped `FileList`.
fn get_filemap(&self) -> &HashMap<StackString, FileInfo> {
    self.flist.get_filemap()
}

/// Delegate to the wrapped `FileList`.
fn get_min_mtime(&self) -> Option<u32> {
    self.flist.get_min_mtime()
}

/// Delegate to the wrapped `FileList`.
fn get_max_mtime(&self) -> Option<u32> {
    self.flist.get_max_mtime()
}

/// Delegate to the wrapped `FileList`.
fn with_list(&mut self, filelist: Vec<FileInfo>) {
    self.flist.with_list(filelist);
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error> {
/// Refresh the database cache from the live GCS bucket listing.
///
/// Loads the currently cached entries for this session/service into a map
/// keyed by URL, then walks the bucket's objects under the base URL's
/// prefix and upserts every object that is new, marked deleted, or whose
/// size differs from the cached row.
///
/// Returns the number of rows upserted.
///
/// NOTE(review): the pasted diff left the pre-change `fill_file_list` body
/// (`.into_iter().map(...).collect()`) interleaved inside this function;
/// this is the coherent post-change implementation.
async fn update_file_cache(&self) -> Result<usize, Error> {
    // The bucket name travels in the URL host; e.g. gs://bucket/prefix.
    let bucket = self
        .get_baseurl()
        .host_str()
        .ok_or_else(|| format_err!("Parse error"))?;
    let prefix = self.get_baseurl().path().trim_start_matches('/');
    let mut number_updated = 0;

    let pool = self.get_pool();
    // Existing (non-deleted filter disabled: `false`) cache rows, keyed by URL
    // for O(1) comparison against the live listing.
    let cached_urls: HashMap<StackString, _> = FileInfoCache::get_all_cached(
        self.get_servicesession().as_str(),
        self.get_servicetype().to_str(),
        pool,
        false,
    )
    .await?
    .map_ok(|f| (f.urlname.clone(), f))
    .try_collect()
    .await?;
    debug!("expected {}", cached_urls.len());

    for object in self.gcs.get_list_of_keys(bucket, Some(prefix)).await? {
        let info: FileInfoCache = FileInfoGcs::from_object(bucket, object)?
            .into_finfo()
            .into();
        // Skip objects already cached, still live, and unchanged in size.
        if let Some(existing) = cached_urls.get(&info.urlname) {
            if existing.deleted_at.is_none()
                && existing.filestat_st_size == info.filestat_st_size
            {
                continue;
            }
        }
        number_updated += info.upsert(pool).await?;
    }
    Ok(number_updated)
}

async fn print_list(&self, stdout: &StdoutChannel<StackString>) -> Result<(), Error> {
Expand Down Expand Up @@ -286,20 +292,18 @@ mod tests {
.and_then(|b| b.name.clone())
.unwrap_or_else(|| "".to_string());

let mut flist = FileListGcs::new(&bucket, &config, &pool).await?;
let flist = FileListGcs::new(&bucket, &config, &pool).await?;

let new_flist = flist.fill_file_list().await?;

info!("{} {:?}", bucket, new_flist.get(0));
assert!(new_flist.len() > 0);
flist.clear_file_list().await?;

flist.with_list(new_flist);
let number_updated = flist.update_file_cache().await?;

flist.cache_file_list().await?;
info!("{} {}", bucket, number_updated);
assert!(number_updated > 0);

let new_flist = flist.load_file_list(false).await?;

assert_eq!(flist.flist.get_filemap().len(), new_flist.len());
assert_eq!(number_updated, new_flist.len());

flist.clear_file_list().await?;
Ok(())
Expand Down
Loading

0 comments on commit 6fb9d13

Please sign in to comment.