Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(multitenancy): add support for multitenancy and handle the same in router, producer, consumer, drainer and analytics #4630

Merged
merged 42 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
85e3676
feat(multitenancy): add logical separation of tenant using db schema
jagan-jaya Apr 30, 2024
2265882
feat(multitenancy): add base_url for redirection and pay start
jagan-jaya May 13, 2024
50559b7
feat(multitenancy): format code and remove unused deps
jagan-jaya May 13, 2024
54c1aee
feat(multitenancy): create sessionstate for scheduler
jagan-jaya May 13, 2024
f6d3dac
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 14, 2024
7d35c89
chore: run formatter
hyperswitch-bot[bot] May 14, 2024
b2b05c3
feat(multitenancy): update tenants
jagan-jaya May 14, 2024
4a5748c
Merge branch 'multitenancy' of https://github.com/juspay/hyperswitch …
jagan-jaya May 14, 2024
e60f9b5
feat(multitenancy): fix clippy warnings
jagan-jaya May 15, 2024
c1602da
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 15, 2024
c9405db
feat(multitenancy): fix clippy warnings
jagan-jaya May 17, 2024
dda08c8
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 17, 2024
c376ffc
chore: run formatter
hyperswitch-bot[bot] May 17, 2024
c78729e
feat(multitenancy): add support to enable/disable multitenancy support
jagan-jaya May 17, 2024
66d8d4d
Merge branch 'multitenancy' of https://github.com/juspay/hyperswitch …
jagan-jaya May 17, 2024
f103acc
fix: clear redis entry on update operation
jagan-jaya May 21, 2024
b3609b8
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 21, 2024
4e941d6
chore: run formatter
hyperswitch-bot[bot] May 21, 2024
f852184
feat: add tenant support in in-memory cache
jagan-jaya May 23, 2024
9e27425
Merge branch 'multitenancy' of https://github.com/juspay/hyperswitch …
jagan-jaya May 23, 2024
6a174a2
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 23, 2024
e0cb6f0
feat: add tenant support in in-memory cache
jagan-jaya May 23, 2024
a6d1936
feat: address PR comments
jagan-jaya May 27, 2024
69f5fb4
feat: address PR comments
jagan-jaya May 27, 2024
18a75c0
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 27, 2024
93aa1f9
fix: clippy warnings
jagan-jaya May 27, 2024
9516014
chore: run formatter
hyperswitch-bot[bot] May 27, 2024
827113f
fix: clippy warnings
jagan-jaya May 28, 2024
e6eea95
Merge branch 'multitenancy' of https://github.com/juspay/hyperswitch …
jagan-jaya May 28, 2024
479bb2f
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 28, 2024
4cb732e
refactor: drainer to have single redis conn for error handling
jagan-jaya May 28, 2024
fb10f00
refactor: get redis key prefix from config
jagan-jaya May 29, 2024
6f57723
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 29, 2024
3688621
fix: clippy warnings
jagan-jaya May 29, 2024
4283f8f
fix: clippy warnings
jagan-jaya May 29, 2024
079f8c7
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 30, 2024
2afcf2c
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya May 30, 2024
07ea1e6
chore: run formatter
hyperswitch-bot[bot] May 30, 2024
d5d5f84
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya Jun 3, 2024
df02be4
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya Jun 3, 2024
3294ed8
Merge branch 'main' of https://github.com/juspay/hyperswitch into mul…
jagan-jaya Jun 3, 2024
1223f14
Merge branch 'main' into multitenancy
jarnura Jun 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,9 @@ disputes = "hyperswitch-dispute-events"

[saved_payment_methods]
sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant
7 changes: 6 additions & 1 deletion config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ recon_admin_api_key = "recon_test_admin" # recon_admin API key for recon authent

# Server configuration
[server]
base_url = "https://server_base_url"
workers = 8
port = 8080
host = "127.0.0.1"
Expand All @@ -253,3 +252,9 @@ encryption_manager = "aws_kms" # Encryption manager client to be used
[encryption_management.aws_kms]
key_id = "kms_key_id" # The AWS key ID used by the KMS SDK for decrypting data.
region = "kms_region" # The AWS region used by the KMS SDK for decrypting data.

[multitenancy]
enabled = false

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
6 changes: 6 additions & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,9 @@ disputes = "hyperswitch-dispute-events"

[saved_payment_methods]
sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
6 changes: 6 additions & 0 deletions config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,9 @@ disputes = "hyperswitch-dispute-events"

[saved_payment_methods]
sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
8 changes: 4 additions & 4 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,20 +601,20 @@ impl AnalyticsProvider {
}
}

