Skip to content

Commit

Permalink
feat: add geoconnection handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zerj9 committed Dec 14, 2024
1 parent 0a860b8 commit 1c7c6be
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 113 deletions.
4 changes: 2 additions & 2 deletions gridwalk-backend/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::core::GeospatialConfig;
use crate::core::GeoConnections;
use crate::data::Database;
use std::sync::Arc;

#[derive(Clone)]
pub struct AppState {
pub app_data: Arc<dyn Database>,
pub geospatial_config: GeospatialConfig,
pub geo_connections: GeoConnections,
}
57 changes: 36 additions & 21 deletions gridwalk-backend/src/core/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,20 @@ impl ConnectionAccess {
) -> Result<Vec<ConnectionAccess>> {
database.get_accessible_connections(wsp).await
}

pub async fn get(database: &Arc<dyn Database>, wsp: &Workspace, con_id: &str) -> Result<Self> {
let con = database.get_accessible_connection(wsp, con_id).await?;
Ok(con)
}
}

// Trait for all geospatial data sources
#[async_trait]
pub trait GeoConnector: Send + Sync {
async fn connect(&mut self) -> Result<()>;
async fn disconnect(&mut self) -> Result<()>;
async fn list_sources(&self) -> Result<Vec<String>>;
async fn create_namespace(&self, name: &str) -> Result<()>;
async fn list_sources(&self, namespace: &str) -> Result<Vec<String>>;
async fn get_tile(&self, z: u32, x: u32, y: u32) -> Result<Vec<u8>>;
}

Expand All @@ -111,13 +117,12 @@ pub struct PostgresConnection {
}

