Skip to content

Commit

Permalink
Merge pull request #40 from svsticky/sonic-boom
Browse files Browse the repository at this point in the history
Oh its so goddamn fast now
  • Loading branch information
TobiasDeBruijn authored Jul 24, 2024
2 parents b747aa3 + 29fb215 commit 13b7ac4
Show file tree
Hide file tree
Showing 21 changed files with 337 additions and 225 deletions.
33 changes: 27 additions & 6 deletions frontend/src/components/UploadPhotoDialog.vue
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<script lang="ts">
import Vue from 'vue';
import {createPhoto} from "@/views/photo/photo";
import {createPhoto, TooManyRequests} from "@/views/photo/photo";
interface Data {
snackbar: string | null,
Expand Down Expand Up @@ -72,14 +72,35 @@ export default Vue.extend({
this.uploadProgress = 0;
this.uploadTotal = this.photos.length;
const results = await Promise.all(this.photos.map(async photoFile => {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
const result = await createPhoto(this.albumId, photoBytes);
let results = [];
for(const photoFile of this.photos) {
while(true) {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
let result: boolean | undefined;
try {
result = await createPhoto(this.albumId, photoBytes);
} catch (e: any) {
if(e instanceof TooManyRequests) {
console.log(`Got HTTP 429. Waiting ${e.retryAfter} seconds`);
await new Promise(resolve => setTimeout(resolve, e.retryAfter));
}
continue;
}
if(result === true) {
break;
}
console.error("Got unknown error. Bailing");
return false;
}
this.uploadProgress++;
return result != undefined;
}));
results.push(photoFile);
}
this.loading = false;
const failures = results.filter(result => !result);
Expand Down
22 changes: 21 additions & 1 deletion frontend/src/views/photo/photo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,20 @@ export async function listPhotosInAlbum(albumId: string, low_res: boolean = fals
return result.photos.map(protoPhotoToPhotoModel);
}

export class TooManyRequests extends Error {
retryAfter: number;

constructor(retryAfter: number) {
super();
this.retryAfter = retryAfter;
}
}

/**
* Create a photo
* @param albumId The ID of the album
* @param photoData The bytes of the photo. May be `PNG` or `JPEG` format.
* @throws TooManyRequests If too many requests are issued
* @return `true` on success. `undefined` on failure.
*/
export async function createPhoto(albumId: string, photoData: Uint8Array): Promise<boolean | undefined> {
Expand All @@ -93,7 +103,17 @@ export async function createPhoto(albumId: string, photoData: Uint8Array): Promi
photoData
}), null);

return result.ok ? true : undefined;
if(result.ok) {
return true;
}

if(result.status == 429) {
throw new TooManyRequests(
Number.parseInt(result.headers.get('retry-after') ?? "1")
);
}

return undefined;
}

