Feat/refactor trades downloader #24

Open · wants to merge 3 commits into main
102 changes: 99 additions & 3 deletions Makefile
@@ -1,10 +1,106 @@
 # Makefile for managing Conda environment and Docker Compose services
 .ONESHELL:
-.PHONY: uninstall
-.PHONY: install
+.PHONY: uninstall install start stop clean rebuild
 
+# Conda environment management
+ENV_NAME = quants-lab
+
 uninstall:
-	conda env remove -n quants-lab
+	@echo "Removing Conda environment: $(ENV_NAME)"
+	conda env remove -n $(ENV_NAME)
 
 install:
+	@echo "Creating Conda environment from environment.yml"
 	conda env create -f environment.yml
+
+# Docker Compose services
+COMPOSE_DB = docker compose -f docker-compose-db.yml
+COMPOSE_TASKS = docker compose -f docker-compose-tasks.yml
+
+start:
+	@if [ "$(SERVICE)" ]; then \
+		$(COMPOSE_DB) up -d $(SERVICE) || $(COMPOSE_TASKS) up -d $(SERVICE); \
+	else \
+		echo "Please specify a SERVICE to start. Use 'make start SERVICE=<service_name>'"; \
+	fi
+
+stop:
+	@if [ "$(SERVICE)" ]; then \
+		$(COMPOSE_DB) down $(SERVICE) || $(COMPOSE_TASKS) down $(SERVICE); \
+	else \
+		echo "Please specify a SERVICE to stop. Use 'make stop SERVICE=<service_name>'"; \
+	fi
+
+# Convenience shortcuts for database services
+start-timescaledb:
+	$(COMPOSE_DB) up -d timescaledb
+
+stop-timescaledb:
+	$(COMPOSE_DB) down timescaledb
+
+start-optunadb:
+	$(COMPOSE_DB) up -d optunadb
+
+stop-optunadb:
+	$(COMPOSE_DB) down optunadb
+
+# Convenience shortcuts for task runners
+start-trades:
+	$(COMPOSE_TASKS) up -d data-generation-runner
+
+stop-trades:
+	$(COMPOSE_TASKS) down data-generation-runner
+
+start-candles:
+	$(COMPOSE_TASKS) up -d candles-downloader-runner
+
+stop-candles:
+	$(COMPOSE_TASKS) down candles-downloader-runner
+
+start-report:
+	$(COMPOSE_TASKS) up -d screeners-report-runner
+
+stop-report:
+	$(COMPOSE_TASKS) down screeners-report-runner
+
+start-backtesting:
+	$(COMPOSE_TASKS) up -d backtesting-runner
+
+stop-backtesting:
+	$(COMPOSE_TASKS) down backtesting-runner
+
+# Clean and rebuild all containers
+clean:
+	$(COMPOSE_DB) down -v
+	$(COMPOSE_TASKS) down -v
+
+rebuild:
+	$(COMPOSE_DB) down --rmi all -v
+	$(COMPOSE_TASKS) down --rmi all -v
+	$(COMPOSE_DB) up --build -d
+	$(COMPOSE_TASKS) up --build -d
+
+# Usage help
+# Usage help
+help:
+	@echo "Available targets:"
+	@echo "  install              - Create Conda environment from environment.yml"
+	@echo "  uninstall            - Remove Conda environment"
+	@echo "  start SERVICE=<name> - Start a specific service"
+	@echo "  stop SERVICE=<name>  - Stop a specific service"
+	@echo "  start-optunadb       - Start optuna database"
+	@echo "  stop-optunadb        - Stop optuna database"
+	@echo "  start-timescaledb    - Start timescale database"
+	@echo "  stop-timescaledb     - Stop timescale database"
+	@echo "  start-db             - Start database services (timescaledb, optunadb)"
+	@echo "  stop-db              - Stop database services"
+	@echo "  start-trades         - Start trades task runner"
+	@echo "  stop-trades          - Stop trades task runner"
+	@echo "  start-candles        - Start candles downloader"
+	@echo "  stop-candles         - Stop candles downloader"
+	@echo "  start-report         - Start report generator"
+	@echo "  stop-report          - Stop report generator"
+	@echo "  start-backtesting    - Start backtesting"
+	@echo "  stop-backtesting     - Stop backtesting"
+	@echo "  clean                - Remove all containers and volumes"
+	@echo "  rebuild              - Rebuild and start all services"
51 changes: 42 additions & 9 deletions core/data_sources/clob.py
@@ -5,6 +5,7 @@
 from typing import Dict, List, Optional, Tuple
 
 import pandas as pd
+from bidict import bidict
 from hummingbot.client.config.client_config_map import ClientConfigMap
 from hummingbot.client.config.config_helpers import ClientConfigAdapter, get_connector_class
 from hummingbot.client.settings import AllConnectorSettings, ConnectorType
@@ -75,15 +76,14 @@ def get_candles_from_cache(self,
         else:
             return None
 
-
-
     async def get_candles(self,
                           connector_name: str,
                           trading_pair: str,
                           interval: str,
                           start_time: int,
                           end_time: int,
-                          from_trades: bool = False) -> Candles:
+                          from_trades: bool = False,
+                          max_trades_per_call: int = 1_000_000) -> Candles:
         cache_key = (connector_name, trading_pair, interval)
 
         if cache_key in self._candles_cache:
@@ -111,9 +111,19 @@ async def get_candles(self,
         try:
             logger.info(f"Fetching data for {connector_name} {trading_pair} {interval} from {new_start_time} to {new_end_time}")
             if from_trades:
-                trades = await self.get_trades(connector_name, trading_pair, new_start_time, new_end_time)
+                all_trades = pd.DataFrame()
+                async for trades in self.yield_trades_chunk(connector_name=connector_name,
+                                                            trading_pair=trading_pair,
+                                                            start_time=new_start_time,
+                                                            end_time=new_end_time,
+                                                            max_trades_per_call=max_trades_per_call):
+                    if trades.empty:
+                        break
+                    trades["connector_name"] = connector_name
+                    trades["trading_pair"] = trading_pair
+                    all_trades = pd.concat([all_trades, trades])
                 pandas_interval = self.convert_interval_to_pandas_freq(interval)
-                candles_df = trades.resample(pandas_interval).agg({"price": "ohlc", "volume": "sum"}).ffill()
+                candles_df = all_trades.resample(pandas_interval).agg({"price": "ohlc", "volume": "sum"}).ffill()
                 candles_df.columns = candles_df.columns.droplevel(0)
                 candles_df["timestamp"] = pd.to_numeric(candles_df.index) // 1e9
             else:
@@ -246,14 +256,37 @@ def load_candles_cache(self, root_path: str = ""):
             except Exception as e:
                 logger.error(f"Error loading {file}: {type(e).__name__} - {e}")
 
-    async def get_trades(self, connector_name: str, trading_pair: str, start_time: int, end_time: int,
-                         from_id: Optional[int] = None):
-        return await self.trades_feeds[connector_name].get_historical_trades(trading_pair, start_time, end_time,
-                                                                             from_id)
+    async def yield_trades_chunk(self, connector_name: str, trading_pair: str, start_time: int, end_time: int,
+                                 from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
+        async for chunk in self.trades_feeds[connector_name].get_historical_trades(
+            trading_pair, start_time, end_time, from_id, max_trades_per_call
+        ):
+            yield chunk
 
     @staticmethod
     def convert_interval_to_pandas_freq(interval: str) -> str:
         """
        Converts a candle interval string to a pandas frequency string.
         """
         return INTERVAL_MAPPING.get(interval, 'T')
+
+    @property
+    def interval_to_seconds(self):
+        return bidict({
+            "1s": 1,
+            "1m": 60,
+            "3m": 180,
+            "5m": 300,
+            "15m": 900,
+            "30m": 1800,
+            "1h": 3600,
+            "2h": 7200,
+            "4h": 14400,
+            "6h": 21600,
+            "8h": 28800,
+            "12h": 43200,
+            "1d": 86400,
+            "3d": 259200,
+            "1w": 604800,
+            "1M": 2592000
+        })
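
With this refactor, the from_trades path of get_candles consumes the new yield_trades_chunk async generator and accumulates chunks before resampling. A minimal, self-contained sketch of that accumulate-then-resample pattern, with a fabricated generator standing in for yield_trades_chunk (names and data here are illustrative only):

import asyncio

import pandas as pd


async def fake_trades_chunks():
    # Stand-in for CLOBDataSource.yield_trades_chunk: yields trade
    # DataFrames indexed by timestamp, in bounded chunks.
    index = pd.date_range("2024-01-01", periods=6, freq="30s")
    prices = [100.0, 101.0, 99.5, 100.5, 102.0, 101.5]
    for start in (0, 3):
        yield pd.DataFrame(
            {"price": prices[start:start + 3], "volume": [1.0, 2.0, 1.5]},
            index=index[start:start + 3],
        )


async def main():
    all_trades = pd.DataFrame()
    async for trades in fake_trades_chunks():
        if trades.empty:
            break
        all_trades = pd.concat([all_trades, trades])
    # The same aggregation get_candles applies to the accumulated trades:
    # OHLC from price, summed volume, forward-filled gaps.
    candles_df = all_trades.resample("1min").agg({"price": "ohlc", "volume": "sum"}).ffill()
    candles_df.columns = candles_df.columns.droplevel(0)
    print(candles_df)


asyncio.run(main())

The new interval_to_seconds property returns a bidict, so the mapping can be read in both directions: interval_to_seconds["1h"] gives 3600 and interval_to_seconds.inverse[3600] gives "1h".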
39 changes: 28 additions & 11 deletions core/data_sources/trades_feed/connectors/binance_perpetual.py
@@ -34,7 +34,8 @@ def get_exchange_trading_pair(self, trading_pair: str) -> str:
         base, quote = trading_pair.split("-")
         return f"{base}{quote}"
 
-    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int, from_id: Optional[int] = None):
+    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int,
+                                     from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
         all_trades_collected = False
         end_ts = int(end_time * 1000)
         start_ts = int(start_time * 1000)
@@ -58,19 +59,36 @@ async def _get_historical_trades(self, trading_pair: str, start_time: int, end_t
             if trades:
                 last_timestamp = trades[-1]["T"]
                 all_trades.extend(trades)
-                all_trades_collected = last_timestamp >= end_ts
                 from_id = trades[-1]["a"]
+
+                # Check if the buffer size is sufficient for yielding
+                if len(all_trades) >= max_trades_per_call:
+                    df = pd.DataFrame(all_trades)
+                    df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"},
+                              inplace=True)
+                    df.drop(columns=["f", "l"], inplace=True)
+                    df["timestamp"] = df["timestamp"] / 1000
+                    df.index = pd.to_datetime(df["timestamp"], unit="s")
+                    df["price"] = df["price"].astype(float)
+                    df["volume"] = df["volume"].astype(float)
+                    yield df
+                    all_trades = []  # Reset buffer after yielding
+
+                all_trades_collected = last_timestamp >= end_ts
             else:
                 all_trades_collected = True
 
-        df = pd.DataFrame(all_trades)
-        df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"}, inplace=True)
-        df.drop(columns=["f", "l"], inplace=True)
-        df["timestamp"] = df["timestamp"] / 1000
-        df.index = pd.to_datetime(df["timestamp"], unit="s")
-        df["price"] = df["price"].astype(float)
-        df["volume"] = df["volume"].astype(float)
-        return df
+        # Yield any remaining trades after the loop
+        if all_trades:
+            df = pd.DataFrame(all_trades)
+            df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"},
+                      inplace=True)
+            df.drop(columns=["f", "l"], inplace=True)
+            df["timestamp"] = df["timestamp"] / 1000
+            df.index = pd.to_datetime(df["timestamp"], unit="s")
+            df["price"] = df["price"].astype(float)
+            df["volume"] = df["volume"].astype(float)
+            yield df
 
     async def _get_historical_trades_request(self, params: Dict):
         try:
@@ -98,7 +116,6 @@ async def _enforce_rate_limit(self):
         if current_weight_usage >= self.REQUEST_WEIGHT_LIMIT:
             # Calculate how long to sleep to stay within the rate limit
             sleep_time = self.ONE_MINUTE - (current_time - self._request_timestamps[0])
-            self.logger().info(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds.")
             await asyncio.sleep(sleep_time)
 
     def _record_request(self):
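
The chunked rewrite inlines the same eight-line DataFrame normalization twice: once when the buffer reaches max_trades_per_call, once for the final flush. A possible cleanup, sketched with the column conventions from the diff; _trades_to_df is a hypothetical helper name, not part of the PR:

import pandas as pd


def _trades_to_df(raw_trades: list) -> pd.DataFrame:
    # Normalize raw aggTrades rows into the frame this connector yields:
    # "T" -> timestamp (ms), "p" -> price, "q" -> volume, "m" -> sell_taker,
    # "a" -> aggregate trade id; "f"/"l" (first/last trade ids) are dropped.
    df = pd.DataFrame(raw_trades)
    df.rename(columns={"T": "timestamp", "p": "price", "q": "volume",
                       "m": "sell_taker", "a": "id"}, inplace=True)
    df.drop(columns=["f", "l"], inplace=True)
    df["timestamp"] = df["timestamp"] / 1000  # ms -> s
    df.index = pd.to_datetime(df["timestamp"], unit="s")
    df["price"] = df["price"].astype(float)
    df["volume"] = df["volume"].astype(float)
    return df

Both yield sites would then collapse to yield _trades_to_df(all_trades). Resetting all_trades after each mid-range yield is what keeps memory bounded by max_trades_per_call rather than by the full requested window.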
11 changes: 7 additions & 4 deletions core/data_sources/trades_feed/trades_feed_base.py
@@ -14,12 +14,15 @@ def get_exchange_trading_pair(self, trading_pair: str) -> str:
         ...
 
     async def get_historical_trades(self, trading_pair: str, start_time: int, end_time: Optional[int] = None,
-                                    from_id: Optional[int] = None):
+                                    from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
         if not end_time:
             end_time = int(time.time())
-        historical_trades = await self._get_historical_trades(trading_pair, start_time, end_time, from_id)
-        return historical_trades
+
+        async for chunk in self._get_historical_trades(trading_pair, start_time, end_time, from_id,
+                                                       max_trades_per_call):
+            yield chunk  # Yield chunks instead of returning a single result
 
     @abstractmethod
-    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int, from_id: Optional[int] = None):
+    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int,
+                                     from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
         ...
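
Since get_historical_trades now contains a yield, it is an async generator rather than a coroutine: callers can no longer await it for a single DataFrame and must iterate, as yield_trades_chunk in clob.py does above. A minimal runnable sketch of the new calling convention, with a dummy feed standing in for a TradesFeedBase subclass:

import asyncio
import time
from typing import Optional


class DemoFeed:
    # Mirrors the new get_historical_trades signature: an async generator.
    async def get_historical_trades(self, trading_pair: str, start_time: int,
                                    end_time: Optional[int] = None,
                                    from_id: Optional[int] = None,
                                    max_trades_per_call: int = 1_000_000):
        if not end_time:
            end_time = int(time.time())
        for chunk_id in range(3):  # pretend each int is a DataFrame chunk
            yield chunk_id


async def main():
    async for chunk in DemoFeed().get_historical_trades("BTC-USDT", start_time=1_700_000_000):
        print("received chunk", chunk)


asyncio.run(main())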
33 changes: 21 additions & 12 deletions core/services/timescale_client.py
@@ -54,8 +54,8 @@ def get_ohlc_table_name(connector_name: str, trading_pair: str, interval: str) -
         return f"{connector_name}_{trading_pair.lower().replace('-', '_')}_{interval}"
 
     @property
-    def metrics_table_name(self):
-        return "summary_metrics"
+    def trades_summary_table_name(self):
+        return "trades_summary"
 
     @property
     def screener_table_name(self):
@@ -102,7 +102,7 @@ async def create_screener_table(self):
     async def create_metrics_table(self):
         async with self.pool.acquire() as conn:
             await conn.execute(f'''
-                CREATE TABLE IF NOT EXISTS {self.metrics_table_name} (
+                CREATE TABLE IF NOT EXISTS {self.trades_summary_table_name} (
                     connector_name TEXT NOT NULL,
                     trading_pair TEXT NOT NULL,
                     trade_amount REAL,
@@ -316,21 +316,30 @@ async def compute_resampled_ohlc(self, connector_name: str, trading_pair: str, i
                     low NUMERIC NOT NULL,
                     close NUMERIC NOT NULL,
                     volume NUMERIC NOT NULL,
+                    quote_asset_volume NUMERIC NOT NULL,
+                    n_trades INTEGER NOT NULL,
+                    taker_buy_base_volume NUMERIC NOT NULL,
+                    taker_buy_quote_volume NUMERIC NOT NULL,
                     PRIMARY KEY (timestamp)
                 )
             ''')
             # Insert the resampled candles into the new table
             await conn.executemany(f'''
-                INSERT INTO {ohlc_table_name} (timestamp, open, high, low, close, volume)
-                VALUES ($1, $2, $3, $4, $5, $6)
+                INSERT INTO {ohlc_table_name} (timestamp, open, high, low, close, volume, quote_asset_volume, n_trades,
+                                               taker_buy_base_volume, taker_buy_quote_volume)
+                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
             ''', [
                 (
                     datetime.fromtimestamp(row["timestamp"]),
                     row['open'],
                     row['high'],
                    row['low'],
                    row['close'],
-                    row['volume']
+                    row['volume'],
+                    0.0,
+                    0,
+                    0.0,
+                    0.0
                 )
                 for i, row in candles.data.iterrows()
             ])
@@ -378,9 +387,9 @@ async def get_screener_df(self):
 
     async def get_db_status_df(self):
         async with self.pool.acquire() as conn:
-            rows = await conn.fetch("""
+            rows = await conn.fetch(f"""
                 SELECT *
-                FROM summary_metrics""")
+                FROM {self.trades_summary_table_name}""")
             df_cols = [
                 "connector_name",
                 "trading_pair",
@@ -404,11 +413,11 @@ async def append_db_status_metrics(self, connector_name: str, trading_pair: str)
         metric_data["connector_name"] = connector_name
         metric_data["trading_pair"] = trading_pair
         delete_query = f"""
-            DELETE FROM {self.metrics_table_name}
+            DELETE FROM {self.trades_summary_table_name}
             WHERE connector_name = '{metric_data["connector_name"]}' AND trading_pair = '{metric_data["trading_pair"]}';
         """
         query = f"""
-            INSERT INTO {self.metrics_table_name} (
+            INSERT INTO {self.trades_summary_table_name} (
                 connector_name,
                 trading_pair,
                 trade_amount,
@@ -559,8 +568,8 @@ async def get_data_range(self, connector_name: str, trading_pair: str) -> Dict[s
 
         query = f'''
             SELECT
-                    MIN(timestamp) as start_time,
-                    MAX(timestamp) as end_time
+                MIN(timestamp) as start_time,
+                MAX(timestamp) as end_time
             FROM {table_name}
         '''
 
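
One detail worth flagging in append_db_status_metrics: delete_query interpolates connector_name and trading_pair directly into the SQL string, while the executemany above already uses asyncpg's $n placeholders. A sketch of the same delete with bind parameters; the standalone function shape and name are illustrative, not from the PR:

import asyncpg


async def delete_summary_row(pool: asyncpg.Pool, table_name: str,
                             connector_name: str, trading_pair: str) -> None:
    # Identifiers (the table name) cannot be bound as parameters,
    # but the values can, which avoids manual quoting of the values.
    async with pool.acquire() as conn:
        await conn.execute(
            f"DELETE FROM {table_name} WHERE connector_name = $1 AND trading_pair = $2;",
            connector_name,
            trading_pair,
        )

Also note that compute_resampled_ohlc fills quote_asset_volume, n_trades, and the taker-buy columns with zeros: candles resampled from trades carry placeholder values in those fields.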