Skip to content

Commit

Permalink
Merge pull request #37 from bcnmy/dev
Browse files Browse the repository at this point in the history
Sync
  • Loading branch information
ankurdubey521 authored Jul 17, 2024
2 parents 9dc2fd7 + 717df83 commit c8970c7
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 31 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ jobs:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
cache-all-crates: true
- run: cargo test --all-features
env:
BUNGEE_API_KEY: ${{ secrets.BUNGEE_API_KEY }}
Expand Down
18 changes: 11 additions & 7 deletions bin/reflux/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,29 @@ async fn run_solver(config: Arc<Config>) {
let redis_client = RedisClient::build(&config.infra.redis_url)
.await
.expect("Failed to instantiate redis client");

let erc20_instance_map = generate_erc20_instance_map(&config).unwrap();
let token_price_provider = Arc::new(Mutex::new(CoingeckoClient::new(
config.coingecko.base_url.clone(),
config.coingecko.api_key.clone(),
redis_client.clone(),
Duration::from_secs(config.coingecko.expiry_sec),
)));

let routing_engine = Arc::new(RoutingEngine::new(
account_service.clone(),
buckets,
redis_client.clone(),
config.solver_config.clone(),
chain_configs,
token_configs,
Arc::clone(&config),
Arc::clone(&token_price_provider),
));

// Initialize Settlement Engine and Dependencies
let erc20_instance_map = generate_erc20_instance_map(&config).unwrap();
let bungee_client = BungeeClient::new(&config.bungee.base_url, &config.bungee.api_key)
.expect("Failed to Instantiate Bungee Client");
let token_price_provider = Arc::new(Mutex::new(CoingeckoClient::new(
config.coingecko.base_url.clone(),
config.coingecko.api_key.clone(),
redis_client.clone(),
Duration::from_secs(config.coingecko.expiry_sec),
)));
let settlement_engine = Arc::new(SettlementEngine::new(
Arc::clone(&config),
bungee_client,
Expand Down
2 changes: 1 addition & 1 deletion crates/account-aggregation/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ pub struct PathQuery {
pub account: String,
pub to_chain: u32,
pub to_token: String,
pub to_value: f64,
pub to_amount_token: f64,
}
13 changes: 9 additions & 4 deletions crates/api/src/service_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use routing_engine::token_price::TokenPriceProvider;

pub struct ServiceController<Source: RouteSource, PriceProvider: TokenPriceProvider> {
account_service: Arc<AccountAggregationService>,
routing_engine: Arc<RoutingEngine>,
routing_engine: Arc<RoutingEngine<PriceProvider>>,
settlement_engine: Arc<SettlementEngine<Source, PriceProvider>>,
token_chain_map: HashMap<String, HashMap<u32, bool>>,
chain_supported: Vec<(u32, String)>,
Expand All @@ -24,7 +24,7 @@ impl<Source: RouteSource + 'static, PriceProvider: TokenPriceProvider + 'static>
{
pub fn new(
account_service: Arc<AccountAggregationService>,
routing_engine: Arc<RoutingEngine>,
routing_engine: Arc<RoutingEngine<PriceProvider>>,
settlement_engine: Arc<SettlementEngine<Source, PriceProvider>>,
token_chain_map: HashMap<String, HashMap<u32, bool>>,
chain_supported: Vec<(u32, String)>,
Expand Down Expand Up @@ -214,7 +214,7 @@ impl<Source: RouteSource + 'static, PriceProvider: TokenPriceProvider + 'static>

/// Get best cost path for asset consolidation
pub async fn get_best_path(
routing_engine: Arc<RoutingEngine>,
routing_engine: Arc<RoutingEngine<PriceProvider>>,
settlement_engine: Arc<SettlementEngine<Source, PriceProvider>>,
token_chain_map: HashMap<String, HashMap<u32, bool>>,
query: types::PathQuery,
Expand All @@ -240,7 +240,12 @@ impl<Source: RouteSource + 'static, PriceProvider: TokenPriceProvider + 'static>
}

let routes_result = routing_engine
.get_best_cost_paths(&query.account, query.to_chain, &query.to_token, query.to_value)
.get_best_cost_paths(
&query.account,
query.to_chain,
&query.to_token,
query.to_amount_token,
)
.await;

if let Err(err) = routes_result {
Expand Down
55 changes: 36 additions & 19 deletions crates/routing-engine/src/routing_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@ use std::cmp;
use std::collections::HashMap;
use std::sync::Arc;

use alloy::dyn_abi::abi::Token;
use futures::stream::{self, StreamExt};
use log::{debug, error, info};
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::sync::{Mutex, RwLock};

use account_aggregation::{service::AccountAggregationService, types::TokenWithBalance};
use config::{config::BucketConfig, ChainConfig, Config, SolverConfig, TokenConfig};
use config::{ChainConfig, Config, config::BucketConfig, SolverConfig, TokenConfig};
use storage::{KeyValueStore, RedisClient, RedisClientError};

use crate::{
estimator::{Estimator, LinearRegressionEstimator},
BridgeResult, BridgeResultVecWrapper, Route,
BridgeResult,
BridgeResultVecWrapper, estimator::{Estimator, LinearRegressionEstimator}, Route,
};
use crate::token_price::TokenPriceProvider;
use crate::token_price::utils::{Errors, get_token_price};

/// (from_chain, to_chain, from_token, to_token)
#[derive(Debug)]
struct PathQuery(u32, u32, String, String);

#[derive(Error, Debug)]
pub enum RoutingEngineError {
pub enum RoutingEngineError<T: TokenPriceProvider> {
#[error("Redis error: {0}")]
RedisError(#[from] RedisClientError),

Expand All @@ -36,29 +39,36 @@ pub enum RoutingEngineError {

#[error("User balance fetch error: {0}")]
UserBalanceFetchError(String),

#[error("Token price provider error: {0}")]
TokenPriceProviderError(Errors<T::Error>),
}

/// Routing Engine
/// This struct is responsible for calculating the best cost path for a user
#[derive(Debug)]
pub struct RoutingEngine {
pub struct RoutingEngine<T: TokenPriceProvider> {
buckets: Vec<Arc<BucketConfig>>,
aas_client: Arc<AccountAggregationService>,
cache: Arc<RwLock<HashMap<String, String>>>, // (hash(bucket), hash(estimator_value)
redis_client: RedisClient,
estimates: Arc<SolverConfig>,
chain_configs: HashMap<u32, Arc<ChainConfig>>,
token_configs: HashMap<String, Arc<TokenConfig>>,
config: Arc<Config>,
price_provider: Arc<Mutex<T>>,
}

impl RoutingEngine {
impl<PriceProvider: TokenPriceProvider> RoutingEngine<PriceProvider> {
pub fn new(
aas_client: Arc<AccountAggregationService>,
buckets: Vec<Arc<BucketConfig>>,
redis_client: RedisClient,
solver_config: Arc<SolverConfig>,
chain_configs: HashMap<u32, Arc<ChainConfig>>,
token_configs: HashMap<String, Arc<TokenConfig>>,
config: Arc<Config>,
price_provider: Arc<Mutex<PriceProvider>>,
) -> Self {
let cache = Arc::new(RwLock::new(HashMap::new()));

Expand All @@ -70,6 +80,8 @@ impl RoutingEngine {
estimates: solver_config,
chain_configs,
token_configs,
config,
price_provider,
}
}

Expand Down Expand Up @@ -99,11 +111,11 @@ impl RoutingEngine {
account: &str,
to_chain: u32,
to_token: &str,
to_value: f64,
) -> Result<Vec<BridgeResult>, RoutingEngineError> {
to_amount_token: f64,
) -> Result<Vec<BridgeResult>, RoutingEngineError<PriceProvider>> {
debug!(
"Getting best cost path for user: {}, to_chain: {}, to_token: {}, to_value: {}",
account, to_chain, to_token, to_value
"Getting best cost path for user: {}, to_chain: {}, to_token: {}, to_amount_token: {}",
account, to_chain, to_token, to_amount_token
);
let user_balances = self.get_user_balance_from_agg_service(&account).await?;
debug!("User balances: {:?}", user_balances);
Expand All @@ -114,10 +126,15 @@ impl RoutingEngine {
debug!("Direct assets: {:?}", direct_assets);
debug!("Non-direct assets: {:?}", non_direct_assets);

// let to_value_usd =
let to_value_usd =
get_token_price(&self.config, &self.price_provider.lock().await, &to_token.to_string())
.await
.map_err(RoutingEngineError::TokenPriceProviderError)?
* to_amount_token;
debug!("To value in USD: {}", to_value_usd);

let (mut selected_routes, total_amount_needed, mut total_cost) = self
.generate_optimal_routes(direct_assets, to_chain, to_token, to_value, account)
.generate_optimal_routes(direct_assets, to_chain, to_token, to_value_usd, account)
.await?;

// Handle swap/bridge for remaining amount if needed (non-direct assets)
Expand Down Expand Up @@ -152,7 +169,7 @@ impl RoutingEngine {
to_token: &str,
to_value_usd: f64,
to_address: &str,
) -> Result<(Vec<BridgeResult>, f64, f64), RoutingEngineError> {
) -> Result<(Vec<BridgeResult>, f64, f64), RoutingEngineError<PriceProvider>> {
// Sort direct assets by Balance^x / Fee_Cost^y, here x=2 and y=1
let x = self.estimates.x_value;
let y = self.estimates.y_value;
Expand Down Expand Up @@ -236,7 +253,7 @@ impl RoutingEngine {
&self,
target_amount_in_usd: f64,
path: PathQuery,
) -> Result<f64, RoutingEngineError> {
) -> Result<f64, RoutingEngineError<PriceProvider>> {
// TODO: Maintain sorted list cache in cache, binary search
let bucket = self
.buckets
Expand Down Expand Up @@ -277,7 +294,7 @@ impl RoutingEngine {
async fn get_user_balance_from_agg_service(
&self,
account: &str,
) -> Result<Vec<TokenWithBalance>, RoutingEngineError> {
) -> Result<Vec<TokenWithBalance>, RoutingEngineError<PriceProvider>> {
let balance = self
.aas_client
.get_user_accounts_balance(&account.to_string())
Expand Down Expand Up @@ -306,7 +323,7 @@ impl RoutingEngine {
is_smart_contract_deposit: bool,
from_address: &str,
to_address: &str,
) -> Result<BridgeResult, RoutingEngineError> {
) -> Result<BridgeResult, RoutingEngineError<PriceProvider>> {
let from_chain = Arc::clone(self.chain_configs.get(&from_chain_id).ok_or_else(|| {
RoutingEngineError::CacheError(format!(
"Chain config not found for ID {}",
Expand Down Expand Up @@ -344,12 +361,12 @@ mod tests {
use config::{BucketConfig, ChainConfig, SolverConfig, TokenConfig, TokenConfigByChainConfigs};
use storage::mongodb_client::MongoDBClient;

use crate::estimator::Estimator;
use crate::routing_engine::PathQuery;
use crate::{
estimator::{DataPoint, LinearRegressionEstimator},
routing_engine::{RoutingEngine, RoutingEngineError},
};
use crate::estimator::Estimator;
use crate::routing_engine::PathQuery;

#[tokio::test]
async fn test_get_cached_data() -> Result<(), RoutingEngineError> {
Expand Down

0 comments on commit c8970c7

Please sign in to comment.