export async function deletePhoto(photoId: string): Promise<boolean | undefined> {
Expand Down
1 change: 1 addition & 0 deletions server/chroma/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kamadak-exif = "0.5.5"
cabbage = "0.1.2"
moka = { version = "0.12.5", features = ["future"] }
dotenv = "0.15.0"
governor = "0.6.3"

[dev-dependencies]
serde_json = "1.0.93"
15 changes: 13 additions & 2 deletions server/chroma/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub struct Config {
/// but should be `false` or left unspecified when targeting
/// Amazon S3.
pub s3_force_path_style: Option<bool>,
/// Create a bucket on startup. This should only
/// be used when working with MinIO.
/// The provided access key should have bucket creation privileges.
pub s3_create_bucket_on_startup: Option<bool>,

/// OAuth2 client ID created in Koala
pub koala_client_id: String,
Expand Down Expand Up @@ -152,13 +156,20 @@ impl Config {
.unwrap_or(&self.koala_base_uri)
}

/// Force S3 path styles instead of virtual hosts
/// Force S3 path styles instead of virtual hosts.
///
/// See also: `s3_force_path_style` field
/// See also: `s3_force_path_style` field.
pub fn s3_force_path_style(&self) -> bool {
self.s3_force_path_style.unwrap_or(false)
}

/// Create an S3 bucket on application startup.
///
/// See also: `s3_create_bucket_on_startup` field.
pub fn s3_create_bucket_on_startup(&self) -> bool {
self.s3_create_bucket_on_startup.unwrap_or(false)
}

/// Get configured service tokens
pub fn service_tokens(&self) -> Vec<&str> {
self.service_tokens.split(',').collect()
Expand Down
4 changes: 3 additions & 1 deletion server/chroma/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate core;

use crate::config::Config;
use crate::routes::appdata::{AlbumIdCache, AppData, SessionIdCache, WebData};
use crate::routes::appdata::{AlbumIdCache, AppData, Ratelimits, SessionIdCache, WebData};
use crate::routes::routable::Routable;
use actix_cors::Cors;
use actix_web::{web, App, HttpServer};
Expand Down Expand Up @@ -53,6 +53,7 @@ async fn main() -> Result<()> {
access_key_id: config.s3_access_key_id.clone().unwrap(),
secret_access_key: config.s3_secret_access_key.clone().unwrap(),
use_path_style: config.s3_force_path_style(),
create_bucket: config.s3_create_bucket_on_startup(),
})
.await?;

Expand All @@ -61,6 +62,7 @@ async fn main() -> Result<()> {
db,
storage,
config,
ratelimits: Ratelimits::new(),
};

info!("Starting web server");
Expand Down
30 changes: 27 additions & 3 deletions server/chroma/src/routes/appdata.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::config::Config;
use crate::routes::authorization::Authorization;
use std::num::NonZeroU32;
use std::sync::Arc;

use actix_web::web;
use cabbage::KoalaApi;
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter};
use moka::future::Cache;

use dal::database::{Album, Database};
use dal::storage_engine::Storage;
use moka::future::Cache;

use crate::config::Config;
use crate::routes::authorization::Authorization;

pub type WebData = web::Data<AppData>;
pub type SessionIdCache = Cache<String, Authorization>;
Expand All @@ -16,4 +24,20 @@ pub struct AppData {
pub storage: Storage,
pub config: Config,
pub koala: KoalaApi,
pub ratelimits: Ratelimits,
}

#[derive(Debug, Clone)]
pub struct Ratelimits {
pub photo_create: Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
}

impl Ratelimits {
pub fn new() -> Self {
Self {
photo_create: Arc::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(1).unwrap(),
))),
}
}
}
5 changes: 2 additions & 3 deletions server/chroma/src/routes/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ impl FromRequest for Authorization {

// Check the cache
let session_cache: &web::Data<SessionIdCache> = req.app_data().unwrap();
match session_cache.get(authorization_id).await {
Some(v) => return Ok(v),
None => {}
if let Some(v) = session_cache.get(authorization_id).await {
return Ok(v);
}

// Check if we're dealing with a service token
Expand Down
15 changes: 14 additions & 1 deletion server/chroma/src/routes/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::body::BoxBody;
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use actix_web::{HttpResponse, ResponseError};
use thiserror::Error;

pub type WebResult<T> = Result<T, Error>;
Expand Down Expand Up @@ -28,6 +29,8 @@ pub enum Error {
ImageEncoding(#[from] image::ImageError),
#[error("Failed to decode WebP image")]
WebpDecode,
#[error("Slow down. Too many requests")]
Ratelimit { retry_after: u64 },
}

impl ResponseError for Error {
Expand All @@ -44,6 +47,16 @@ impl ResponseError for Error {
Self::ImageEncoding(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::WebpDecode => StatusCode::INTERNAL_SERVER_ERROR,
Self::Other(s) => *s,
Self::Ratelimit { .. } => StatusCode::TOO_MANY_REQUESTS,
}
}

fn error_response(&self) -> HttpResponse<BoxBody> {
match self {
Self::Ratelimit { retry_after } => HttpResponse::build(self.status_code())
.insert_header(("Retry-After".to_string(), format!("{retry_after}")))
.body("Too many requests"),
_ => ResponseError::error_response(self),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/chroma/src/routes/v1/album/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::empty::Empty;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use proto::DeleteAlbumRequest;

/// Delete an existing album.
Expand Down Expand Up @@ -39,13 +39,13 @@ pub async fn delete(
let photos = Photo::list_in_album(&data.db, &album.id).await?;
for photo in photos {
data.storage
.delete_photo(&photo.id, PhotoQuality::Original)
.delete_photo(&photo.id, &PhotoQuality::Original)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W1600)
.delete_photo(&photo.id, &PhotoQuality::W1600)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W400)
.delete_photo(&photo.id, &PhotoQuality::W400)
.await?;
}

Expand Down
4 changes: 2 additions & 2 deletions server/chroma/src/routes/v1/album/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use actix_web::web;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use dal::DalError;
use futures::future::join_all;
use proto::{AlbumWithCoverPhoto, GetAlbumResponse};
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn get(
.ok_or(Error::NotFound)?;

let photo = photo
.photo_to_proto_url(&data.storage, PhotoQuality::W400)
.photo_to_proto_url(&data.storage, &PhotoQuality::W400)
.await
.map_err(|e| match e {
DalError::Storage(e) => Error::from(e),
Expand Down
38 changes: 17 additions & 21 deletions server/chroma/src/routes/v1/album/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::routes::error::{Error, WebResult};
use crate::routes::v1::PhotoQuality;
use actix_multiresponse::Payload;
use actix_web::web;

use dal::database::{Album, Photo};
use dal::storage_engine::aws_error::GetObjectErrorKind;
use dal::storage_engine::error::{SdkError, StorageError};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub async fn list(
)
.await?
.into_iter()
.filter_map(|f| f)
.flatten()
.collect::<Vec<_>>();

// Insert the newly fetched into the cache
Expand All @@ -64,12 +65,11 @@ pub async fn list(
.await;

// Merge the two sets
let mut albums = vec![
let mut albums = [
fetched_albums,
cached_albums
.into_iter()
.map(|(_, v)| v)
.filter_map(|v| v)
.filter_map(|(_, v)| v)
.collect::<Vec<_>>(),
]
.concat();
Expand All @@ -88,7 +88,7 @@ pub async fn list(
let albums = join_all(albums.into_iter().map(|album| {
let storage = data.storage.clone();
let database = data.db.clone();
let qpref: dal::storage_engine::PhotoQuality = query.quality_preference.clone().into();
let qpref: dal::database::PhotoQuality = query.quality_preference.clone().into();
let include_cover_photo = query.include_cover_photo;
let album_id_cache = &**album_id_cache;

Expand All @@ -98,28 +98,24 @@ pub async fn list(
if let Some(id) = &album.cover_photo_id {
match Photo::get_by_id(&database, id).await? {
Some(photo) => {
let quality = if !photo.is_quality_created(qpref.clone()).await? {
dal::storage_engine::PhotoQuality::Original
let quality = if !photo.is_quality_created(&qpref).await? {
dal::database::PhotoQuality::Original
} else {
qpref
};

let photo = match photo.photo_to_proto_url(&storage, quality).await {
let photo = match photo.photo_to_proto_url(&storage, &quality).await {
Ok(v) => v,
Err(e) => {
return match &e {
DalError::Storage(s) => match s {
StorageError::GetObject(s) => match s {
SdkError::ServiceError(s) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
},
DalError::Storage(StorageError::GetObject(
SdkError::ServiceError(s),
)) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
Expand Down Expand Up @@ -152,7 +148,7 @@ pub async fn list(
DalError::Db(e) => Error::from(e),
})?
.into_iter()
.filter_map(|v| v)
.flatten()
.collect::<Vec<_>>();

Ok(Payload(ListAlbumsResponse { albums }))
Expand Down
2 changes: 1 addition & 1 deletion server/chroma/src/routes/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum PhotoQuality {
W1600,
}

impl From<PhotoQuality> for dal::storage_engine::PhotoQuality {
impl From<PhotoQuality> for dal::database::PhotoQuality {
fn from(value: PhotoQuality) -> Self {
match value {
PhotoQuality::Original => Self::Original,
Expand Down
Loading

0 comments on commit 13b7ac4

Please sign in to comment.