Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oh its so goddamn fast now #40

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions frontend/src/components/UploadPhotoDialog.vue
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<script lang="ts">
import Vue from 'vue';
import {createPhoto} from "@/views/photo/photo";
import {createPhoto, TooManyRequests} from "@/views/photo/photo";

interface Data {
snackbar: string | null,
Expand Down Expand Up @@ -72,14 +72,36 @@ export default Vue.extend({
this.uploadProgress = 0;
this.uploadTotal = this.photos.length;

const results = await Promise.all(this.photos.map(async photoFile => {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
const result = await createPhoto(this.albumId, photoBytes);
let results = [];
for(const photoFile of this.photos) {
while(true) {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
let result: boolean | undefined;
try {
result = await createPhoto(this.albumId, photoBytes);
} catch (e: any) {
if(e instanceof TooManyRequests) {
console.log(`Got HTTP 429. Waiting ${e.retryAfter} seconds`);
await new Promise(resolve => setTimeout(resolve, e.retryAfter));
continue;
TobiasDeBruijn marked this conversation as resolved.
Show resolved Hide resolved
} else {
continue;
}
}

if(result === true) {
break;
}

console.error("Got unknown error. Bailing");
return false;
}

this.uploadProgress++;

return result != undefined;
}));
results.push(photoFile);
}

this.loading = false;

const failures = results.filter(result => !result);
Expand Down
22 changes: 21 additions & 1 deletion frontend/src/views/photo/photo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,20 @@ export async function listPhotosInAlbum(albumId: string, low_res: boolean = fals
return result.photos.map(protoPhotoToPhotoModel);
}

export class TooManyRequests extends Error {
retryAfter: number;

constructor(retryAfter: number) {
super();
this.retryAfter = retryAfter;
}
}

/**
* Create a photo
* @param albumId The ID of the album
* @param photoData The bytes of the photo. May be `PNG` or `JPEG` format.
* @throws TooManyRequests If too many requests are issued
* @return `true` on success. `undefined` on failure.
*/
export async function createPhoto(albumId: string, photoData: Uint8Array): Promise<boolean | undefined> {
Expand All @@ -93,7 +103,17 @@ export async function createPhoto(albumId: string, photoData: Uint8Array): Promi
photoData
}), null);

return result.ok ? true : undefined;
if(result.ok) {
return true;
}

if(result.status == 429) {
throw new TooManyRequests(
Number.parseInt(result.headers.get('retry-after') ?? "1")
);
}

return undefined;
}