#[derive(Clone, Debug)]
pub struct PostgisConfig {
pub struct PostgisConnector {
pool: Arc<Pool>,
}

impl PostgisConfig {
impl PostgisConnector {
pub fn new(connection: PostgresConnection) -> Result<Self> {
println!("Creating new PostgisConfig with connection parameters");
let mut config = Config::new();
config.host = Some(connection.host.to_string());
config.port = Some(connection.port);
Expand All @@ -129,14 +134,14 @@ impl PostgisConfig {
.create_pool(Some(Runtime::Tokio1), NoTls)
.map_err(|e| anyhow!("Failed to create connection pool: {}", e))?;

Ok(PostgisConfig {
Ok(PostgisConnector {
pool: Arc::new(pool),
})
}
}

#[async_trait]
impl GeoConnector for PostgisConfig {
impl GeoConnector for PostgisConnector {
async fn connect(&mut self) -> Result<()> {
println!("Testing connection to PostGIS database");
let client = self
Expand All @@ -157,29 +162,39 @@ impl GeoConnector for PostgisConfig {
Ok(())
}

async fn list_sources(&self) -> Result<Vec<String>> {
println!("Listing sources from PostGIS database");
async fn create_namespace(&self, name: &str) -> Result<()> {
let client = self
.pool
.get()
.await
.map_err(|e| anyhow!("Failed to get client from pool: {}", e))?;
let escaped_name = format!("\"{}\"", name);
let query = format!("CREATE SCHEMA IF NOT EXISTS {}", escaped_name);
client
.execute(&query, &[])
.await
.map_err(|e| anyhow!("Failed to execute query to create namespace: {}", e))?;
Ok(())
}

async fn list_sources(&self, namespace: &str) -> Result<Vec<String>> {
let client = self
.pool
.get()
.await
.map_err(|e| anyhow!("Failed to get client from pool: {}", e))?;

// query to list all tables within the schema given in the namespace parameter
let rows = client
.query(
"SELECT DISTINCT f.table_name
FROM information_schema.columns f
JOIN pg_type t ON f.udt_name = t.typname
WHERE t.typtype = 'b'
AND t.typname IN ('geometry', 'geography')
AND f.table_schema = 'public'",
&[],
"SELECT table_name
FROM information_schema.tables
WHERE table_schema = $1",
&[&namespace], // Remove the format! and quotes
)
.await
.map_err(|e| anyhow!("Failed to execute query to list sources: {}", e))?;

let sources: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
println!("Found {} sources", sources.len());
Ok(sources)
}

Expand Down Expand Up @@ -228,15 +243,15 @@ FROM mvt_data;
}
}

// The GeospatialConfig struct and its impl block are used to manage live connections
// The GeospatialConnections struct and its impl block are used to manage live connections
#[derive(Clone)]
pub struct GeospatialConfig {
pub struct GeoConnections {
sources: Arc<RwLock<HashMap<String, Arc<dyn GeoConnector>>>>,
}

impl GeospatialConfig {
impl GeoConnections {
pub fn new() -> Self {
GeospatialConfig {
GeoConnections {
sources: Arc::new(RwLock::new(HashMap::new())),
}
}
Expand Down
21 changes: 7 additions & 14 deletions gridwalk-backend/src/core/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use super::{ConnectionAccess, ConnectionAccessConfig};
use super::{ConnectionAccess, ConnectionAccessConfig, GeoConnector};

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Workspace {
Expand Down Expand Up @@ -70,7 +70,11 @@ impl Workspace {
Ok(database.get_workspace_by_id(id).await?)
}

pub async fn create(database: &Arc<dyn Database>, wsp: &Workspace) -> Result<()> {
pub async fn create(
database: &Arc<dyn Database>,
connection: &Arc<dyn GeoConnector>,
wsp: &Workspace,
) -> Result<()> {
// Check for existing org with same name
let db_resp = database.create_workspace(wsp).await;

Expand All @@ -81,6 +85,7 @@ impl Workspace {
access_config: ConnectionAccessConfig::ReadWrite(wsp.id.clone()),
};
connection_access.create_record(database).await?;
let _ = &connection.create_namespace(&wsp.id).await?;

match db_resp {
Ok(_) => Ok(()),
Expand All @@ -96,12 +101,6 @@ impl Workspace {
role: WorkspaceRole,
) -> Result<()> {
let requesting_member = self.clone().get_member(&database, &req_user).await?;

println!(
"{} is {} of the {} workspace",
req_user.first_name, requesting_member.role, self.name
);

if requesting_member.role != WorkspaceRole::Admin {
Err(anyhow!("Only Admin can add members"))?
}
Expand Down Expand Up @@ -133,12 +132,6 @@ impl Workspace {
user: &User,
) -> Result<()> {
let requesting_member = self.clone().get_member(&database, &req_user).await?;

println!(
"{} is {} of the {} workspace",
req_user.first_name, requesting_member.role, self.name
);

if requesting_member.role != WorkspaceRole::Admin {
Err(anyhow!("Only Admin can remove members"))?
}
Expand Down
5 changes: 5 additions & 0 deletions gridwalk-backend/src/data/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub trait UserStore: Send + Sync + 'static {
async fn get_connection(&self, connection_id: &str) -> Result<Connection>;
async fn create_connection_access(&self, ca: &ConnectionAccess) -> Result<()>;
async fn get_accessible_connections(&self, wsp: &Workspace) -> Result<Vec<ConnectionAccess>>;
async fn get_accessible_connection(
&self,
wsp: &Workspace,
con_id: &str,
) -> Result<ConnectionAccess>;
async fn create_layer(&self, layer: &Layer) -> Result<()>;
async fn create_project(&self, project: &Project) -> Result<()>;
async fn get_workspaces(&self, user: &User) -> Result<Vec<String>>;
Expand Down
43 changes: 40 additions & 3 deletions gridwalk-backend/src/data/dynamodb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ impl UserStore for Dynamodb {
}

async fn get_workspace_member(&self, wsp: &Workspace, user: &User) -> Result<WorkspaceMember> {
println!("{wsp:?}");
let pk = format!("WSP#{0}", wsp.id);
let sk = format!("USER#{0}", user.id);
match self
Expand Down Expand Up @@ -548,8 +547,11 @@ impl UserStore for Dynamodb {
.send()
.await
{
Ok(response) => Ok(response.item.unwrap().into()),
Err(_e) => Err(anyhow!("connection not found")),
Ok(response) => response
.item
.ok_or_else(|| anyhow!("connection not found"))
.map(Into::into),
Err(e) => Err(anyhow!("failed to fetch connection: {}", e)),
}
}

Expand Down Expand Up @@ -607,6 +609,41 @@ impl UserStore for Dynamodb {
Ok(connections)
}

async fn get_accessible_connection(
&self,
wsp: &Workspace,
con_id: &str,
) -> Result<ConnectionAccess> {
let pk = format!("WSP#{}", wsp.id);
let sk_prefix = format!("CONACC#{}", con_id);

let accessible_connections = self
.client
.query()
.table_name(&self.table_name)
.key_condition_expression("PK = :pk AND begins_with(SK, :sk_prefix)")
.expression_attribute_values(":pk", AV::S(pk))
.expression_attribute_values(":sk_prefix", AV::S(sk_prefix.to_string()))
.send()
.await?;

let connections: Vec<ConnectionAccess> = accessible_connections
.items
.unwrap_or_default()
.into_iter()
.map(|item| item.into())
.collect();

// If connections is empty or has more than one item, return an error
if connections.is_empty() {
return Err(anyhow!("connection not found"));
} else if connections.len() > 1 {
return Err(anyhow!("multiple connections found"));
}

Ok(connections[0].clone())
}

async fn create_layer(&self, layer: &Layer) -> Result<()> {
let mut item = std::collections::HashMap::new();

Expand Down
1 change: 0 additions & 1 deletion gridwalk-backend/src/data/dynamodb/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ impl From<HashMap<String, AV>> for Connection {
// Convert DynamoDB response into ConnectionAccess struct
impl From<HashMap<String, AV>> for ConnectionAccess {
fn from(value: HashMap<String, AV>) -> Self {
println!("{:?}", value);
let parts: Vec<&str> = value
.get("SK")
.unwrap()
Expand Down
24 changes: 19 additions & 5 deletions gridwalk-backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod routes;
mod server;

use crate::app_state::AppState;
use crate::core::GeospatialConfig;
use crate::core::{GeoConnections, PostgisConnector};
use crate::data::Dynamodb;

use anyhow::Result;
Expand All @@ -22,17 +22,31 @@ async fn main() -> Result<()> {
let table_name = env::var("DYNAMODB_TABLE").unwrap_or_else(|_| "gridwalk".to_string());
// let app_db = Dynamodb::new(false, &table_name).await.unwrap();
// FOR LOCAL DB DEV
let app_db = Dynamodb::new(false, &table_name).await.unwrap();
let app_db = Dynamodb::new(true, &table_name).await.unwrap();

// Create GeospatialConfig
let geospatial_config = GeospatialConfig::new();
// Create GeospatialConnections
let geo_connections = GeoConnections::new();

// Create initial App State
let app_state = AppState {
app_data: app_db,
geospatial_config,
geo_connections,
};

// Check for primary connection info in app_data and add to geo_connections if found
let geoconnection_record_primary = app_state.app_data.get_connection("primary").await;
match geoconnection_record_primary {
Ok(geoconnection_primary) => {
info!("Primary connection found");
let postgis_connector = PostgisConnector::new(geoconnection_primary.config).unwrap();
app_state
.geo_connections
.add_connection("primary".to_string(), postgis_connector)
.await;
}
Err(_) => info!("Primary connection not found. Skipping..."),
}

// Run app
let app = server::create_app(app_state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await?;
Expand Down
Loading

0 comments on commit 1c7c6be

Please sign in to comment.