pub async fn from_conf(config: &AnalyticsConfig) -> Self {
pub async fn from_conf(config: &AnalyticsConfig, tenant: &str) -> Self {
match config {
AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx).await),
AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx, tenant).await),
AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient {
config: Arc::new(clickhouse.clone()),
}),
AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
},
),
AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
},
Expand Down
14 changes: 6 additions & 8 deletions crates/analytics/src/sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use api_models::{
analytics::refunds::RefundType,
enums::{DisputeStage, DisputeStatus},
};
use common_utils::errors::{CustomResult, ParsingError};
use common_utils::{
errors::{CustomResult, ParsingError},
DbConnectionParams,
};
use diesel_models::enums::{
AttemptStatus, AuthenticationType, Currency, PaymentMethod, RefundStatus,
};
use error_stack::ResultExt;
use masking::PeekInterface;
use sqlx::{
postgres::{PgArgumentBuffer, PgPoolOptions, PgRow, PgTypeInfo, PgValueRef},
Decode, Encode,
Expand Down Expand Up @@ -49,12 +51,8 @@ impl Default for SqlxClient {
}

impl SqlxClient {
pub async fn from_conf(conf: &Database) -> Self {
let password = &conf.password.peek();
let database_url = format!(
"postgres://{}:{}@{}:{}/{}",
conf.username, password, conf.host, conf.port, conf.dbname
);
pub async fn from_conf(conf: &Database, schema: &str) -> Self {
let database_url = conf.get_database_url(schema);
#[allow(clippy::expect_used)]
let pool = PgPoolOptions::new()
.max_connections(conf.pool_size)
Expand Down
3 changes: 3 additions & 0 deletions crates/common_utils/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub const DEFAULT_TTL_FOR_EXTENDED_CARD_INFO: u16 = 15 * 60;
/// Max ttl for Extended card info in redis (in seconds)
pub const MAX_TTL_FOR_EXTENDED_CARD_INFO: u16 = 60 * 60 * 2;

/// Default tenant to be used when multitenancy is disabled
pub const DEFAULT_TENANT: &str = "public";

/// Max Length for MerchantReferenceId
pub const MAX_ALLOWED_MERCHANT_REFERENCE_ID_LENGTH: u8 = 64;

Expand Down
23 changes: 23 additions & 0 deletions crates/common_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#![warn(missing_docs, missing_debug_implementations)]
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR" ), "/", "README.md"))]

use masking::{PeekInterface, Secret};

use crate::{
consts::ID_LENGTH,
id_type::{CustomerId, MerchantReferenceId},
Expand Down Expand Up @@ -224,6 +226,27 @@ pub fn generate_time_ordered_id(prefix: &str) -> String {
format!("{prefix}_{}", uuid::Uuid::now_v7().as_simple())
}

/// Connection parameters for a Postgres database, plus a helper that
/// assembles them into a connection URL pinned to a specific schema
/// (used for schema-per-tenant isolation in multitenancy).
pub trait DbConnectionParams {
    /// Username used to authenticate with the database.
    fn get_username(&self) -> &str;
    /// Password used to authenticate with the database.
    fn get_password(&self) -> Secret<String>;
    /// Hostname or IP address of the database server.
    fn get_host(&self) -> &str;
    /// Port on which the database server listens.
    fn get_port(&self) -> u16;
    /// Name of the database to connect to.
    fn get_dbname(&self) -> &str;
    /// Builds a `postgres://` connection URL that sets `application_name`
    /// to `schema` and forces the session `search_path` to `schema` via the
    /// libpq `options` parameter (`%3D` is the percent-encoded `=` required
    /// inside that query-string value).
    fn get_database_url(&self, schema: &str) -> String {
        // NOTE(review): username and password are interpolated verbatim;
        // credentials containing URL-reserved characters (e.g. `@`, `/`,
        // `%`) would yield an invalid URL — confirm values are URL-safe.
        format!(
            "postgres://{}:{}@{}:{}/{}?application_name={}&options=-c search_path%3D{}",
            self.get_username(),
            self.get_password().peek(),
            self.get_host(),
            self.get_port(),
            self.get_dbname(),
            schema,
            schema,
        )
    }
}

#[cfg(test)]
mod nanoid_tests {
#![allow(clippy::unwrap_used)]
Expand Down
17 changes: 7 additions & 10 deletions crates/drainer/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bb8::PooledConnection;
use common_utils::DbConnectionParams;
use diesel::PgConnection;
use masking::PeekInterface;

use crate::{settings::Database, Settings};

Expand All @@ -18,15 +18,12 @@ pub async fn redis_connection(conf: &Settings) -> redis_interface::RedisConnecti
///
/// Will panic if could not create a db pool
#[allow(clippy::expect_used)]
pub async fn diesel_make_pg_pool(database: &Database, _test_transaction: bool) -> PgPool {
let database_url = format!(
"postgres://{}:{}@{}:{}/{}",
database.username,
database.password.peek(),
database.host,
database.port,
database.dbname
);
pub async fn diesel_make_pg_pool(
database: &Database,
_test_transaction: bool,
schema: &str,
) -> PgPool {
let database_url = database.get_database_url(schema);
let manager = async_bb8_diesel::ConnectionManager::<PgConnection>::new(database_url);
let pool = bb8::Pool::builder()
.max_size(database.pool_size)
Expand Down
77 changes: 50 additions & 27 deletions crates/drainer/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::{atomic, Arc};
use std::{
collections::HashMap,
sync::{atomic, Arc},
};

use router_env::tracing::Instrument;
use tokio::{
Expand Down Expand Up @@ -31,12 +34,12 @@ pub struct HandlerInner {
loop_interval: Duration,
active_tasks: Arc<atomic::AtomicU64>,
conf: DrainerSettings,
store: Arc<Store>,
stores: HashMap<String, Arc<Store>>,
running: Arc<atomic::AtomicBool>,
}

impl Handler {
pub fn from_conf(conf: DrainerSettings, store: Arc<Store>) -> Self {
pub fn from_conf(conf: DrainerSettings, stores: HashMap<String, Arc<Store>>) -> Self {
let shutdown_interval = Duration::from_millis(conf.shutdown_interval.into());
let loop_interval = Duration::from_millis(conf.loop_interval.into());

Expand All @@ -49,7 +52,7 @@ impl Handler {
loop_interval,
active_tasks,
conf,
store,
stores,
running,
};

Expand All @@ -68,21 +71,23 @@ impl Handler {

while self.running.load(atomic::Ordering::SeqCst) {
metrics::DRAINER_HEALTH.add(&metrics::CONTEXT, 1, &[]);
if self.store.is_stream_available(stream_index).await {
let _task_handle = tokio::spawn(
drainer_handler(
self.store.clone(),
stream_index,
self.conf.max_read_count,
self.active_tasks.clone(),
jobs_picked.clone(),
)
.in_current_span(),
);
for store in self.stores.values() {
if store.is_stream_available(stream_index).await {
let _task_handle = tokio::spawn(
drainer_handler(
store.clone(),
stream_index,
self.conf.max_read_count,
self.active_tasks.clone(),
jobs_picked.clone(),
)
.in_current_span(),
);
}
}
stream_index = utils::increment_stream_index(
(stream_index, jobs_picked.clone()),
self.store.config.drainer_num_partitions,
self.conf.num_partitions,
)
.await;
time::sleep(self.loop_interval).await;
Expand Down Expand Up @@ -116,18 +121,33 @@ impl Handler {

pub fn spawn_error_handlers(&self, tx: mpsc::Sender<()>) -> errors::DrainerResult<()> {
let (redis_error_tx, redis_error_rx) = oneshot::channel();
let redis_conn_clone = self
.stores
.values()
.next()
.map(|store| store.redis_conn.clone());
match redis_conn_clone {
None => {
logger::error!("No redis connection found");
Err(
errors::DrainerError::UnexpectedError("No redis connection found".to_string())
.into(),
)
}
Some(redis_conn_clone) => {
// Spawn a task to monitor if redis is down or not
let _task_handle = tokio::spawn(
async move { redis_conn_clone.on_error(redis_error_tx).await }
.in_current_span(),
);

let redis_conn_clone = self.store.redis_conn.clone();

// Spawn a task to monitor if redis is down or not
let _task_handle = tokio::spawn(
async move { redis_conn_clone.on_error(redis_error_tx).await }.in_current_span(),
);

//Spawns a task to send shutdown signal if redis goes down
let _task_handle = tokio::spawn(redis_error_receiver(redis_error_rx, tx).in_current_span());
//Spawns a task to send shutdown signal if redis goes down
let _task_handle =
tokio::spawn(redis_error_receiver(redis_error_rx, tx).in_current_span());

Ok(())
Ok(())
}
}
}
}

Expand Down Expand Up @@ -208,7 +228,10 @@ async fn drainer(
};

// parse_stream_entries returns error if no entries is found, handle it
let entries = utils::parse_stream_entries(&stream_read, stream_name)?;
let entries = utils::parse_stream_entries(
&stream_read,
store.redis_conn.add_prefix(stream_name).as_str(),
)?;
let read_count = entries.len();

metrics::JOBS_PICKED_PER_STREAM.add(
Expand Down
Loading
Loading