Skip to content

Commit

Permalink
below fixes are added -
Browse files Browse the repository at this point in the history
1. update role sync between querier to ingestors
2. get user role priviledge to reader, writer and editor roles
  • Loading branch information
nikhilsinhaparseable committed Sep 17, 2024
1 parent c9c32b0 commit 50e5f91
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 8 deletions.
59 changes: 59 additions & 0 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use crate::handlers::http::cluster::utils::{
};
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
use crate::handlers::http::logstream::error::StreamError;
use crate::handlers::http::role::RoleError;
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::rbac::role::model::DefaultPrivilege;
use crate::rbac::user::User;
use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
Expand Down Expand Up @@ -364,6 +366,63 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
Ok(())
}

// forward the put role request to all ingestors to keep them in sync
pub async fn sync_role_update_with_ingestors(
name: String,
body: Vec<DefaultPrivilege>,
) -> Result<(), RoleError> {
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
RoleError::Anyhow(err)
})?;

let roles = to_vec(&body).map_err(|err| {
log::error!("Fatal: failed to serialize roles: {:?}", err);
RoleError::SerdeError(err)
})?;
let roles = Bytes::from(roles);
let client = reqwest::Client::new();

for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/role/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
name
);

let res = client
.put(url)
.header(header::AUTHORIZATION, &ingestor.token)
.header(header::CONTENT_TYPE, "application/json")
.body(roles.clone())
.send()
.await
.map_err(|err| {
log::error!(
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
ingestor.domain_name,
err
);
RoleError::Network(err)
})?;

if !res.status().is_success() {
log::error!(
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
ingestor.domain_name,
res.text().await
);
}
}

Ok(())
}

pub async fn fetch_daily_stats_from_ingestors(
stream_name: &str,
date: &str,
Expand Down
1 change: 1 addition & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl IngestServer {
.service(Self::analytics_factory())
.service(Server::get_liveness_factory())
.service(Self::get_user_webscope())
.service(Server::get_user_role_webscope())
.service(Server::get_metrics_webscope())
.service(Server::get_readiness_factory()),
)
Expand Down
34 changes: 26 additions & 8 deletions server/src/handlers/http/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,38 @@
*/

use actix_web::{http::header::ContentType, web, HttpResponse, Responder};
use bytes::Bytes;
use http::StatusCode;

use crate::{
option::CONFIG,
option::{Mode, CONFIG},
rbac::{
map::{mut_roles, DEFAULT_ROLE},
role::model::DefaultPrivilege,
},
storage::{self, ObjectStorageError, StorageMetadata},
};

use super::cluster::sync_role_update_with_ingestors;

// Handler for PUT /api/v1/role/{name}
// Creates a new role or update existing one
pub async fn put(
name: web::Path<String>,
body: web::Json<Vec<DefaultPrivilege>>,
) -> Result<impl Responder, RoleError> {
pub async fn put(name: web::Path<String>, body: Bytes) -> Result<impl Responder, RoleError> {
let name = name.into_inner();
let privileges = body.into_inner();
let privileges = serde_json::from_slice::<Vec<DefaultPrivilege>>(&body)?;
let mut metadata = get_metadata().await?;
metadata.roles.insert(name.clone(), privileges.clone());
put_metadata(&metadata).await?;
mut_roles().insert(name, privileges);
if CONFIG.parseable.mode == Mode::Ingest {
let _ = storage::put_staging_metadata(&metadata);
mut_roles().insert(name.clone(), privileges.clone());
} else {
put_metadata(&metadata).await?;
mut_roles().insert(name.clone(), privileges.clone());
if CONFIG.parseable.mode == Mode::Query {
sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?;
}
}

Ok(HttpResponse::Ok().finish())
}

Expand Down Expand Up @@ -118,13 +127,22 @@ pub enum RoleError {
ObjectStorageError(#[from] ObjectStorageError),
#[error("Cannot perform this operation as role is assigned to an existing user.")]
RoleInUse,
#[error("Error: {0}")]
Anyhow(#[from] anyhow::Error),
#[error("{0}")]
SerdeError(#[from] serde_json::Error),
#[error("Network Error: {0}")]
Network(#[from] reqwest::Error),
}

impl actix_web::ResponseError for RoleError {
fn status_code(&self) -> http::StatusCode {
match self {
Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::RoleInUse => StatusCode::BAD_REQUEST,
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::SerdeError(_) => StatusCode::BAD_REQUEST,
Self::Network(_) => StatusCode::BAD_GATEWAY,
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub mod model {
Action::GetDashboard,
Action::CreateDashboard,
Action::DeleteDashboard,
Action::GetUserRoles,
],
stream: Some("*".to_string()),
tag: None,
Expand Down Expand Up @@ -269,6 +270,7 @@ pub mod model {
Action::ListFilter,
Action::CreateFilter,
Action::DeleteFilter,
Action::GetUserRoles,
],
stream: None,
tag: None,
Expand All @@ -294,6 +296,7 @@ pub mod model {
Action::CreateDashboard,
Action::DeleteDashboard,
Action::GetStreamInfo,
Action::GetUserRoles,
],
stream: None,
tag: None,
Expand Down

0 comments on commit 50e5f91

Please sign in to comment.