From 1c7c6be6154c4f0d234ba4b05cf9bdead82752f2 Mon Sep 17 00:00:00 2001 From: zerj9 <96551236+zerj9@users.noreply.github.com> Date: Sat, 14 Dec 2024 15:07:22 +0000 Subject: [PATCH] feat: add geoconnection handling logic --- gridwalk-backend/src/app_state.rs | 4 +- gridwalk-backend/src/core/connector.rs | 57 +++++--- gridwalk-backend/src/core/workspace.rs | 21 +-- gridwalk-backend/src/data/config.rs | 5 + gridwalk-backend/src/data/dynamodb/config.rs | 43 +++++- .../src/data/dynamodb/conversions.rs | 1 - gridwalk-backend/src/main.rs | 24 +++- gridwalk-backend/src/routes/connector.rs | 128 +++++++++--------- gridwalk-backend/src/routes/workspace.rs | 14 +- gridwalk-backend/src/server.rs | 4 + 10 files changed, 188 insertions(+), 113 deletions(-) diff --git a/gridwalk-backend/src/app_state.rs b/gridwalk-backend/src/app_state.rs index 83a2038..35ecd15 100644 --- a/gridwalk-backend/src/app_state.rs +++ b/gridwalk-backend/src/app_state.rs @@ -1,9 +1,9 @@ -use crate::core::GeospatialConfig; +use crate::core::GeoConnections; use crate::data::Database; use std::sync::Arc; #[derive(Clone)] pub struct AppState { pub app_data: Arc, - pub geospatial_config: GeospatialConfig, + pub geo_connections: GeoConnections, } diff --git a/gridwalk-backend/src/core/connector.rs b/gridwalk-backend/src/core/connector.rs index a7121c5..2d20d98 100644 --- a/gridwalk-backend/src/core/connector.rs +++ b/gridwalk-backend/src/core/connector.rs @@ -89,6 +89,11 @@ impl ConnectionAccess { ) -> Result> { database.get_accessible_connections(wsp).await } + + pub async fn get(database: &Arc, wsp: &Workspace, con_id: &str) -> Result { + let con = database.get_accessible_connection(wsp, con_id).await?; + Ok(con) + } } // Trait for all geospatial data sources @@ -96,7 +101,8 @@ impl ConnectionAccess { pub trait GeoConnector: Send + Sync { async fn connect(&mut self) -> Result<()>; async fn disconnect(&mut self) -> Result<()>; - async fn list_sources(&self) -> Result>; + async fn create_namespace(&self, name: &str) -> Result<()>; + async fn list_sources(&self, namespace: &str) -> Result>; async fn get_tile(&self, z: u32, x: u32, y: u32) -> Result>; } @@ -111,13 +117,12 @@ pub struct PostgresConnection { } #[derive(Clone, Debug)] -pub struct PostgisConfig { +pub struct PostgisConnector { pool: Arc, } -impl PostgisConfig { +impl PostgisConnector { pub fn new(connection: PostgresConnection) -> Result { - println!("Creating new PostgisConfig with connection parameters"); let mut config = Config::new(); config.host = Some(connection.host.to_string()); config.port = Some(connection.port); @@ -129,14 +134,14 @@ impl PostgisConfig { .create_pool(Some(Runtime::Tokio1), NoTls) .map_err(|e| anyhow!("Failed to create connection pool: {}", e))?; - Ok(PostgisConfig { + Ok(PostgisConnector { pool: Arc::new(pool), }) } } #[async_trait] -impl GeoConnector for PostgisConfig { +impl GeoConnector for PostgisConnector { async fn connect(&mut self) -> Result<()> { println!("Testing connection to PostGIS database"); let client = self @@ -157,29 +162,39 @@ impl GeoConnector for PostgisConfig { Ok(()) } - async fn list_sources(&self) -> Result> { - println!("Listing sources from PostGIS database"); + async fn create_namespace(&self, name: &str) -> Result<()> { + let client = self + .pool + .get() + .await + .map_err(|e| anyhow!("Failed to get client from pool: {}", e))?; + let escaped_name = format!("\"{}\"", name); + let query = format!("CREATE SCHEMA IF NOT EXISTS {}", escaped_name); + client + .execute(&query, &[]) + .await + .map_err(|e| anyhow!("Failed to execute query to create namespace: {}", e))?; + Ok(()) + } + + async fn list_sources(&self, namespace: &str) -> Result> { let client = self .pool .get() .await .map_err(|e| anyhow!("Failed to get client from pool: {}", e))?; + // query to list all tables within the schema given in the namespace parameter let rows = client .query( - "SELECT DISTINCT f.table_name - FROM information_schema.columns f - JOIN pg_type t ON f.udt_name = t.typname - WHERE t.typtype = 'b' - AND t.typname IN ('geometry', 'geography') - AND f.table_schema = 'public'", - &[], + "SELECT table_name + FROM information_schema.tables + WHERE table_schema = $1", + &[&namespace], // Remove the format! and quotes ) .await .map_err(|e| anyhow!("Failed to execute query to list sources: {}", e))?; - let sources: Vec = rows.iter().map(|row| row.get(0)).collect(); - println!("Found {} sources", sources.len()); Ok(sources) } @@ -228,15 +243,15 @@ FROM mvt_data; } } -// The GeospatialConfig struct and its impl block are used to manage live connections +// The GeospatialConnections struct and its impl block are used to manage live connections #[derive(Clone)] -pub struct GeospatialConfig { +pub struct GeoConnections { sources: Arc>>>, } -impl GeospatialConfig { +impl GeoConnections { pub fn new() -> Self { - GeospatialConfig { + GeoConnections { sources: Arc::new(RwLock::new(HashMap::new())), } } diff --git a/gridwalk-backend/src/core/workspace.rs b/gridwalk-backend/src/core/workspace.rs index c327484..1d80fd2 100644 --- a/gridwalk-backend/src/core/workspace.rs +++ b/gridwalk-backend/src/core/workspace.rs @@ -6,7 +6,7 @@ use std::fmt; use std::str::FromStr; use std::sync::Arc; -use super::{ConnectionAccess, ConnectionAccessConfig}; +use super::{ConnectionAccess, ConnectionAccessConfig, GeoConnector}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Workspace { @@ -70,7 +70,11 @@ impl Workspace { Ok(database.get_workspace_by_id(id).await?) } - pub async fn create(database: &Arc, wsp: &Workspace) -> Result<()> { + pub async fn create( + database: &Arc, + connection: &Arc, + wsp: &Workspace, + ) -> Result<()> { // Check for existing org with same name let db_resp = database.create_workspace(wsp).await; @@ -81,6 +85,7 @@ impl Workspace { access_config: ConnectionAccessConfig::ReadWrite(wsp.id.clone()), }; connection_access.create_record(database).await?; + let _ = &connection.create_namespace(&wsp.id).await?; match db_resp { Ok(_) => Ok(()), @@ -96,12 +101,6 @@ impl Workspace { role: WorkspaceRole, ) -> Result<()> { let requesting_member = self.clone().get_member(&database, &req_user).await?; - - println!( - "{} is {} of the {} workspace", - req_user.first_name, requesting_member.role, self.name - ); - if requesting_member.role != WorkspaceRole::Admin { Err(anyhow!("Only Admin can add members"))? } @@ -133,12 +132,6 @@ impl Workspace { user: &User, ) -> Result<()> { let requesting_member = self.clone().get_member(&database, &req_user).await?; - - println!( - "{} is {} of the {} workspace", - req_user.first_name, requesting_member.role, self.name - ); - if requesting_member.role != WorkspaceRole::Admin { Err(anyhow!("Only Admin can remove members"))? } diff --git a/gridwalk-backend/src/data/config.rs b/gridwalk-backend/src/data/config.rs index b76894b..4f732f7 100644 --- a/gridwalk-backend/src/data/config.rs +++ b/gridwalk-backend/src/data/config.rs @@ -29,6 +29,11 @@ pub trait UserStore: Send + Sync + 'static { async fn get_connection(&self, connection_id: &str) -> Result; async fn create_connection_access(&self, ca: &ConnectionAccess) -> Result<()>; async fn get_accessible_connections(&self, wsp: &Workspace) -> Result>; + async fn get_accessible_connection( + &self, + wsp: &Workspace, + con_id: &str, + ) -> Result; async fn create_layer(&self, layer: &Layer) -> Result<()>; async fn create_project(&self, project: &Project) -> Result<()>; async fn get_workspaces(&self, user: &User) -> Result>; diff --git a/gridwalk-backend/src/data/dynamodb/config.rs b/gridwalk-backend/src/data/dynamodb/config.rs index fe9ac48..b4e5353 100644 --- a/gridwalk-backend/src/data/dynamodb/config.rs +++ b/gridwalk-backend/src/data/dynamodb/config.rs @@ -388,7 +388,6 @@ impl UserStore for Dynamodb { } async fn get_workspace_member(&self, wsp: &Workspace, user: &User) -> Result { - println!("{wsp:?}"); let pk = format!("WSP#{0}", wsp.id); let sk = format!("USER#{0}", user.id); match self @@ -548,8 +547,11 @@ impl UserStore for Dynamodb { .send() .await { - Ok(response) => Ok(response.item.unwrap().into()), - Err(_e) => Err(anyhow!("connection not found")), + Ok(response) => response + .item + .ok_or_else(|| anyhow!("connection not found")) + .map(Into::into), + Err(e) => Err(anyhow!("failed to fetch connection: {}", e)), } } @@ -607,6 +609,41 @@ impl UserStore for Dynamodb { Ok(connections) } + async fn get_accessible_connection( + &self, + wsp: &Workspace, + con_id: &str, + ) -> Result { + let pk = format!("WSP#{}", wsp.id); + let sk_prefix = format!("CONACC#{}", con_id); + + let accessible_connections = self + .client + .query() + .table_name(&self.table_name) + .key_condition_expression("PK = :pk AND begins_with(SK, :sk_prefix)") + .expression_attribute_values(":pk", AV::S(pk)) + .expression_attribute_values(":sk_prefix", AV::S(sk_prefix.to_string())) + .send() + .await?; + + let connections: Vec = accessible_connections + .items + .unwrap_or_default() + .into_iter() + .map(|item| item.into()) + .collect(); + + // If connections is empty or has more than one item, return an error + if connections.is_empty() { + return Err(anyhow!("connection not found")); + } else if connections.len() > 1 { + return Err(anyhow!("multiple connections found")); + } + + Ok(connections[0].clone()) + } + async fn create_layer(&self, layer: &Layer) -> Result<()> { let mut item = std::collections::HashMap::new(); diff --git a/gridwalk-backend/src/data/dynamodb/conversions.rs b/gridwalk-backend/src/data/dynamodb/conversions.rs index fb44e63..dc2dbbe 100644 --- a/gridwalk-backend/src/data/dynamodb/conversions.rs +++ b/gridwalk-backend/src/data/dynamodb/conversions.rs @@ -134,7 +134,6 @@ impl From> for Connection { // Convert DynamoDB response into ConnectionAccess struct impl From> for ConnectionAccess { fn from(value: HashMap) -> Self { - println!("{:?}", value); let parts: Vec<&str> = value .get("SK") .unwrap() diff --git a/gridwalk-backend/src/main.rs b/gridwalk-backend/src/main.rs index 955e4a9..cac32e1 100644 --- a/gridwalk-backend/src/main.rs +++ b/gridwalk-backend/src/main.rs @@ -6,7 +6,7 @@ mod routes; mod server; use crate::app_state::AppState; -use crate::core::GeospatialConfig; +use crate::core::{GeoConnections, PostgisConnector}; use crate::data::Dynamodb; use anyhow::Result; @@ -22,17 +22,31 @@ async fn main() -> Result<()> { let table_name = env::var("DYNAMODB_TABLE").unwrap_or_else(|_| "gridwalk".to_string()); // let app_db = Dynamodb::new(false, &table_name).await.unwrap(); // FOR LOCAL DB DEV - let app_db = Dynamodb::new(false, &table_name).await.unwrap(); + let app_db = Dynamodb::new(true, &table_name).await.unwrap(); - // Create GeospatialConfig - let geospatial_config = GeospatialConfig::new(); + // Create GeospatialConnections + let geo_connections = GeoConnections::new(); // Create initial App State let app_state = AppState { app_data: app_db, - geospatial_config, + geo_connections, }; + // Check for primary connection info in app_data and add to geo_connections if found + let geoconnection_record_primary = app_state.app_data.get_connection("primary").await; + match geoconnection_record_primary { + Ok(geoconnection_primary) => { + info!("Primary connection found"); + let postgis_connector = PostgisConnector::new(geoconnection_primary.config).unwrap(); + app_state + .geo_connections + .add_connection("primary".to_string(), postgis_connector) + .await; + } + Err(_) => info!("Primary connection not found. Skipping..."), + } + // Run app let app = server::create_app(app_state); let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await?; diff --git a/gridwalk-backend/src/routes/connector.rs b/gridwalk-backend/src/routes/connector.rs index e637392..9544eca 100644 --- a/gridwalk-backend/src/routes/connector.rs +++ b/gridwalk-backend/src/routes/connector.rs @@ -1,13 +1,13 @@ use crate::app_state::AppState; use crate::auth::AuthUser; use crate::core::{ - Connection, ConnectionAccess, GlobalRole, PostgisConfig, PostgresConnection, Workspace, + Connection, ConnectionAccess, GlobalRole, PostgisConnector, PostgresConnection, Workspace, WorkspaceMember, }; use axum::{ extract::{Extension, Path, State}, http::StatusCode, - response::{IntoResponse, Response}, + response::IntoResponse, Json, }; use serde::{Deserialize, Serialize}; @@ -56,9 +56,29 @@ pub async fn create_connection( // Create connection info let connection_info = Connection::from_req(req); + // Check if connection already exists + if state + .app_data + .get_connection(&connection_info.id) + .await + .is_ok() + { + return (StatusCode::CONFLICT, "Connection already exists").into_response(); + } + + // Create postgis connector + let postgis_connector = PostgisConnector::new(connection_info.clone().config).unwrap(); + // Attempt to create record - match connection_info.create_record(&state.app_data).await { - Ok(_) => (StatusCode::OK, "Connection creation submitted").into_response(), + match connection_info.clone().create_record(&state.app_data).await { + Ok(_) => { + // Add connection to geo_connections + state + .geo_connections + .add_connection(connection_info.id, postgis_connector) + .await; + (StatusCode::OK, "Connection creation submitted").into_response() + } Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, format!("Connection creation failed: {}", e), @@ -80,7 +100,7 @@ impl From for ConnectionResponse { ConnectionResponse { id: con.connection_id.clone(), name: con.connection_id, - connector_type: con.access_config.path().to_string(), + connector_type: "postgis".into(), } } } @@ -106,8 +126,6 @@ pub async fn list_connections( .ok() .unwrap(); - println!("Connection access list: {:?}", connection_access_list); - // Convert Vec to Vec // Removes the config from the response let connection_responses: Vec = connection_access_list @@ -121,57 +139,45 @@ pub async fn list_connections( } } -//pub async fn list_sources( -// State(state): State>, -// Extension(auth_user): Extension, -// Path((workspace_id, connection_id)): Path<(String, String)>, -//) -> impl IntoResponse { -// let workspace = Workspace::from_id(&state.app_data, &workspace_id) -// .await -// .unwrap(); -// let _member = workspace -// .get_member(&state.app_data, &auth_user.user.unwrap()) -// .await -// .unwrap(); -// // Check member role -// let connection = Connection::from_id(&state.app_data, &workspace_id, &connection_id) -// .await -// .unwrap(); -// -// match state.geospatial_config.get_connection(&connection_id).await { -// Ok(connector) => match connector.list_sources().await { -// Ok(sources) => Json(sources).into_response(), -// Err(e) => { -// eprintln!("Error listing sources: {:?}", e); -// (StatusCode::INTERNAL_SERVER_ERROR, "Failed to list sources").into_response() -// } -// }, -// Err(_) => { -// println!("Connection not found, adding new connection to state"); -// let pg_config = PostgisConfig::new(connection.config).unwrap(); -// state -// .geospatial_config -// .add_connection(connection_id.clone(), pg_config) -// .await; -// -// match state.geospatial_config.get_connection(&connection_id).await { -// Ok(connector) => match connector.list_sources().await { -// Ok(sources) => Json(sources).into_response(), -// Err(e) => { -// eprintln!("Error listing sources: {:?}", e); -// (StatusCode::INTERNAL_SERVER_ERROR, "Failed to list sources") -// .into_response() -// } -// }, -// Err(e) => { -// eprintln!("Error getting connection after adding: {:?}", e); -// ( -// StatusCode::INTERNAL_SERVER_ERROR, -// "Failed to get connection", -// ) -// .into_response() -// } -// } -// } -// } -//} +pub async fn list_sources( + State(state): State>, + Extension(auth_user): Extension, + Path((workspace_id, connection_id)): Path<(String, String)>, +) -> impl IntoResponse { + match auth_user.user { + Some(user) => { + let workspace = Workspace::from_id(&state.app_data, &workspace_id) + .await + .map_err(|_| (StatusCode::NOT_FOUND, "".to_string()))?; + + // Check if the requesting user is a member of the workspace + WorkspaceMember::get(&state.app_data, &workspace, &user) + .await + .map_err(|_| (StatusCode::FORBIDDEN, "unauthorized".to_string()))?; + + // TODO: Check Access Level + let _connection_access = + ConnectionAccess::get(&state.app_data, &workspace, &connection_id) + .await + .map_err(|_| (StatusCode::NOT_FOUND, "".to_string()))?; + + let connection = state + .geo_connections + .get_connection(&connection_id) + .await + .unwrap(); + + match connection.list_sources(&workspace.id).await { + Ok(sources) => Ok(Json(sources)), + Err(e) => { + eprintln!("Error listing sources: {:?}", e); + Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to list sources".to_string(), + )) + } + } + } + None => Err((StatusCode::FORBIDDEN, "unauthorized".to_string())), + } +} diff --git a/gridwalk-backend/src/routes/workspace.rs b/gridwalk-backend/src/routes/workspace.rs index 4c9749d..5a617af 100644 --- a/gridwalk-backend/src/routes/workspace.rs +++ b/gridwalk-backend/src/routes/workspace.rs @@ -54,15 +54,20 @@ impl Workspace { } } +// TODO: Create all records within a transaction pub async fn create_workspace( State(state): State>, Extension(auth_user): Extension, Json(req): Json, ) -> Response { if let Some(owner) = auth_user.user { - println!("{owner:?}"); let wsp = Workspace::from_req(req, owner.clone().id); - match Workspace::create(&state.app_data, &wsp).await { + let primary_connection = state + .geo_connections + .get_connection("primary") + .await + .unwrap(); + match Workspace::create(&state.app_data, &primary_connection, &wsp).await { Ok(_) => { let now = get_unix_timestamp(); // TODO: Handle response from adding member @@ -210,7 +215,6 @@ pub async fn get_workspaces( Extension(auth_user): Extension, ) -> Response { if let Some(user) = auth_user.user { - println!("Fetching workspaces for user: {:?}", user.id); match Workspace::get_user_workspaces(&state.app_data, &user).await { Ok(workspace_ids) => { let workspaces: Vec = join_all( @@ -225,8 +229,7 @@ pub async fn get_workspaces( Json(workspaces).into_response() } - Err(e) => { - println!("Error fetching workspaces: {:?}", e); + Err(_) => { let error = ErrorResponse { error: "Failed to fetch workspaces".to_string(), }; @@ -234,7 +237,6 @@ pub async fn get_workspaces( } } } else { - println!("No authenticated user found"); let error = ErrorResponse { error: "Unauthorized".to_string(), }; diff --git a/gridwalk-backend/src/server.rs b/gridwalk-backend/src/server.rs index 9576eda..86eb7fb 100644 --- a/gridwalk-backend/src/server.rs +++ b/gridwalk-backend/src/server.rs @@ -45,6 +45,10 @@ pub fn create_app(app_state: AppState) -> Router { "/workspaces/:workspace_id/connections", get(list_connections), ) + .route( + "/workspaces/:workspace_id/connections/:connection_id/sources", + get(list_sources), + ) .route("/create_project", post(create_project)) .route("/upload_layer", post(upload_layer)) .layer(DefaultBodyLimit::disable())