Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oh its so goddamn fast now #40

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions frontend/src/components/UploadPhotoDialog.vue
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

<script lang="ts">
import Vue from 'vue';
import {createPhoto} from "@/views/photo/photo";
import {createPhoto, TooManyRequests} from "@/views/photo/photo";

interface Data {
snackbar: string | null,
Expand Down Expand Up @@ -72,14 +72,31 @@ export default Vue.extend({
this.uploadProgress = 0;
this.uploadTotal = this.photos.length;

const results = await Promise.all(this.photos.map(async photoFile => {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
const result = await createPhoto(this.albumId, photoBytes);
let results = [];
for(const photoFile of this.photos) {
while(true) {
const photoBytes = new Uint8Array(await photoFile.arrayBuffer());
const result: boolean | TooManyRequests | undefined = await createPhoto(this.albumId, photoBytes);

if(result === true) {
break;
}

if(result instanceof TooManyRequests) {
console.log("Got HTTP 429. Waiting 1s");
await new Promise(resolve => setTimeout(resolve, 1000));
TobiasDeBruijn marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

console.error("Got unknown error. Bailing");
return false;
}

this.uploadProgress++;

return result != undefined;
}));
results.push(photoFile);
}

this.loading = false;

const failures = results.filter(result => !result);
Expand Down
14 changes: 12 additions & 2 deletions frontend/src/views/photo/photo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,29 @@ export async function listPhotosInAlbum(albumId: string, low_res: boolean = fals
return result.photos.map(protoPhotoToPhotoModel);
}

export class TooManyRequests {}

/**
* Create a photo
* @param albumId The ID of the album
* @param photoData The bytes of the photo. May be `PNG` or `JPEG` format.
* @return `true` on success. `undefined` on failure.
*/
export async function createPhoto(albumId: string, photoData: Uint8Array): Promise<boolean | undefined> {
export async function createPhoto(albumId: string, photoData: Uint8Array): Promise<boolean | TooManyRequests | undefined> {
const result = await Http.post('/api/v1/photo', new CreatePhotoRequest({
albumId,
photoData
}), null);

return result.ok ? true : undefined;
if(result.ok) {
return true;
}

if(result.status == 429) {
return new TooManyRequests();
TobiasDeBruijn marked this conversation as resolved.
Show resolved Hide resolved
}

return undefined;
}

export async function deletePhoto(photoId: string): Promise<boolean | undefined> {
Expand Down
1 change: 1 addition & 0 deletions server/chroma/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ kamadak-exif = "0.5.5"
cabbage = "0.1.2"
moka = { version = "0.12.5", features = ["future"] }
dotenv = "0.15.0"
governor = "0.6.3"

[dev-dependencies]
serde_json = "1.0.93"
3 changes: 2 additions & 1 deletion server/chroma/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate core;

use crate::config::Config;
use crate::routes::appdata::{AlbumIdCache, AppData, SessionIdCache, WebData};
use crate::routes::appdata::{AlbumIdCache, AppData, Ratelimits, SessionIdCache, WebData};
use crate::routes::routable::Routable;
use actix_cors::Cors;
use actix_web::{web, App, HttpServer};
Expand Down Expand Up @@ -61,6 +61,7 @@ async fn main() -> Result<()> {
db,
storage,
config,
ratelimits: Ratelimits::new(),
};

info!("Starting web server");
Expand Down
30 changes: 27 additions & 3 deletions server/chroma/src/routes/appdata.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::config::Config;
use crate::routes::authorization::Authorization;
use std::num::NonZeroU32;
use std::sync::Arc;

use actix_web::web;
use cabbage::KoalaApi;
use governor::clock::DefaultClock;
use governor::state::{InMemoryState, NotKeyed};
use governor::{Quota, RateLimiter};
use moka::future::Cache;

use dal::database::{Album, Database};
use dal::storage_engine::Storage;
use moka::future::Cache;

use crate::config::Config;
use crate::routes::authorization::Authorization;

pub type WebData = web::Data<AppData>;
pub type SessionIdCache = Cache<String, Authorization>;
Expand All @@ -16,4 +24,20 @@ pub struct AppData {
pub storage: Storage,
pub config: Config,
pub koala: KoalaApi,
pub ratelimits: Ratelimits,
}

#[derive(Debug, Clone)]
pub struct Ratelimits {
pub photo_create: Arc<RateLimiter<NotKeyed, InMemoryState, DefaultClock>>,
SpookyBoy99 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Ratelimits {
pub fn new() -> Self {
Self {
photo_create: Arc::new(RateLimiter::direct(Quota::per_second(
NonZeroU32::new(1).unwrap(),
))),
}
}
}
5 changes: 2 additions & 3 deletions server/chroma/src/routes/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ impl FromRequest for Authorization {

// Check the cache
let session_cache: &web::Data<SessionIdCache> = req.app_data().unwrap();
match session_cache.get(authorization_id).await {
Some(v) => return Ok(v),
None => {}
if let Some(v) = session_cache.get(authorization_id).await {
return Ok(v);
}

// Check if we're dealing with a service token
Expand Down
3 changes: 3 additions & 0 deletions server/chroma/src/routes/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub enum Error {
ImageEncoding(#[from] image::ImageError),
#[error("Failed to decode WebP image")]
WebpDecode,
#[error("Slow down. Too many requests")]
Ratelimit,
}

impl ResponseError for Error {
Expand All @@ -44,6 +46,7 @@ impl ResponseError for Error {
Self::ImageEncoding(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::WebpDecode => StatusCode::INTERNAL_SERVER_ERROR,
Self::Other(s) => *s,
Self::Ratelimit => StatusCode::TOO_MANY_REQUESTS,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/chroma/src/routes/v1/album/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::empty::Empty;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use proto::DeleteAlbumRequest;

/// Delete an existing album.
Expand Down Expand Up @@ -39,13 +39,13 @@ pub async fn delete(
let photos = Photo::list_in_album(&data.db, &album.id).await?;
for photo in photos {
data.storage
.delete_photo(&photo.id, PhotoQuality::Original)
.delete_photo(&photo.id, &PhotoQuality::Original)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W1600)
.delete_photo(&photo.id, &PhotoQuality::W1600)
.await?;
data.storage
.delete_photo(&photo.id, PhotoQuality::W400)
.delete_photo(&photo.id, &PhotoQuality::W400)
.await?;
}

Expand Down
4 changes: 2 additions & 2 deletions server/chroma/src/routes/v1/album/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::routes::authorization::Authorization;
use crate::routes::error::{Error, WebResult};
use actix_multiresponse::Payload;
use actix_web::web;
use dal::database::PhotoQuality;
use dal::database::{Album, Photo};
use dal::storage_engine::PhotoQuality;
use dal::DalError;
use futures::future::join_all;
use proto::{AlbumWithCoverPhoto, GetAlbumResponse};
Expand Down Expand Up @@ -68,7 +68,7 @@ pub async fn get(
.ok_or(Error::NotFound)?;

let photo = photo
.photo_to_proto_url(&data.storage, PhotoQuality::W400)
.photo_to_proto_url(&data.storage, &PhotoQuality::W400)
.await
.map_err(|e| match e {
DalError::Storage(e) => Error::from(e),
Expand Down
38 changes: 17 additions & 21 deletions server/chroma/src/routes/v1/album/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::routes::error::{Error, WebResult};
use crate::routes::v1::PhotoQuality;
use actix_multiresponse::Payload;
use actix_web::web;

use dal::database::{Album, Photo};
use dal::storage_engine::aws_error::GetObjectErrorKind;
use dal::storage_engine::error::{SdkError, StorageError};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub async fn list(
)
.await?
.into_iter()
.filter_map(|f| f)
.flatten()
.collect::<Vec<_>>();

// Insert the newly fetched into the cache
Expand All @@ -64,12 +65,11 @@ pub async fn list(
.await;

// Merge the two sets
let mut albums = vec![
let mut albums = [
fetched_albums,
cached_albums
.into_iter()
.map(|(_, v)| v)
.filter_map(|v| v)
.filter_map(|(_, v)| v)
.collect::<Vec<_>>(),
]
.concat();
Expand All @@ -88,7 +88,7 @@ pub async fn list(
let albums = join_all(albums.into_iter().map(|album| {
let storage = data.storage.clone();
let database = data.db.clone();
let qpref: dal::storage_engine::PhotoQuality = query.quality_preference.clone().into();
let qpref: dal::database::PhotoQuality = query.quality_preference.clone().into();
let include_cover_photo = query.include_cover_photo;
let album_id_cache = &**album_id_cache;

Expand All @@ -98,28 +98,24 @@ pub async fn list(
if let Some(id) = &album.cover_photo_id {
match Photo::get_by_id(&database, id).await? {
Some(photo) => {
let quality = if !photo.is_quality_created(qpref.clone()).await? {
dal::storage_engine::PhotoQuality::Original
let quality = if !photo.is_quality_created(&qpref).await? {
dal::database::PhotoQuality::Original
} else {
qpref
};

let photo = match photo.photo_to_proto_url(&storage, quality).await {
let photo = match photo.photo_to_proto_url(&storage, &quality).await {
Ok(v) => v,
Err(e) => {
return match &e {
DalError::Storage(s) => match s {
StorageError::GetObject(s) => match s {
SdkError::ServiceError(s) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
},
DalError::Storage(StorageError::GetObject(
SdkError::ServiceError(s),
)) => match s.err().kind {
GetObjectErrorKind::NoSuchKey(_) => {
album_id_cache.remove(&album.id).await;
album.delete(&database).await?;
Ok(None)
}
_ => Err(e),
},
_ => Err(e),
Expand Down Expand Up @@ -152,7 +148,7 @@ pub async fn list(
DalError::Db(e) => Error::from(e),
})?
.into_iter()
.filter_map(|v| v)
.flatten()
.collect::<Vec<_>>();

Ok(Payload(ListAlbumsResponse { albums }))
Expand Down
2 changes: 1 addition & 1 deletion server/chroma/src/routes/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub enum PhotoQuality {
W1600,
}

impl From<PhotoQuality> for dal::storage_engine::PhotoQuality {
impl From<PhotoQuality> for dal::database::PhotoQuality {
fn from(value: PhotoQuality) -> Self {
match value {
PhotoQuality::Original => Self::Original,
Expand Down
39 changes: 35 additions & 4 deletions server/chroma/src/routes/v1/photo/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::routes::appdata::WebData;
use crate::routes::authorization::Authorization;
use crate::routes::error::{Error, ImagePipelineError, WebResult};
use actix_multiresponse::Payload;
use dal::database::{Album, Database, Photo};
use dal::storage_engine::{PhotoQuality, Storage};
use dal::database::{Album, Database, Photo, PhotoQuality};
use dal::storage_engine::Storage;
use exif::{In, Tag};
use image::imageops::FilterType;
use image::io::Reader;
use image::{DynamicImage, GenericImageView};
use img_parts::{Bytes, DynImage, ImageEXIF};
use proto::photo_respone::Response;
use proto::{CreatePhotoRequest, CreatePhotoResponse};
use std::io::Cursor;
use tap::TapFallible;
Expand Down Expand Up @@ -63,6 +64,11 @@ async fn image_pipeline(
album: &Album,
db: &Database,
) -> WebResult<String> {
// Make sure we don't run into AWS ratelimits here
if data.ratelimits.photo_create.check().is_err() {
return Err(Error::Ratelimit);
}

// This pipeline modifies the image. The idea is that each 'step' outputs
// a variable 'image', which the next step can then use.

Expand Down Expand Up @@ -120,7 +126,7 @@ async fn image_pipeline(
PhotoQuality::Original
);
match engine
.create_photo(&photo_id, PhotoQuality::Original, image)
.create_photo(&photo_id, &PhotoQuality::Original, image)
.await
{
Ok(_) => {}
Expand Down Expand Up @@ -181,7 +187,7 @@ fn resize_and_save(

trace!("Saving image '{photo_id}' in quality '{quality:?}'");
match engine
.create_photo(&photo_id, quality.clone(), converted_image_data)
.create_photo(&photo_id, &quality, converted_image_data)
.await
{
Ok(_) => {}
Expand All @@ -203,6 +209,31 @@ fn resize_and_save(
}
};

// Verify, this also puts it in the cache, nice speedup for later
trace!("Checking if uploaded image actually works on AWS");
TobiasDeBruijn marked this conversation as resolved.
Show resolved Hide resolved

let url = match photo.photo_to_proto_url(&engine, &quality).await {
Ok(p) => match p.data.unwrap().response.unwrap() {
Response::Url(v) => v,
_ => panic!("Invalid response type for the 'URL' method"),
},
Err(e) => {
warn!("Photo {} with quality {} was not created successfully, or another error occurred: {e}", photo_id, quality);
return;
}
};

// Fetch the photo
let ok = reqwest::Client::new().get(url).send().await.is_ok();
TobiasDeBruijn marked this conversation as resolved.
Show resolved Hide resolved

if !ok {
warn!(
"Photo {} with quality {} was not created successfully (AWS returned an error)",
photo_id, quality
);
return;
}

match photo.set_quality_created(quality, true).await {
Ok(_) => {}
Err(e) => {
Expand Down
Loading
Loading