Skip to content

Commit

Permalink
Change stream_name type (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo authored Aug 14, 2022
1 parent 5da4839 commit 3a9a7f5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 23 deletions.
4 changes: 2 additions & 2 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Event {
&self,
storage: &impl ObjectStorage,
) -> Result<response::EventResponse, Error> {
let schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?;
let schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if schema.is_empty() {
self.first_event(storage).await
} else {
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Event {
);

// validate schema before attempting to append to parquet file
let stream_schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?;
let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
if stream_schema != event_schema.string_schema {
return Err(Error::SchemaMismatch(self.stream_name.clone()));
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/handlers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn post_event(req: HttpRequest, body: web::Json<serde_json::Value>) ->
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let labels = utils::collect_labels(&req);

if let Err(e) = metadata::STREAM_INFO.schema(stream_name.clone()) {
if let Err(e) = metadata::STREAM_INFO.schema(&stream_name) {
// if stream doesn't exist, fail to post data
return response::ServerResponse {
msg: format!(
Expand Down
14 changes: 6 additions & 8 deletions server/src/handlers/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
.to_http();
}

if let Err(e) = metadata::STREAM_INFO.delete_stream(stream_name.to_string()) {
if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
return response::ServerResponse {
msg: format!(
"failed to delete log stream {} from metadata due to err: {}",
Expand All @@ -82,7 +82,7 @@ pub async fn list(_: HttpRequest) -> impl Responder {
pub async fn schema(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match metadata::STREAM_INFO.schema(stream_name.clone()) {
match metadata::STREAM_INFO.schema(&stream_name) {
Ok(schema) => response::ServerResponse {
msg: schema,
code: StatusCode::OK,
Expand Down Expand Up @@ -116,7 +116,7 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
pub async fn get_alert(req: HttpRequest) -> HttpResponse {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

match metadata::STREAM_INFO.alert(stream_name.clone()) {
match metadata::STREAM_INFO.alert(&stream_name) {
Ok(alert) => response::ServerResponse {
msg: alert,
code: StatusCode::OK,
Expand Down Expand Up @@ -178,9 +178,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
// Fail if unable to create log stream on object store backend
if let Err(e) = s3.create_stream(&stream_name).await {
// delete the stream from metadata because we couldn't create it on object store backend
metadata::STREAM_INFO
.delete_stream(stream_name.to_string())
.unwrap();
metadata::STREAM_INFO.delete_stream(&stream_name).unwrap();
return response::ServerResponse {
msg: format!(
"failed to create log stream {} due to err: {}",
Expand Down Expand Up @@ -217,8 +215,8 @@ pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) ->
.await
{
Ok(_) => {
if let Err(e) = metadata::STREAM_INFO
.set_alert(stream_name.to_string(), alert_config.to_string())
if let Err(e) =
metadata::STREAM_INFO.set_alert(stream_name.clone(), alert_config.to_string())
{
return response::ServerResponse {
msg: format!(
Expand Down
24 changes: 12 additions & 12 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use std::sync::RwLock;
use crate::error::Error;
use crate::storage::ObjectStorage;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct LogStreamMetadata {
pub schema: String,
pub alert_config: String,
pub stats: Stats,
}

#[derive(Debug, Deserialize, Serialize, Default, Clone)]
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq, Eq)]
pub struct Stats {
pub size: u64,
pub compressed_size: u64,
Expand Down Expand Up @@ -67,29 +67,29 @@ lazy_static! {
#[allow(clippy::all)]
impl STREAM_INFO {
pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> {
let alert_config = self.alert(stream_name.clone())?;
let alert_config = self.alert(&stream_name)?;
self.add_stream(stream_name, schema, alert_config)
}

pub fn schema(&self, stream_name: String) -> Result<String, Error> {
pub fn schema(&self, stream_name: &str) -> Result<String, Error> {
let map = self.read().unwrap();
let meta = map
.get(&stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name))?;
.get(stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name.to_string()))?;

Ok(meta.schema.clone())
}

pub fn set_alert(&self, stream_name: String, alert_config: String) -> Result<(), Error> {
let schema = self.schema(stream_name.clone())?;
let schema = self.schema(&stream_name)?;
self.add_stream(stream_name, schema, alert_config)
}

pub fn alert(&self, stream_name: String) -> Result<String, Error> {
pub fn alert(&self, stream_name: &str) -> Result<String, Error> {
let map = self.read().unwrap();
let meta = map
.get(&stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name))?;
.get(stream_name)
.ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?;

Ok(meta.alert_config.clone())
}
Expand All @@ -112,10 +112,10 @@ impl STREAM_INFO {
Ok(())
}

pub fn delete_stream(&self, stream_name: String) -> Result<(), Error> {
pub fn delete_stream(&self, stream_name: &str) -> Result<(), Error> {
let mut map = self.write().unwrap();
// TODO: Add check to confirm data deletion
map.remove(&stream_name);
map.remove(stream_name);

Ok(())
}
Expand Down

0 comments on commit 3a9a7f5

Please sign in to comment.