Introduce a new error type for ObjectStorage api #45

Merged · 4 commits · Aug 18, 2022

Changes from all commits
2 changes: 1 addition & 1 deletion server/src/error.rs
@@ -33,7 +33,7 @@ pub enum Error {
     #[error("JSON provided to query api doesn't contain {0}")]
     JsonQuery(&'static str),
     #[error("Storage error: {0}")]
-    Storage(Box<dyn ObjectStorageError>),
+    Storage(ObjectStorageError),
     #[error("Event error: {0}")]
     Event(#[from] EventError),
     #[error("Parquet error: {0}")]
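This PR replaces the old `ObjectStorageError` trait (note the removed `impl ObjectStorageError for AwsSdkError {}` in the s3.rs diff below) with a concrete error enum. The enum itself is defined in `server/src/storage.rs`, which is not part of this diff; going by the variants matched in `option.rs` and constructed in `s3.rs`, a minimal sketch might look like the following. Field types and display messages are assumptions:

```rust
use std::error::Error as StdError;

// Sketch only: the real definition lives in server/src/storage.rs and is not
// shown in this PR. Variant names are taken from the diff; field types and
// messages are guesses.
#[derive(Debug, thiserror::Error)]
pub enum ObjectStorageError {
    // Constructed in s3.rs with the configured bucket name.
    #[error("bucket {0} does not exist")]
    NoSuchBucket(String),
    // Built from SdkError::DispatchFailure / SdkError::TimeoutError, both of
    // which yield boxed errors.
    #[error("connection error: {0}")]
    ConnectionError(Box<dyn StdError + Send + Sync + 'static>),
    // Catch-all used by the From impls for AwsSdkError and serde_json::Error.
    #[error("unhandled error: {0}")]
    UnhandledError(Box<dyn StdError + Send + Sync + 'static>),
}
```

This shape is what lets `error.rs` store the error by value (`Storage(ObjectStorageError)`) instead of boxing a trait object.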
2 changes: 1 addition & 1 deletion server/src/handlers/mod.rs
@@ -37,7 +37,7 @@ pub async fn liveness() -> HttpResponse {
 }
 
 pub async fn readiness() -> HttpResponse {
-    if S3::new().is_available().await {
+    if let Ok(()) = S3::new().check().await {
         return HttpResponse::new(StatusCode::OK);
     }
 
1 change: 1 addition & 0 deletions server/src/main.rs
@@ -59,6 +59,7 @@ async fn main() -> anyhow::Result<()> {
     CONFIG.print();
     CONFIG.validate();
     let storage = S3::new();
+    CONFIG.validate_storage(&storage).await;
     if let Err(e) = metadata::STREAM_INFO.load(&storage).await {
         warn!("could not populate local metadata. {:?}", e);
     }
47 changes: 22 additions & 25 deletions server/src/metadata.rs
@@ -18,7 +18,6 @@
 
 use bytes::Bytes;
 use lazy_static::lazy_static;
-use log::warn;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::RwLock;
@@ -126,17 +125,28 @@ impl STREAM_INFO {
             // to load the stream metadata based on whatever is available.
             //
             // TODO: ignore failure(s) if any and skip to next stream
-            let alert_config = parse_string(storage.get_alert(&stream.name).await)
-                .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?;
-            let schema = parse_string(storage.get_schema(&stream.name).await)
-                .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?;
+            let alert_config = storage
+                .get_alert(&stream.name)
+                .await
+                .map_err(|e| e.into())
+                .and_then(parse_string)
+                .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()));
+
+            let schema = storage
+                .get_schema(&stream.name)
+                .await
+                .map_err(|e| e.into())
+                .and_then(parse_string)
+                .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()));
+
             let metadata = LogStreamMetadata {
-                schema,
-                alert_config,
+                schema: schema.unwrap_or_default(),
+                alert_config: alert_config.unwrap_or_default(),
                 ..Default::default()
             };
 
             let mut map = self.write().unwrap();
-            map.insert(stream.name.to_owned(), metadata);
+            map.insert(stream.name.clone(), metadata);
         }
 
         Ok(())

Review comment from @trueleo (Contributor Author), Aug 18, 2022, on the `unwrap_or_default()` lines:

Instead of returning an error or not adding the metadata to the stream, we set it to an empty string for now. This allows the first event to not fail in case Parseable was restarted just after adding a stream.

This behaviour could be changed later by using an Option type in LogStreamMetadata (see the sketch after this file's diff).
@@ -159,21 +169,8 @@ impl STREAM_INFO {
     }
 }
 
-fn parse_string(result: Result<Bytes, Error>) -> Result<String, Error> {
-    let mut string = String::new();
-    let bytes = match result {
-        Ok(bytes) => bytes,
-        Err(e) => {
-            warn!("Storage error: {}", e);
-            return Ok(string);
-        }
-    };
-
-    if !bytes.is_empty() {
-        string = String::from_utf8(bytes.to_vec())?;
-    }
-
-    Ok(string)
+fn parse_string(bytes: Bytes) -> Result<String, Error> {
+    String::from_utf8(bytes.to_vec()).map_err(|e| e.into())
 }
 
 #[cfg(test)]
@@ -214,14 +211,14 @@ mod tests {
     #[case::empty_string("")]
     fn test_parse_string(#[case] string: String) {
         let bytes = Bytes::from(string);
-        assert!(parse_string(Ok(bytes)).is_ok())
+        assert!(parse_string(bytes).is_ok())
     }
 
     #[test]
     fn test_bad_parse_string() {
         let bad: Vec<u8> = vec![195, 40];
         let bytes = Bytes::from(bad);
-        assert!(parse_string(Ok(bytes)).is_err());
+        assert!(parse_string(bytes).is_err());
     }
 
     #[rstest]
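As the review comment above suggests, the empty-string defaulting is a stopgap; absent metadata could instead be modelled with `Option`. A hypothetical sketch of that follow-up (not part of this PR; the real `LogStreamMetadata` has more fields than shown):

```rust
// Hypothetical alternative: keep absent schema/alert config as None instead
// of defaulting to an empty string. Field set is abbreviated.
#[derive(Debug, Default)]
pub struct LogStreamMetadata {
    pub schema: Option<String>,
    pub alert_config: Option<String>,
}
```

Loading would then become `schema: schema.ok()` and `alert_config: alert_config.ok()`, turning the storage `Result`s into `Option`s without ever inventing an empty value.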
18 changes: 18 additions & 0 deletions server/src/option.rs
@@ -23,6 +23,7 @@ use structopt::StructOpt;
 
 use crate::banner;
 use crate::s3::S3Config;
+use crate::storage::{ObjectStorage, ObjectStorageError};
 
 lazy_static::lazy_static! {
     #[derive(Debug)]
@@ -72,6 +73,23 @@ impl Config {
         }
     }
 
+    pub async fn validate_storage(&self, storage: &impl ObjectStorage) {
+        match storage.check().await {
+            Ok(_) => (),
+            Err(ObjectStorageError::NoSuchBucket(name)) => panic!(
+                "Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}",
+                bucket = name,
+                url = self.storage.endpoint_url()
+            ),
+            Err(ObjectStorageError::ConnectionError(inner)) => panic!(
+                "Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}",
+                url = self.storage.endpoint_url(),
+                cause = inner
+            ),
+            Err(error) => { panic!("{error}") }
+        }
+    }
+
     fn status_info(&self, scheme: &str) {
         let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined();
         eprintln!(
81 changes: 58 additions & 23 deletions server/src/s3.rs
@@ -1,6 +1,7 @@
 use async_trait::async_trait;
+use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind};
 use aws_sdk_s3::model::{Delete, ObjectIdentifier};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::types::{ByteStream, SdkError};
 use aws_sdk_s3::Error as AwsSdkError;
 use aws_sdk_s3::{Client, Credentials, Endpoint, Region};
 use aws_types::credentials::SharedCredentialsProvider;
@@ -18,7 +19,6 @@ use std::sync::Arc;
 use structopt::StructOpt;
 use tokio_stream::StreamExt;
 
-use crate::error::Error;
 use crate::metadata::Stats;
 use crate::option::{StorageOpt, CONFIG};
 use crate::query::Query;
@@ -96,14 +96,6 @@ impl StorageOpt for S3Config {
     }
 }
 
-impl ObjectStorageError for AwsSdkError {}
-
-impl From<AwsSdkError> for Error {
-    fn from(e: AwsSdkError) -> Self {
-        Self::Storage(Box::new(e))
-    }
-}
-
 struct S3Options {
     endpoint: Endpoint,
     region: Region,
@@ -304,70 +296,83 @@ impl S3 {
 
 #[async_trait]
 impl ObjectStorage for S3 {
-    async fn is_available(&self) -> bool {
+    async fn check(&self) -> Result<(), ObjectStorageError> {
         self.client
             .head_bucket()
             .bucket(&S3_CONFIG.s3_bucket_name)
             .send()
             .await
-            .is_ok()
+            .map(|_| ())
+            .map_err(|err| err.into())
     }
 
-    async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error> {
+    async fn put_schema(
+        &self,
+        stream_name: String,
+        body: String,
+    ) -> Result<(), ObjectStorageError> {
         self._put_schema(stream_name, body).await?;
 
         Ok(())
     }
 
-    async fn create_stream(&self, stream_name: &str) -> Result<(), Error> {
+    async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
         self._create_stream(stream_name).await?;
 
         Ok(())
     }
 
-    async fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
+    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
         self._delete_stream(stream_name).await?;
 
         Ok(())
     }
 
-    async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error> {
+    async fn create_alert(
+        &self,
+        stream_name: &str,
+        body: String,
+    ) -> Result<(), ObjectStorageError> {
         self._create_alert(stream_name, body).await?;
 
         Ok(())
     }
 
-    async fn get_schema(&self, stream_name: &str) -> Result<Bytes, Error> {
+    async fn get_schema(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
         let body_bytes = self._get_schema(stream_name).await?;
 
         Ok(body_bytes)
     }
 
-    async fn get_alert(&self, stream_name: &str) -> Result<Bytes, Error> {
+    async fn get_alert(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError> {
        let body_bytes = self._alert_exists(stream_name).await?;
 
         Ok(body_bytes)
     }
 
-    async fn get_stats(&self, stream_name: &str) -> Result<Stats, Error> {
+    async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
         let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?;
 
         Ok(stats)
     }
 
-    async fn list_streams(&self) -> Result<Vec<LogStream>, Error> {
+    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
         let streams = self._list_streams().await?;
 
         Ok(streams)
     }
 
-    async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error> {
+    async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())
     }
 
-    async fn query(&self, query: &Query, results: &mut Vec<RecordBatch>) -> Result<(), Error> {
+    async fn query(
+        &self,
+        query: &Query,
+        results: &mut Vec<RecordBatch>,
+    ) -> Result<(), ObjectStorageError> {
         let s3_file_system = Arc::new(
             S3FileSystem::new(
                 Some(SharedCredentialsProvider::new(self.options.creds.clone())),
@@ -397,9 +402,39 @@ impl ObjectStorage for S3 {
 
             // execute the query and collect results
             let df = ctx.sql(query.query.as_str()).await?;
-            results.extend(df.collect().await.map_err(Error::DataFusion)?);
+            results.extend(df.collect().await?);
         }
 
         Ok(())
     }
 }
+
+impl From<AwsSdkError> for ObjectStorageError {
+    fn from(error: AwsSdkError) -> Self {
+        ObjectStorageError::UnhandledError(error.into())
+    }
+}
+
+impl From<SdkError<HeadBucketError>> for ObjectStorageError {
+    fn from(error: SdkError<HeadBucketError>) -> Self {
+        match error {
+            SdkError::ServiceError {
+                err:
+                    HeadBucketError {
+                        kind: HeadBucketErrorKind::NotFound(_),
+                        ..
+                    },
+                ..
+            } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()),
+            SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()),
+            SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err),
+            err => ObjectStorageError::UnhandledError(err.into()),
+        }
+    }
+}
+
+impl From<serde_json::Error> for ObjectStorageError {
+    fn from(error: serde_json::Error) -> Self {
+        ObjectStorageError::UnhandledError(error.into())
+    }
+}
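The `ObjectStorage` trait that `S3` implements lives in `server/src/storage.rs` and is not included in the diff, but the impl above pins down its updated shape. A sketch inferred from those signatures (the `Sync` bound and method order are assumptions; `Stats`, `LogStream`, `Query`, and `ObjectStorageError` are the crate's own types, assumed in scope):

```rust
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::arrow::record_batch::RecordBatch;

// Inferred from `impl ObjectStorage for S3` above; the real trait may carry
// extra bounds or methods not visible in this PR.
#[async_trait]
pub trait ObjectStorage: Sync {
    async fn check(&self) -> Result<(), ObjectStorageError>;
    async fn put_schema(&self, stream_name: String, body: String)
        -> Result<(), ObjectStorageError>;
    async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
    async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
    async fn create_alert(&self, stream_name: &str, body: String)
        -> Result<(), ObjectStorageError>;
    async fn get_schema(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError>;
    async fn get_alert(&self, stream_name: &str) -> Result<Bytes, ObjectStorageError>;
    async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError>;
    async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
    async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError>;
    async fn query(&self, query: &Query, results: &mut Vec<RecordBatch>)
        -> Result<(), ObjectStorageError>;
}
```

With every method returning `ObjectStorageError`, callers such as `validate_storage` in `option.rs` can match on concrete variants instead of formatting an opaque boxed error.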