Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amazon S3 support: GetObject, PutObject #193

Merged
merged 23 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c903180
Add api method for get data
vovac12 Mar 8, 2021
6f6e8f3
Merge branch 'master' of github.com:qoollo/bob into rest-api-for-data
vovac12 Mar 9, 2021
2ca04e7
Add API method for put data
vovac12 Mar 9, 2021
e568827
Add converters from Error to StatusExt
vovac12 Mar 9, 2021
de3a458
Merge branch 'master' of github.com:qoollo/bob into rest-api-for-data
vovac12 Mar 22, 2021
3e2ac84
S3: GetObject implementation
vovac12 Mar 24, 2021
9123552
Merge branch 'rest-api-for-data' of github.com:qoollo/bob into 185-ge…
vovac12 Mar 24, 2021
0ae0f5f
Use default bucket name
vovac12 Mar 24, 2021
b4f03e9
GetObject and PutObject fully implemented
vovac12 Mar 24, 2021
528c9d9
Fix naming and use pattern matching for error handling
vovac12 Mar 29, 2021
b69b93d
Merge branch 'rest-api-for-data' of github.com:qoollo/bob into 185-ge…
vovac12 Mar 29, 2021
368d72a
Handle request headers for GetObject
vovac12 Mar 30, 2021
44e4470
Fix DateTime parse and remove unneeded header handlers
vovac12 Mar 30, 2021
3ece31a
Use kind() method instead of public attribute
vovac12 Mar 31, 2021
c18898f
gh-185 Add CopyObject method
vovac12 Apr 7, 2021
e60f83f
Merge branch 'master' of github.com:qoollo/bob into rest-api-for-data
vovac12 Apr 19, 2021
ade94df
Merge branch 'rest-api-for-data' of github.com:qoollo/bob into 185-ge…
vovac12 Apr 19, 2021
1a71d8f
unwrap_or_else instead of match
vovac12 Apr 19, 2021
c84631b
Replace match by if-let statements
vovac12 Apr 19, 2021
95cb682
import IoError and BobError
vovac12 Apr 19, 2021
bb7fb1e
Remove unnecessary error conversion
vovac12 Apr 19, 2021
d72cc8a
Merge branch 'master' of github.com:qoollo/bob into 185-get-and-put-o…
vovac12 Jul 22, 2021
9ec2371
Update CHANGELOG.md
vovac12 Jul 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Bob versions changelog
- Add api method to start disk ([#182](https://github.com/qoollo/bob/pull/182))
- Rest api for data ([#187](https://github.com/qoollo/bob/pull/187))
- Add bloom filter buffer size to config ([#218](https://github.com/qoollo/bob/pull/218))
- Add Amazon S3 GetObject and PutObject api ([#193](https://github.com/qoollo/bob/pull/193))

#### Changed
- rename bob-tools, remove redundant versions of workspace deps ([#220](https://github.com/qoollo/bob/pull/220))
Expand Down
39 changes: 20 additions & 19 deletions bob/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::server::Server as BobServer;
use bob_backend::pearl::{Group as PearlGroup, Holder};
use bob_common::{
data::{BobData, BobKey, BobMeta, BobOptions, VDisk as DataVDisk},
error::Error as BobError,
node::Disk as NodeDisk,
};
use futures::{future::BoxFuture, FutureExt};
Expand All @@ -13,12 +14,14 @@ use rocket::{
};
use rocket_contrib::json::Json;
use std::{
io::{Cursor, ErrorKind, Read},
io::{Cursor, Error as IoError, ErrorKind, Read},
path::{Path, PathBuf},
str::FromStr,
};
use tokio::fs::{read_dir, ReadDir};

mod s3;

#[derive(Debug, Clone)]
pub(crate) enum Action {
Attach,
Expand Down Expand Up @@ -116,6 +119,7 @@ pub(crate) fn spawn(bob: BobServer, port: u16) {
config.set_port(port);
Rocket::custom(config)
.manage(bob)
.mount("/s3", s3::routes())
.mount("/", routes)
.launch();
};
Expand Down Expand Up @@ -600,31 +604,21 @@ async fn read_directory_children(mut read_dir: ReadDir, name: &str, path: &str)
#[get("/data/<key>")]
fn get_data(bob: State<BobServer>, key: BobKey) -> Result<Content<Vec<u8>>, StatusExt> {
let opts = BobOptions::new_get(None);
let result = bob
.block_on(async { bob.grinder().get(key, &opts).await })
.map_err(|err| -> StatusExt { err.into() })?;
let mime_type = infer::get(result.inner());
let mime_type = match mime_type {
None => ContentType::Any,
Some(t) => ContentType::from_str(t.mime_type()).unwrap_or_default(),
};
Ok(Content(mime_type, result.inner().to_owned()))
let result = bob.block_on(async { bob.grinder().get(key, &opts).await })?;
Ok(Content(infer_data_type(&result), result.inner().to_owned()))
}

#[post("/data/<key>", data = "<data>")]
fn put_data(bob: State<BobServer>, key: BobKey, data: Data) -> Result<StatusExt, StatusExt> {
let mut data_buf = vec![];
data.open()
.read_to_end(&mut data_buf)
.map_err(|err| -> StatusExt { err.into() })?;
data.open().read_to_end(&mut data_buf)?;
let data = BobData::new(
data_buf,
BobMeta::new(chrono::Local::now().timestamp() as u64),
);

let opts = BobOptions::new_put(None);
bob.block_on(async { bob.grinder().put(key, data, opts).await })
.map_err(|err| -> StatusExt { err.into() })?;
bob.block_on(async { bob.grinder().put(key, data, opts).await })?;

Ok(Status::Created.into())
}
Expand Down Expand Up @@ -672,8 +666,8 @@ impl From<Status> for StatusExt {
}
}

impl From<std::io::Error> for StatusExt {
fn from(err: std::io::Error) -> Self {
impl From<IoError> for StatusExt {
fn from(err: IoError) -> Self {
Self {
status: match err.kind() {
ErrorKind::NotFound => Status::NotFound,
Expand All @@ -685,8 +679,8 @@ impl From<std::io::Error> for StatusExt {
}
}

impl From<bob_common::error::Error> for StatusExt {
fn from(err: bob_common::error::Error) -> Self {
impl From<BobError> for StatusExt {
fn from(err: BobError) -> Self {
use bob_common::error::Kind;
let status = match err.kind() {
Kind::DuplicateKey => Status::Conflict,
Expand All @@ -702,3 +696,10 @@ impl From<bob_common::error::Error> for StatusExt {
}
}
}

pub(crate) fn infer_data_type(data: &BobData) -> ContentType {
match infer::get(data.inner()) {
None => ContentType::Any,
Some(t) => ContentType::from_str(t.mime_type()).unwrap_or_default(),
}
}
207 changes: 207 additions & 0 deletions bob/src/api/http/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
use std::{
convert::TryInto,
io::{Cursor, Read},
str::FromStr,
};

use super::{infer_data_type, StatusExt};
use crate::server::Server as BobServer;
use bob_common::data::{BobData, BobKey, BobMeta, BobOptions};
use rocket::{
get,
http::{ContentType, Header, Status},
put,
request::{FromRequest, Outcome},
response,
response::Responder,
Data, Request, Response, Route, State,
};

#[derive(Debug)]
pub(crate) enum StatusS3 {
StatusExt(StatusExt),
Status(Status),
}

impl From<StatusExt> for StatusS3 {
fn from(inner: StatusExt) -> Self {
Self::StatusExt(inner)
}
}

impl<'r> Responder<'r> for StatusS3 {
fn respond_to(self, request: &Request) -> response::Result<'r> {
match self {
Self::StatusExt(status_ext) => {
let resp = status_ext.respond_to(request)?;
Response::build().status(resp.status()).ok()
}
Self::Status(status) => Response::build().status(status).ok(),
}
}
}

pub(crate) fn routes() -> impl Into<Vec<Route>> {
routes![get_object, put_object]
}

#[derive(Debug, Default)]
pub(crate) struct GetObjectHeaders {
content_type: Option<ContentType>,
if_modified_since: Option<u64>,
if_unmodified_since: Option<u64>,
}

impl<'r> FromRequest<'_, 'r> for GetObjectHeaders {
type Error = StatusS3;
fn from_request(request: &Request<'r>) -> Outcome<Self, Self::Error> {
let headers = request.headers();
Outcome::Success(GetObjectHeaders {
content_type: headers
.get_one("response-content-type")
.and_then(|x| ContentType::from_str(x).ok()),
if_modified_since: headers
.get_one("If-Modified-Since")
.and_then(|x| chrono::DateTime::parse_from_rfc2822(x).ok())
.and_then(|x| x.timestamp().try_into().ok()),
if_unmodified_since: headers
.get_one("If-Unmodified-Since")
.and_then(|x| chrono::DateTime::parse_from_rfc2822(x).ok())
.and_then(|x| x.timestamp().try_into().ok()),
})
}
}

#[derive(Debug)]
pub(crate) struct GetObjectOutput {
data: BobData,
content_type: ContentType,
}

impl<'r> Responder<'r> for GetObjectOutput {
fn respond_to(self, _: &Request) -> response::Result<'r> {
Response::build()
.status(Status::Ok)
.header(self.content_type)
.header(Header::new(
"Last-Modified",
self.data.meta().timestamp().to_string(),
))
.sized_body(Cursor::new(self.data.into_inner()))
.ok()
}
}

#[get("/default/<key>")]
pub(crate) fn get_object(
bob: State<BobServer>,
key: BobKey,
headers: GetObjectHeaders,
) -> Result<GetObjectOutput, StatusS3> {
let opts = BobOptions::new_get(None);
let data = bob
.block_on(async { bob.grinder().get(key, &opts).await })
.map_err(|err| -> StatusExt { err.into() })?;
let content_type = headers
.content_type
.unwrap_or_else(|| infer_data_type(&data));
let last_modified = data.meta().timestamp();
if let Some(time) = headers.if_modified_since {
if time > last_modified {
return Err(StatusS3::Status(Status::NotModified));
}
}
if let Some(time) = headers.if_unmodified_since {
if time < last_modified {
return Err(StatusS3::Status(Status::PreconditionFailed));
}
}
Ok(GetObjectOutput { data, content_type })
}

#[put("/default/<key>", data = "<data>", rank = 2)]
pub(crate) fn put_object(
bob: State<BobServer>,
key: BobKey,
data: Data,
) -> Result<StatusS3, StatusS3> {
let mut data_buf = vec![];
data.open()
.read_to_end(&mut data_buf)
.map_err(|err| -> StatusExt { err.into() })?;
let data = BobData::new(
data_buf,
BobMeta::new(chrono::Local::now().timestamp() as u64),
);

let opts = BobOptions::new_put(None);
bob.block_on(async { bob.grinder().put(key, data, opts).await })
.map_err(|err| -> StatusExt { err.into() })?;

Ok(StatusS3::from(StatusExt::from(Status::Created)))
}

#[derive(Debug, Default)]
pub(crate) struct CopyObjectHeaders {
if_modified_since: Option<u64>,
if_unmodified_since: Option<u64>,
source_key: BobKey,
}

impl<'r> FromRequest<'_, 'r> for CopyObjectHeaders {
type Error = StatusS3;
fn from_request(request: &Request<'r>) -> Outcome<Self, Self::Error> {
let headers = request.headers();
let source_key = match headers
.get_one("x-amz-copy-source")
.and_then(|x| x.parse().ok())
{
Some(key) => key,
None => return Outcome::Forward(()),
};
Outcome::Success(CopyObjectHeaders {
if_modified_since: headers
.get_one("If-Modified-Since")
.and_then(|x| chrono::DateTime::parse_from_rfc2822(x).ok())
.and_then(|x| x.timestamp().try_into().ok()),
if_unmodified_since: headers
.get_one("If-Unmodified-Since")
.and_then(|x| chrono::DateTime::parse_from_rfc2822(x).ok())
.and_then(|x| x.timestamp().try_into().ok()),
source_key,
})
}
}

#[put("/default/<key>")]
pub(crate) fn copy_object(
bob: State<BobServer>,
key: BobKey,
headers: CopyObjectHeaders,
) -> Result<StatusS3, StatusS3> {
let opts = BobOptions::new_get(None);
let data = bob
.block_on(async { bob.grinder().get(key, &opts).await })
.map_err(|err| -> StatusExt { err.into() })?;
let last_modified = data.meta().timestamp();
if let Some(time) = headers.if_modified_since {
if time > last_modified {
return Err(StatusS3::Status(Status::NotModified));
}
}
if let Some(time) = headers.if_unmodified_since {
if time < last_modified {
return Err(StatusS3::Status(Status::PreconditionFailed));
}
}
let data = BobData::new(
data.into_inner(),
BobMeta::new(chrono::Local::now().timestamp() as u64),
);

let opts = BobOptions::new_put(None);
bob.block_on(async { bob.grinder().put(key, data, opts).await })
.map_err(|err| -> StatusExt { err.into() })?;

Ok(StatusS3::from(StatusExt::from(Status::Ok)))
}