forked from hummingbot/hummingbot
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch '0.32/connector-altmarkets' into 0.32-altmarkets
- Loading branch information
Showing
17 changed files
with
2,023 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
256 changes: 256 additions & 0 deletions
256
hummingbot/connector/exchange/altmarkets/altmarkets_api_order_book_data_source.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
#!/usr/bin/env python | ||
|
||
import aiohttp | ||
import asyncio | ||
import json | ||
import random | ||
import logging | ||
import pandas as pd | ||
import time | ||
from typing import ( | ||
Any, | ||
AsyncIterable, | ||
Dict, | ||
List, | ||
Optional, | ||
) | ||
from decimal import Decimal | ||
import requests | ||
import cachetools.func | ||
import websockets | ||
from websockets.exceptions import ConnectionClosed | ||
|
||
from hummingbot.core.data_type.order_book import OrderBook | ||
from hummingbot.core.data_type.order_book_message import OrderBookMessage | ||
from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource | ||
from hummingbot.logger import HummingbotLogger | ||
from hummingbot.connector.exchange.altmarkets.altmarkets_order_book import AltmarketsOrderBook | ||
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants | ||
from hummingbot.connector.exchange.altmarkets.altmarkets_utils import ( | ||
convert_from_exchange_trading_pair, | ||
convert_to_exchange_trading_pair | ||
) | ||
|
||
|
||
class AltmarketsAPIOrderBookDataSource(OrderBookTrackerDataSource): | ||
|
||
_haobds_logger: Optional[HummingbotLogger] = None | ||
|
||
@classmethod | ||
def logger(cls) -> HummingbotLogger: | ||
if cls._haobds_logger is None: | ||
cls._haobds_logger = logging.getLogger(__name__) | ||
return cls._haobds_logger | ||
|
||
def __init__(self, trading_pairs: List[str]): | ||
super().__init__(trading_pairs) | ||
|
||
@classmethod | ||
async def get_last_traded_prices(cls, trading_pairs: List[str]) -> Dict[str, float]: | ||
results = dict() | ||
# Altmarkets rate limit is 100 https requests per 10 seconds | ||
random.seed() | ||
randSleep = (random.randint(1, 9) + random.randint(1, 9)) / 10 | ||
await asyncio.sleep(0.5 + randSleep) | ||
async with aiohttp.ClientSession() as client: | ||
resp = await client.get(Constants.EXCHANGE_ROOT_API + Constants.TICKER_URI) | ||
resp_json = await resp.json() | ||
for trading_pair in trading_pairs: | ||
resp_record = [resp_json[symbol] for symbol in list(resp_json.keys()) if symbol == trading_pair][0]['ticker'] | ||
results[trading_pair] = float(resp_record["last"]) | ||
return results | ||
|
||
@staticmethod | ||
@cachetools.func.ttl_cache(ttl=10) | ||
def get_mid_price(trading_pair: str) -> Optional[Decimal]: | ||
resp = requests.get(url=Constants.EXCHANGE_ROOT_API + Constants.TICKER_URI) | ||
records = resp.json() | ||
result = None | ||
for tag in list(records.keys()): | ||
record = records[tag] | ||
pair = convert_from_exchange_trading_pair(tag) | ||
if trading_pair == pair and record["ticker"]["open"] is not None and record["ticker"]["last"] is not None: | ||
result = ((Decimal(record["ticker"]["open"]) * Decimal('1')) + (Decimal(record["ticker"]["last"]) * Decimal('3'))) / Decimal("4") | ||
if result <= 0: | ||
result = Decimal('0.00000001') | ||
break | ||
return result | ||
|
||
@staticmethod | ||
async def fetch_trading_pairs() -> List[str]: | ||
try: | ||
async with aiohttp.ClientSession() as client: | ||
async with client.get(Constants.EXCHANGE_ROOT_API + Constants.SYMBOLS_URI, timeout=Constants.API_CALL_TIMEOUT) as response: | ||
if response.status == 200: | ||
products: List[Dict[str, Any]] = await response.json() | ||
return [ | ||
product["name"].replace("/", "-") for product in products | ||
if product['state'] == "enabled" | ||
] | ||
|
||
except Exception: | ||
# Do nothing if the request fails -- there will be no autocomplete for huobi trading pairs | ||
pass | ||
|
||
return [] | ||
|
||
@staticmethod | ||
async def get_snapshot(client: aiohttp.ClientSession, trading_pair: str) -> Dict[str, Any]: | ||
# when type is set to "step0", the default value of "depth" is 150 | ||
# params: Dict = {"symbol": trading_pair, "type": "step0"} | ||
# Altmarkets rate limit is 100 https requests per 10 seconds | ||
random.seed() | ||
randSleep = (random.randint(1, 9) + random.randint(1, 9)) / 10 | ||
await asyncio.sleep(0.5 + randSleep) | ||
async with client.get(Constants.EXCHANGE_ROOT_API + Constants.DEPTH_URI.format(trading_pair=convert_to_exchange_trading_pair(trading_pair))) as response: | ||
response: aiohttp.ClientResponse = response | ||
if response.status != 200: | ||
raise IOError(f"Error fetching Altmarkets market snapshot for {trading_pair}. " | ||
f"HTTP status is {response.status}.") | ||
api_data = await response.read() | ||
data: Dict[str, Any] = json.loads(api_data) | ||
return data | ||
|
||
async def get_new_order_book(self, trading_pair: str) -> OrderBook: | ||
async with aiohttp.ClientSession() as client: | ||
snapshot: Dict[str, Any] = await self.get_snapshot(client, trading_pair) | ||
snapshot_msg: OrderBookMessage = AltmarketsOrderBook.snapshot_message_from_exchange( | ||
snapshot, | ||
metadata={"trading_pair": trading_pair} | ||
) | ||
order_book: OrderBook = self.order_book_create_function() | ||
order_book.apply_snapshot(snapshot_msg.bids, snapshot_msg.asks, snapshot_msg.update_id) | ||
return order_book | ||
|
||
async def _inner_messages(self, | ||
ws: websockets.WebSocketClientProtocol) -> AsyncIterable[str]: | ||
# Terminate the recv() loop as soon as the next message timed out, so the outer loop can reconnect. | ||
try: | ||
while True: | ||
try: | ||
msg: str = await asyncio.wait_for(ws.recv(), timeout=Constants.MESSAGE_TIMEOUT) | ||
yield msg | ||
except asyncio.TimeoutError: | ||
pong_waiter = await ws.ping() | ||
await asyncio.wait_for(pong_waiter, timeout=Constants.PING_TIMEOUT) | ||
except asyncio.TimeoutError: | ||
self.logger().warning("WebSocket ping timed out. Going to reconnect...") | ||
return | ||
except ConnectionClosed: | ||
return | ||
finally: | ||
await ws.close() | ||
|
||
async def listen_for_trades(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): | ||
while True: | ||
try: | ||
trading_pairs: List[str] = self._trading_pairs | ||
async with websockets.connect(Constants.EXCHANGE_WS_URI) as ws: | ||
ws: websockets.WebSocketClientProtocol = ws | ||
for trading_pair in trading_pairs: | ||
subscribe_request: Dict[str, Any] = { | ||
"event": Constants.WS_PUSHER_SUBSCRIBE_EVENT, | ||
"streams": [stream.format(trading_pair=convert_to_exchange_trading_pair(trading_pair)) for stream in Constants.WS_TRADE_SUBSCRIBE_STREAMS] | ||
} | ||
await ws.send(json.dumps(subscribe_request)) | ||
|
||
async for raw_msg in self._inner_messages(ws): | ||
# Altmarkets's data value for id is a large int too big for ujson to parse | ||
msg: Dict[str, Any] = json.loads(raw_msg) | ||
if "ping" in raw_msg: | ||
await ws.send(f'{{"op":"pong","timestamp": {str(msg["ping"])}}}') | ||
elif "subscribed" in raw_msg: | ||
pass | ||
elif ".trades" in raw_msg: | ||
trading_pair = list(msg.keys())[0].split(".")[0] | ||
for trade in msg[f"{trading_pair}.trades"]["trades"]: | ||
trade_message: OrderBookMessage = AltmarketsOrderBook.trade_message_from_exchange( | ||
trade, | ||
metadata={"trading_pair": trading_pair} | ||
) | ||
output.put_nowait(trade_message) | ||
else: | ||
# Debug log output for pub WS messages | ||
self.logger().info(f"Unrecognized message received from Altmarkets websocket: {msg}") | ||
except asyncio.CancelledError: | ||
raise | ||
except Exception: | ||
self.logger().error("Trades: Unexpected error with WebSocket connection. Retrying after 30 seconds...", | ||
exc_info=True) | ||
await asyncio.sleep(Constants.MESSAGE_TIMEOUT) | ||
|
||
async def listen_for_order_book_diffs(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): | ||
while True: | ||
try: | ||
trading_pairs: List[str] = self._trading_pairs | ||
async with websockets.connect(Constants.EXCHANGE_WS_URI) as ws: | ||
ws: websockets.WebSocketClientProtocol = ws | ||
for trading_pair in trading_pairs: | ||
subscribe_request: Dict[str, Any] = { | ||
"event": "subscribe", | ||
"streams": [stream.format(trading_pair=convert_to_exchange_trading_pair(trading_pair)) for stream in Constants.WS_OB_SUBSCRIBE_STREAMS] | ||
} | ||
await ws.send(json.dumps(subscribe_request)) | ||
|
||
async for raw_msg in self._inner_messages(ws): | ||
# Altmarkets's data value for id is a large int too big for ujson to parse | ||
msg: Dict[str, Any] = json.loads(raw_msg) | ||
if "ping" in raw_msg: | ||
await ws.send(f'{{"op":"pong","timestamp": {str(msg["ping"])}}}') | ||
elif "subscribed" in raw_msg: | ||
pass | ||
elif ".ob-inc" in raw_msg: | ||
# msg_key = list(msg.keys())[0] | ||
trading_pair = list(msg.keys())[0].split(".")[0] | ||
order_book_message: OrderBookMessage = AltmarketsOrderBook.diff_message_from_exchange( | ||
msg[f"{trading_pair}.ob-inc"], | ||
metadata={"trading_pair": trading_pair} | ||
) | ||
output.put_nowait(order_book_message) | ||
elif ".ob-snap" in raw_msg: | ||
# msg_key = list(msg.keys())[0] | ||
trading_pair = list(msg.keys())[0].split(".")[0] | ||
order_book_message: OrderBookMessage = AltmarketsOrderBook.snapshot_message_from_exchange( | ||
msg[f"{trading_pair}.ob-snap"], | ||
metadata={"trading_pair": trading_pair} | ||
) | ||
output.put_nowait(order_book_message) | ||
else: | ||
# Debug log output for pub WS messages | ||
self.logger().info(f"OB: Unrecognized message received from Altmarkets websocket: {msg}") | ||
except asyncio.CancelledError: | ||
raise | ||
except Exception: | ||
self.logger().error("Unexpected error with WebSocket connection. Retrying after 30 seconds...", | ||
exc_info=True) | ||
await asyncio.sleep(Constants.MESSAGE_TIMEOUT) | ||
|
||
async def listen_for_order_book_snapshots(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): | ||
while True: | ||
try: | ||
trading_pairs: List[str] = self._trading_pairs | ||
async with aiohttp.ClientSession() as client: | ||
for trading_pair in trading_pairs: | ||
try: | ||
snapshot: Dict[str, Any] = await self.get_snapshot(client, trading_pair) | ||
snapshot_message: OrderBookMessage = AltmarketsOrderBook.snapshot_message_from_exchange( | ||
snapshot, | ||
metadata={"trading_pair": trading_pair} | ||
) | ||
output.put_nowait(snapshot_message) | ||
self.logger().debug(f"Saved order book snapshot for {trading_pair}") | ||
await asyncio.sleep(5.0) | ||
except asyncio.CancelledError: | ||
raise | ||
except Exception: | ||
self.logger().error("Unexpected error.", exc_info=True) | ||
await asyncio.sleep(5.0) | ||
this_hour: pd.Timestamp = pd.Timestamp.utcnow().replace(minute=0, second=0, microsecond=0) | ||
next_hour: pd.Timestamp = this_hour + pd.Timedelta(hours=1) | ||
delta: float = next_hour.timestamp() - time.time() | ||
await asyncio.sleep(delta) | ||
except asyncio.CancelledError: | ||
raise | ||
except Exception: | ||
self.logger().error("Unexpected error.", exc_info=True) | ||
await asyncio.sleep(5.0) |
117 changes: 117 additions & 0 deletions
117
hummingbot/connector/exchange/altmarkets/altmarkets_api_user_stream_data_source.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
#!/usr/bin/env python | ||
|
||
import asyncio | ||
import logging | ||
from typing import ( | ||
Any, | ||
AsyncIterable, | ||
Dict, | ||
Optional, | ||
List, | ||
) | ||
import time | ||
import ujson | ||
import websockets | ||
from websockets.exceptions import ConnectionClosed | ||
from hummingbot.logger import HummingbotLogger | ||
from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource | ||
from hummingbot.connector.exchange.altmarkets.altmarkets_constants import Constants | ||
from hummingbot.connector.exchange.altmarkets.altmarkets_auth import AltmarketsAuth | ||
|
||
|
||
class AltmarketsAPIUserStreamDataSource(UserStreamTrackerDataSource): | ||
|
||
_lausds_logger: Optional[HummingbotLogger] = None | ||
|
||
@classmethod | ||
def logger(cls) -> HummingbotLogger: | ||
if cls._lausds_logger is None: | ||
cls._lausds_logger = logging.getLogger(__name__) | ||
return cls._lausds_logger | ||
|
||
def __init__(self, altmarkets_auth: AltmarketsAuth, trading_pairs: Optional[List[str]] = []): | ||
self._altmarkets_auth: AltmarketsAuth = altmarkets_auth | ||
self._trading_pairs = trading_pairs | ||
self._current_listen_key = None | ||
self._listen_for_user_stream_task = None | ||
self._last_recv_time: float = 0 | ||
super().__init__() | ||
|
||
@property | ||
def last_recv_time(self) -> float: | ||
return self._last_recv_time | ||
|
||
async def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue): | ||
""" | ||
*required | ||
Subscribe to user stream via web socket, and keep the connection open for incoming messages | ||
:param ev_loop: ev_loop to execute this function in | ||
:param output: an async queue where the incoming messages are stored | ||
""" | ||
while True: | ||
try: | ||
auth_headers = self._altmarkets_auth.get_headers() | ||
async with websockets.connect(Constants.EXCHANGE_WS_AUTH_URI, extra_headers=auth_headers) as ws: | ||
ws: websockets.WebSocketClientProtocol = ws | ||
|
||
# # We don't need an auth request since token is sent in headers. | ||
# auth_request: Dict[str, Any] = { | ||
# "event": Constants.WS_AUTH_REQUEST_EVENT, | ||
# "data": "empty" | ||
# } | ||
# await ws.send(ujson.dumps(auth_request)) | ||
|
||
for trading_pair in self._trading_pairs: | ||
subscribe_request: Dict[str, Any] = { | ||
"event": Constants.WS_PUSHER_SUBSCRIBE_EVENT, | ||
"streams": Constants.WS_USER_SUBSCRIBE_STREAMS | ||
} | ||
await ws.send(ujson.dumps(subscribe_request)) | ||
async for raw_msg in self._inner_messages(ws): | ||
diff_msg: Dict[str, Any] = ujson.loads(raw_msg) | ||
# Debug printing. | ||
# self.logger().info(f"PrvWS msg: {diff_msg}") | ||
if "ping" in raw_msg: | ||
await ws.send(f'{{"op":"pong","timestamp": {str(diff_msg["ping"])}}}') | ||
elif "subscribed" in raw_msg: | ||
pass | ||
elif "order" in raw_msg or "trade" in raw_msg: | ||
output.put_nowait(diff_msg) | ||
else: | ||
# Debug log output for pub WS messages | ||
self.logger().info(f"Unrecognized message received from Altmarkets websocket: {diff_msg}") | ||
except asyncio.CancelledError: | ||
raise | ||
except Exception: | ||
self.logger().error("Unexpected error with Altmarkets WebSocket connection. " | ||
"Retrying after 30 seconds...", exc_info=True) | ||
await asyncio.sleep(30.0) | ||
|
||
async def _inner_messages(self, | ||
ws: websockets.WebSocketClientProtocol) -> AsyncIterable[str]: | ||
""" | ||
Generator function that returns messages from the web socket stream | ||
:param ws: current web socket connection | ||
:returns: message in AsyncIterable format | ||
""" | ||
# Terminate the recv() loop as soon as the next message timed out, so the outer loop can reconnect. | ||
try: | ||
while True: | ||
try: | ||
msg: str = await asyncio.wait_for(ws.recv(), timeout=Constants.MESSAGE_TIMEOUT) | ||
self._last_recv_time = time.time() | ||
yield msg | ||
except asyncio.TimeoutError: | ||
try: | ||
pong_waiter = await ws.ping() | ||
self._last_recv_time = time.time() | ||
await asyncio.wait_for(pong_waiter, timeout=Constants.PING_TIMEOUT) | ||
except asyncio.TimeoutError: | ||
raise | ||
except asyncio.TimeoutError: | ||
self.logger().warning("WebSocket ping timed out. Going to reconnect...") | ||
return | ||
except ConnectionClosed: | ||
return | ||
finally: | ||
await ws.close() |
Oops, something went wrong.