export async function deletePhoto(photoId: string): Promise<boolean | undefined> {
Expand Down
1 change: 1 addition & 0 deletions server/chroma/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kamadak-exif = "0.5.5"
cabbage = "0.1.2"
moka = { version = "0.12.5", features = ["future"] }
dotenv = "0.15.0"
governor = "0.6.3"

[dev-dependencies]
serde_json = "1.0.93"
15 changes: 13 additions & 2 deletions server/chroma/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub struct Config {
/// but should be `false` or left unspecified when targeting
/// Amazon S3.
pub s3_force_path_style: Option<bool>,
/// Create a bucket on startup. This should only
/// be used when working with MinIO.
/// The provided access key should have bucket creation privileges.
pub s3_create_bucket_on_startup: Option<bool>,

/// OAuth2 client ID created in Koala
pub koala_client_id: String,
Expand Down Expand Up @@ -152,13 +156,20 @@ impl Config {
.unwrap_or(&self.koala_base_uri)
}

/// Force S3 path styles instead of virtual hosts
/// Force S3 path styles instead of virtual hosts.
///
/// See also: `s3_force_path_style` field
/// See also: `s3_force_path_style` field.
pub fn s3_force_path_style(&self) -> bool {
self.s3_force_path_style.unwrap_or(false)
}

/// Create an S3 bucket on application startup.
///
/// See also: `s3_create_bucket_on_startup` field.
pub fn s3_create_bucket_on_startup(&self) -> bool {
self.s3_create_bucket_on_startup.unwrap_or(false)
}

/// Get configured service tokens
pub fn service_tokens(&self) -> Vec<&str> {
self.service_tokens.split(',').collect()
Expand Down
4 changes: 3 additions & 1 deletion server/chroma/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate core;

use crate::config::Config;
use crate::routes::appdata::{AlbumIdCache, AppData, SessionIdCache, WebData};
use crate::routes::appdata::{AlbumIdCache, AppData, Ratelimits, SessionIdCache, WebData};
use crate::routes::routable::Routable;
use actix_cors::Cors;
use actix_web::{web, App, HttpServer};
Expand Down Expand Up @@ -53,6 +53,7 @@ async fn main() -> Result<()> {
access_key_id: config.s3_access_key_id.clone().unwrap(),
secret_access_key: config.s3_secret_access_key.clone().unwrap(),
use_path_style: config.s3_force_path_style(),
create_bucket: config.s3_create_bucket_on_startup(),
})
.await?;

Expand All @@ -61,6 +62,7 @@ async fn main() -> Result<()> {
db,
storage,
config,
ratelimits: Ratelimits::new(),
};

info!("Starting web server");
Expand Down
30 changes: 27 additions & 3 deletions server/chroma/src/routes/appdata.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::config::Config;
use crate::routes::authorization::Authorization;
use std::num::NonZeroU32;
use std::sync::Arc;

use actix_web::web;
use cabbage::KoalaApi;
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter};
use moka::future::Cache;

use dal::database::{Album, Database};
use dal::storage_engine::Storage;
use moka::future::Cache;

use crate::config::Config;
use crate::routes::authorization::Authorization;

pub type WebData = web::Data<AppData>;
pub type SessionIdCache = Cache<String, Authorization>;
Expand All @@ -16,4 +24,20 @@ pub struct AppData {
pub storage: Storage,
pub config: Config,
pub koala: KoalaApi,
pub ratelimits: Ratelimits,
}

#[derive(Debug, Clone)]
pub struct Ratelimits {
pub photo_create: Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
SpookyBoy99 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Ratelimits {
pub fn new() -> Self {
Self {
photo_create: Arc::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(1).unwrap(),
))),
}
}
}
5 changes: 2 additions & 3 deletions server/chroma/src/routes/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ impl FromRequest for Authorization {

// Check the cache
let session_cache: &web::Data<SessionIdCache> = req.app_data().unwrap();
match session_cache.get(authorization_id).await {
Some(v) => return Ok(v),
None => {}
if let Some(v) = session_cache.get(authorization_id).await {
return Ok(v);
}

// Check if we're dealing with a service token
Expand Down
15 changes: 14 additions & 1 deletion server/chroma/src/routes/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::body::BoxBody;
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use actix_web::{HttpResponse, ResponseError};
use thiserror::Error;

pub type WebResult<T> = Result<T, Error>;
Expand Down Expand Up @@ -28,6 +29,8 @@ pub enum Error {
ImageEncoding(#[from] image::ImageError),
#[error("Failed to decode WebP image")]
WebpDecode,
#[error("Slow down. Too many requests")]
Ratelimit { retry_after: u64 },
}

impl ResponseError for Error {
Expand All @@ -44,6 +47,16 @@ impl ResponseError for Error {
Self::ImageEncoding(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::WebpDecode => StatusCode::INTERNAL_SERVER_ERROR,
Self::Other(s) => *s,
Self::Ratelimit { .. } => StatusCode::TOO_MANY_REQUESTS,
}
}

fn error_response(&self) -> HttpResponse<BoxBody> {
match self {
Self::Ratelimit { retry_after } => HttpResponse::build(self.status_code())
.insert_header(("Retry-After".to_string(), format!("{retry_after}")))
.body("Too many requests"),
_ => ResponseError::error_response(self),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/chroma/src/routes/v1/album/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::empty::Empty;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use proto::DeleteAlbumRequest;

/// Delete an existing album.
Expand Down Expand Up @@ -39,13 +39,13 @@ pub async fn delete(
let photos = Photo::list_in_album(&data.db, &album.id).await?;
for photo in photos {
data.storage
.delete_photo(&photo.id, PhotoQuality::Original)
.delete_photo(&photo.id, &PhotoQuality::Original)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W1600)
.delete_photo(&photo.id, &PhotoQuality::W1600)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W400)
.delete_photo(&photo.id, &PhotoQuality::W400)
.await?;
}

Expand Down
4 changes: 2 additions & 2 deletions server/chroma/src/routes/v1/album/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use actix_web::web;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use dal::DalError;
use futures::future::join_all;
use proto::{AlbumWithCoverPhoto, GetAlbumResponse};
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn get(
.ok_or(Error::NotFound)?;

let photo = photo
.photo_to_proto_url(&data.storage, PhotoQuality::W400)
.photo_to_proto_url(&data.storage, &PhotoQuality::W400)
.await
.map_err(|e| match e {
DalError::Storage(e) => Error::from(e),
Expand Down
38 changes: 17 additions & 21 deletions server/chroma/src/routes/v1/album/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::routes::error::{Error, WebResult};
use crate::routes::v1::PhotoQuality;
use actix_multiresponse::Payload;
use actix_web::web;

use dal::database::{Album, Photo};
use dal::storage_engine::aws_error::GetObjectErrorKind;
use dal::storage_engine::error::{SdkError, StorageError};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub async fn list(
)
.await?
.into_iter()
.filter_map(|f| f)
.flatten()
.collect::<Vec<_>>();

// Insert the newly fetched into the cache
Expand All @@ -64,12 +65,11 @@ pub async fn list(
.await;

// Merge the two sets
let mut albums = vec![
let mut albums = [
fetched_albums,
cached_albums
.into_iter()
.map(|(_, v)| v)
.filter_map(|v| v)
.filter_map(|(_, v)| v)
.collect::<Vec<_>>(),
]
.concat();
Expand All @@ -88,7 +88,7 @@ pub async fn list(
let albums = join_all(albums.into_iter().map(|album| {
let storage = data.storage.clone();
let database = data.db.clone();
let qpref: dal::storage_engine::PhotoQuality = query.quality_preference.clone().into();
let qpref: dal::database::PhotoQuality = query.quality_preference.clone().into();
let include_cover_photo = query.include_cover_photo;
let album_id_cache = &**album_id_cache;

Expand All @@ -98,28 +98,24 @@ pub async fn list(
if let Some(id) = &album.cover_photo_id {
match Photo::get_by_id(&database, id).await? {
Some(photo) => {
let quality = if !photo.is_quality_created(qpref.clone()).await? {
dal::storage_engine::PhotoQuality::Original
let quality = if !photo.is_quality_created(&qpref).await? {
dal::database::PhotoQuality::Original
} else {
qpref
};

let photo = match photo.photo_to_proto_url(&storage, quality).await {
let photo = match photo.photo_to_proto_url(&storage, &quality).await {
Ok(v) => v,
Err(e) => {
return match &e {
DalError::Storage(s) => match s {
StorageError::GetObject(s) => match s {
SdkError::ServiceError(s) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
},
DalError::Storage(StorageError::GetObject(
SdkError::ServiceError(s),
)) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
Expand Down Expand Up @@ -152,7 +148,7 @@ pub async fn list(
DalError::Db(e) => Error::from(e),
})?
.into_iter()
.filter_map(|v| v)
.flatten()
.collect::<Vec<_>>();

Ok(Payload(ListAlbumsResponse { albums }))
Expand Down
2 changes: 1 addition & 1 deletion server/chroma/src/routes/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum PhotoQuality {
W1600,
}

impl From<PhotoQuality> for dal::storage_engine::PhotoQuality {
impl From<PhotoQuality> for dal::database::PhotoQuality {
fn from(value: PhotoQuality) -> Self {
match value {
PhotoQuality::Original => Self::Original,
Expand Down
Loading
Loading