Skip to content

Commit

Permalink
initial code for geoconnector (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
zerj9 authored Oct 21, 2024
1 parent 3e278b8 commit c1c5038
Show file tree
Hide file tree
Showing 19 changed files with 510 additions and 167 deletions.
3 changes: 3 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ aws-sdk-dynamodb = "1.48"
axum = { version = "0.7", features = ["macros", "multipart"] }
axum-extra = { version = "0.9.4", features = ["typed-header"] }
base64 = "0.22.1"
deadpool-postgres = "0.14.0"
duckdb-postgis = "0.1.4"
martin = { git = "https://github.com/enmeshed-analytics/martin.git", features = ["postgres"] }
martin-tile-utils = { git = "https://github.com/enmeshed-analytics/martin.git" }
Expand All @@ -23,7 +24,9 @@ serde_json = "1.0"
strum = "0.26"
strum_macros = "0.26"
tokio = { version = "1.40.0", features = ["full"] }
tokio-postgres = "0.7.12"
tower-http = { version = "0.5", features = ["trace"] }
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.5.2"
uuid = "1.10.0"
12 changes: 6 additions & 6 deletions backend/src/app_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::core::GeospatialConfig;
use crate::data::Database;
use martin::Source;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct AppState<D: Database> {
pub app_data: D,
pub sources: HashMap<String, Box<dyn Source>>,
#[derive(Clone)]
pub struct AppState {
pub app_data: Arc<dyn Database>,
pub geospatial_config: GeospatialConfig,
}
9 changes: 4 additions & 5 deletions backend/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::core::User;
use crate::data::Database;
use crate::{app_state::AppState, core::Session};
use axum::{
body::Body,
Expand All @@ -17,17 +16,17 @@ pub struct AuthUser {
pub user: Option<User>,
}

pub async fn auth_middleware<D: Database>(
State(state): State<Arc<AppState<D>>>,
pub async fn auth_middleware(
State(state): State<Arc<AppState>>,
TypedHeader(auth): TypedHeader<Authorization<Bearer>>,
mut request: Request<Body>,
next: Next,
) -> Result<Response, Response> {
let token = auth.token();
match Session::from_id(state.app_data.clone(), token).await {
match Session::from_id(&state.app_data, token).await {
Ok(session) => {
if let Some(user_id) = session.user_id {
match User::from_id(state.app_data.clone(), &user_id).await {
match User::from_id(&state.app_data, &user_id).await {
Ok(user) => {
request
.extensions_mut()
Expand Down
161 changes: 161 additions & 0 deletions backend/src/core/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use deadpool_postgres::{Config, Pool, Runtime};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_postgres::NoTls;

use crate::data::Database;

// TODO: Switch connector_type/postgis_uri to enum to support other connectors
#[derive(Debug, Clone, Deserialize)]
pub struct Connection {
pub id: String,
pub workspace_id: String,
pub name: String,
pub created_by: String,
pub connector_type: String,
pub config: PostgresConnection,
}

impl Connection {
pub async fn create_record(self, database: &Arc<dyn Database>) -> Result<()> {
database.create_connection(&self).await?;
Ok(())
}

pub async fn from_id(
database: &Arc<dyn Database>,
workspace_id: &str,
connection_id: &str,
) -> Result<Self> {
let con = database
.get_workspace_connection(workspace_id, connection_id)
.await?;
Ok(con)
}
}

// Trait for all geospatial data sources
#[async_trait]
pub trait GeoConnector: Send + Sync {
async fn connect(&mut self) -> Result<()>;
async fn disconnect(&mut self) -> Result<()>;
async fn list_sources(&self) -> Result<Vec<String>>;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PostgresConnection {
pub host: String,
pub port: u16,
pub database: String,
pub username: String,
pub password: String,
}

#[derive(Clone, Debug)]
pub struct PostgisConfig {
pool: Arc<Pool>,
}

impl PostgisConfig {
pub fn new(connection: PostgresConnection) -> Result<Self> {
println!("Creating new PostgisConfig with connection parameters");
let mut config = Config::new();
config.host = Some(connection.host.to_string());
config.port = Some(connection.port);
config.dbname = Some(connection.database.to_string());
config.user = Some(connection.username.to_string());
config.password = Some(connection.password.to_string());

let pool = config
.create_pool(Some(Runtime::Tokio1), NoTls)
.map_err(|e| anyhow!("Failed to create connection pool: {}", e))?;

Ok(PostgisConfig {
pool: Arc::new(pool),
})
}
}

#[async_trait]
impl GeoConnector for PostgisConfig {
async fn connect(&mut self) -> Result<()> {
println!("Testing connection to PostGIS database");
let client = self
.pool
.get()
.await
.map_err(|e| anyhow!("Failed to get client from pool: {}", e))?;
client
.query("SELECT 1", &[])
.await
.map_err(|e| anyhow!("Failed to execute test query: {}", e))?;
println!("Connection test successful");
Ok(())
}

async fn disconnect(&mut self) -> Result<()> {
println!("Disconnect called, but pool remains active for potential future use");
Ok(())
}

async fn list_sources(&self) -> Result<Vec<String>> {
println!("Listing sources from PostGIS database");
let client = self
.pool
.get()
.await
.map_err(|e| anyhow!("Failed to get client from pool: {}", e))?;

let rows = client
.query(
"SELECT DISTINCT f.table_name
FROM information_schema.columns f
JOIN pg_type t ON f.udt_name = t.typname
WHERE t.typtype = 'b'
AND t.typname IN ('geometry', 'geography')
AND f.table_schema = 'public'",
&[],
)
.await
.map_err(|e| anyhow!("Failed to execute query to list sources: {}", e))?;

let sources: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
println!("Found {} sources", sources.len());
Ok(sources)
}
}

#[derive(Clone)]
pub struct GeospatialConfig {
sources: Arc<RwLock<HashMap<String, Arc<dyn GeoConnector>>>>,
}

impl GeospatialConfig {
pub fn new() -> Self {
GeospatialConfig {
sources: Arc::new(RwLock::new(HashMap::new())),
}
}

pub async fn add_connection<T: GeoConnector + 'static>(&self, name: String, source: T) {
let mut sources = self.sources.write().await;
sources.insert(name, Arc::new(source));
}

pub async fn get_connection(&self, connection_id: &str) -> Result<Arc<dyn GeoConnector>> {
let sources = self.sources.read().await;
sources
.get(connection_id)
.cloned()
.ok_or_else(|| anyhow!("Source not found"))
}

pub async fn remove_connection(&self, name: &str) -> Option<Arc<dyn GeoConnector>> {
let mut sources = self.sources.write().await;
sources.remove(name)
}
}
4 changes: 2 additions & 2 deletions backend/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
pub mod common;
pub mod layer;
pub mod connector;
pub mod session;
pub mod user;
pub mod workspace;

pub use common::*;
pub use layer::*;
pub use connector::*;
pub use session::*;
pub use user::*;
pub use workspace::*;
11 changes: 5 additions & 6 deletions backend/src/core/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ pub struct Session {
}

#[async_trait]
impl<S, D> FromRequestParts<S> for Session
impl<S> FromRequestParts<S> for Session
where
S: Send + Sync,
D: Database + Send + Sync + 'static,
S: std::ops::Deref<Target = Arc<AppState<D>>>,
S: std::ops::Deref<Target = Arc<AppState>>,
{
type Rejection = (StatusCode, String);

Expand All @@ -38,7 +37,7 @@ where
))?;

// Use the existing from_id method to validate and retrieve the session
match Session::from_id(state.app_data.clone(), auth_header).await {
match Session::from_id(&state.app_data, auth_header).await {
Ok(session) => Ok(session),
Err(_) => Err((
StatusCode::UNAUTHORIZED,
Expand All @@ -50,7 +49,7 @@ where

impl Session {
// TODO: dead code
pub async fn from_id<T: Database>(database: T, id: &str) -> Result<Self> {
pub async fn from_id(database: &Arc<dyn Database>, id: &str) -> Result<Self> {
database.get_session_by_id(id).await
}

Expand All @@ -74,7 +73,7 @@ impl Session {
}
}

pub async fn delete<T: Database>(&self, database: T) -> Result<()> {
pub async fn delete(&self, database: &Arc<dyn Database>) -> Result<()> {
database.delete_session(&self.id).await?;
Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions backend/src/core/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::core::{create_id, get_unix_timestamp, hash_password};
use crate::data::Database;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct User {
Expand Down Expand Up @@ -50,7 +51,7 @@ impl From<User> for Profile {
}

impl User {
pub async fn create<T: Database>(database: T, user: &CreateUser) -> Result<()> {
pub async fn create(database: &Arc<dyn Database>, user: &CreateUser) -> Result<()> {
let user_id = create_id(10).await;
let new_user = User::from_create_user(user, &user_id, true);
match database.get_user_by_email(&new_user.email).await {
Expand All @@ -59,11 +60,11 @@ impl User {
}
}

pub async fn from_id<T: Database>(database: T, id: &str) -> Result<User> {
pub async fn from_id(database: &Arc<dyn Database>, id: &str) -> Result<User> {
database.get_user_by_id(id).await
}

pub async fn from_email<T: Database>(database: T, email: &str) -> Result<User> {
pub async fn from_email(database: &Arc<dyn Database>, email: &str) -> Result<User> {
database.get_user_by_email(email).await
}

Expand Down
29 changes: 14 additions & 15 deletions backend/src/core/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Workspace {
Expand Down Expand Up @@ -64,11 +65,11 @@ pub struct RemoveOrgMember {
}

impl Workspace {
pub async fn from_id<T: Database>(database: T, id: &str) -> Result<Self> {
pub async fn from_id(database: &Arc<dyn Database>, id: &str) -> Result<Self> {
Ok(database.get_workspace_by_id(id).await.unwrap())
}

pub async fn create<T: Database>(database: T, wsp: &Workspace) -> Result<()> {
pub async fn create(database: &Arc<dyn Database>, wsp: &Workspace) -> Result<()> {
// Check for existing org with same name
let db_resp = database.create_workspace(wsp).await;

Expand All @@ -78,17 +79,14 @@ impl Workspace {
}
}

pub async fn add_member<T: Database>(
pub async fn add_member(
self,
database: T,
database: &Arc<dyn Database>,
req_user: &User,
user: &User,
role: WorkspaceRole,
) -> Result<()> {
let requesting_member = self
.clone()
.get_member(database.clone(), req_user.clone())
.await?;
let requesting_member = self.clone().get_member(&database, req_user.clone()).await?;

println!(
"{} is {} of the {} workspace",
Expand All @@ -106,21 +104,22 @@ impl Workspace {
Ok(())
}

pub async fn get_member<D: Database>(self, database: D, user: User) -> Result<WorkspaceMember> {
pub async fn get_member(
self,
database: &Arc<dyn Database>,
user: User,
) -> Result<WorkspaceMember> {
// TODO: Fix unwrap
Ok(database.get_workspace_member(self, user).await.unwrap())
}

pub async fn remove_member<T: Database>(
pub async fn remove_member(
self,
database: T,
database: &Arc<dyn Database>,
req_user: &User,
user: &User,
) -> Result<()> {
let requesting_member = self
.clone()
.get_member(database.clone(), req_user.clone())
.await?;
let requesting_member = self.clone().get_member(&database, req_user.clone()).await?;

println!(
"{} is {} of the {} workspace",
Expand Down
Loading

0 comments on commit c1c5038

Please sign in to comment.