diff --git a/AUTHORS.md b/AUTHORS.md index 0f26272f0..6479f20fe 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -14,3 +14,4 @@ Cryptofeed was originally created by Bryant Moscon, but many others have contrib * [O. Janche](https://github.com/toyan) - * [Bastien Enjalbert](https://github.com/bastienjalbert) - * [Jonggyun Kim](https://github.com/gyunt) - +* [Thomas Bouamoud](https://github.com/thomasbs17) - \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index 8a7a83932..947905c43 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,40 @@ ## Changelog -### 2.3.0 +### 2.4.1 + * Bugfix: Handle empty nextFundingRate in OKX + * Bugfix: Handle null next_funding_time and estimated_rate in HuobiSwap funding + * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade + +### 2.4.0 (2024-01-07) + * Update: Fix tests + * Update: Okcoin moved to v5 API used by OKX + * Bugfix: InfluxDB none type conversions + * New Exchange: GateIO Futures + * Bugfix: Fix instrument types in symbol parsing on Bitmex + * Bugfix: fix crash issue when init symbol data on Kraken Futures + * Updates: Remove closed exchanges, clean up feeds (update APIs, adjust symbol parsing, etc) + +### 2.3.2 (2023-05-27) + * Bugfix: Fix Socket backend + * Bugfix: Fix AUCTION symbol parsing on Coinbase + * Bugfix: Fix PERPETUAL symbol parsing on Phemex + * Bugfix: Fix PERPETUAL symbol parsing on Kraken Futures + * Feature: Access to all AIOKafka configuration options + * Feature: Use backend Queue for Kafka + * Feature: Add support for storing book snapshots in Redis as key-value + * Update: Switch from unmaintained aioredis to redis-py + * Bugfix: Correct value for Crypto.com Ask price + * Update: Remove cChardet dependency + * Feature: Binance TR support + +### 2.3.1 (2022-10-31) + * Bugfix: timestamp not reset correctly on reconnect + * Bugfix: Arctic backend failing to write Trades when trade type was not present in data + * Bugfix: Timestamp sometimes not present in Coinbase ticker updates + * Bugfix: Phemex, symbols parsing + * Bugfix: OKx - handle empty liquidations correctly + +### 2.3.0 (2022-09-04) * Bugfix: added list and str support to websocket_endpoint creation (allows more than 200 symbols on Binance) * Feature: Add support for OKx streaming candles * Bugfix: Binance Futures, double slash in open interest url @@ -61,7 +95,7 @@ * Update: Exchange name change OKEx -> OKX * Bugfix: OKX candle REST code was setting values incorrectly * Update: OKX now uses v5 for all connections (REST and WS). Update endpoints to new exchange name: okex.com -> okx.com - + ### 2.2.0 (2021-02-16) * Feature: New exchange: Bit.com * Feature: Rework how exchanges that have multiple websocket endpoints are managed and configured. @@ -88,7 +122,7 @@ * Feature: Add ByBit sandbox endpoints. * Bugfix: Fix calculation in OrderInfo on Binance. * Feature: Support list of bootstrap servers for Kafka backend. - * Feature: Add OrderInfo and Fills zmq callbacks + * Feature: Add OrderInfo and Fills zmq callbacks ### 2.1.1 (2021-11-29) * Bugfix: Position data type missing side field. diff --git a/cryptofeed/exchange.py b/cryptofeed/exchange.py index 4d6aeeae1..626594ef3 100644 --- a/cryptofeed/exchange.py +++ b/cryptofeed/exchange.py @@ -87,7 +87,7 @@ def _symbol_endpoint_prepare(cls, ep: RestEndpoint) -> Union[List[str], str]: return ep.route('instruments') @classmethod - def symbol_mapping(cls, refresh=False) -> Dict: + def symbol_mapping(cls, refresh=False, headers: dict = None) -> Dict: if Symbols.populated(cls.id) and not refresh: return Symbols.get(cls.id)[0] try: @@ -97,10 +97,10 @@ def symbol_mapping(cls, refresh=False) -> Dict: if isinstance(addr, list): for ep in addr: LOG.debug("%s: reading symbol information from %s", cls.id, ep) - data.append(cls.http_sync.read(ep, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(ep, json=True, headers=headers, uuid=cls.id)) else: LOG.debug("%s: reading symbol information from %s", cls.id, addr) - data.append(cls.http_sync.read(addr, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(addr, json=True, headers=headers, uuid=cls.id)) syms, info = cls._parse_symbol_data(data if len(data) > 1 else data[0]) Symbols.set(cls.id, syms, info) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index abb9e3851..7b3b6f09f 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -4,7 +4,9 @@ Please see the LICENSE file for the terms and conditions associated with this software. ''' -import asyncio +import datetime +import hashlib +import hmac import logging import time import datetime @@ -16,28 +18,49 @@ from cryptography.hazmat.primitives import serialization from yapic import json +from cryptofeed.config import Config from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint -from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES, POSITIONS, TRANSACTIONS, BALANCES, ORDER_INFO, FILLS +from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, SELL, TRADES from cryptofeed.feed import Feed from cryptofeed.symbols import Symbol from cryptofeed.exchanges.mixins.coinbase_rest import CoinbaseRestMixin -from cryptofeed.types import OrderBook, Ticker, Trade - +from cryptofeed.types import OrderBook, Trade LOG = logging.getLogger('feedhandler') +def get_private_parameters(config: Config, chan: str = None, product_ids_str: list = None, + rest_api: bool = False, endpoint: str = None) -> dict: + timestamp = str(int(time.time())) + if rest_api: + base_endpoint = '/api/v3/brokerage/' + endpoint = base_endpoint + endpoint + message = f'{timestamp}GET{endpoint}' + else: + product_ids_str = ",".join(product_ids_str) + message = f"{timestamp}{chan}{product_ids_str}" + signature = hmac.new( + config["coinbase"]["key_secret"].encode("utf-8"), + message.encode("utf-8"), + digestmod=hashlib.sha256, + ).hexdigest() + if rest_api: + return {'CB-ACCESS-KEY': config["coinbase"]["key_id"], 'CB-ACCESS-TIMESTAMP': timestamp, + 'CB-ACCESS-SIGN': signature} + else: + return {'api_key': config["coinbase"]["key_id"], 'timestamp': timestamp, 'signature': signature} + + class Coinbase(Feed, CoinbaseRestMixin): id = COINBASE websocket_endpoints = [WebsocketEndpoint('wss://advanced-trade-ws.coinbase.com', options={'compression': None})] - # rest_endpoints = [RestEndpoint('https://api.coinbase.com', routes=Routes('/api/v3/brokerage/products', l3book='/api/v3/brokerage/product_book?level=3'))] - rest_endpoints = [RestEndpoint('https://api.pro.coinbase.com', routes=Routes('/products', l3book='/products/{}/book?level=3'))] + rest_endpoints = [ + RestEndpoint('https://api.coinbase.com/api/v3/brokerage', routes=Routes('/products', l3book='/product_book?product_id={}'))] + # TODO: implement candles and user channels websocket_channels = { L2_BOOK: 'level2', - L3_BOOK: 'full', TRADES: 'market_trades', - TICKER: 'ticker', } request_limit = 10 @@ -46,120 +69,46 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]: ret = {} info = defaultdict(dict) - for entry in data: - base, quote = entry['id'].split("-") - sym = Symbol(base, quote) + for entry in data['products']: + sym = Symbol(entry['base_currency_id'], entry['quote_currency_id']) info['tick_size'][sym.normalized] = entry['quote_increment'] info['instrument_type'][sym.normalized] = sym.type - ret[sym.normalized] = entry['id'] + ret[sym.normalized] = entry['product_id'] return ret, info + @classmethod + def symbols(cls, config: dict = None, refresh=False) -> list: + config = Config(config) + if 'coinbase' not in config or 'key_id' not in config['coinbase'] or 'key_secret' not in config['coinbase']: + raise ValueError('You must provide key_id and key_secret in config to retrieve symbols from Coinbase.') + headers = get_private_parameters(config, rest_api=True, endpoint='products') + return list(cls.symbol_mapping(refresh=refresh, headers=headers).keys()) + def __init__(self, callbacks=None, **kwargs): super().__init__(callbacks=callbacks, **kwargs) - # we only keep track of the L3 order book if we have at least one subscribed order-book callback. - # use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below), - # and we don't need to do the rest of the book-keeping unless we have an active callback - self.keep_l3_book = False - if callbacks and L3_BOOK in callbacks: - self.keep_l3_book = True self.__reset() def __reset(self): - self.order_map = {} - self.order_type_map = {} - self.seq_no = None - # sequence number validation only works when the FULL data stream is enabled - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - pairs = self.subscription[chan] - self.seq_no = {pair: None for pair in pairs} - self._l3_book = {} self._l2_book = {} - async def _ticker(self, msg: dict, timestamp: float): + async def _trade_update(self, msg: dict, timestamp: float): ''' { - 'type': 'ticker', - 'sequence': 5928281084, - 'product_id': 'BTC-USD', - 'price': '8500.01000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.1293778', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93594133', - 'best_bid': '8500', - 'best_ask': '8500.01' - } - - { - 'type': 'ticker', - 'sequence': 5928281348, - 'product_id': 'BTC-USD', - 'price': '8500.00000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.13179472', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93835825', - 'best_bid': '8500', - 'best_ask': '8500.01', - 'side': 'sell', - 'time': '2018-05-21T00:30:11.587000Z', - 'trade_id': 43736677, - 'last_size': '0.00241692' - } - ''' - - ts = self.timestamp_normalize(msg['time']) if 'time' in msg else None - await self.callback(TICKER, Ticker(self.id, self.exchange_symbol_to_std_symbol(msg['product_id']), Decimal(msg['best_bid']), Decimal(msg['best_ask']), ts, raw=msg), timestamp) - - async def _book_update(self, msg: dict, timestamp: float): - ''' - { - 'type': 'match', or last_match 'trade_id': 43736593 - 'maker_order_id': '2663b65f-b74e-4513-909d-975e3910cf22', - 'taker_order_id': 'd058d737-87f1-4763-bbb4-c2ccf2a40bde', - 'side': 'buy', + 'side': 'BUY' or 'SELL', 'size': '0.01235647', 'price': '8506.26000000', 'product_id': 'BTC-USD', - 'sequence': 5928276661, 'time': '2018-05-21T00:26:05.585000Z' } ''' pair = self.exchange_symbol_to_std_symbol(msg['product_id']) ts = self.timestamp_normalize(msg['time']) - - if self.keep_l3_book and 'full' in self.subscription and pair in self.subscription['full']: - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['size']) - maker_order_id = msg['maker_order_id'] - - _, new_size = self.order_map[maker_order_id] - new_size -= size - if new_size <= 0: - del self.order_map[maker_order_id] - self.order_type_map.pop(maker_order_id, None) - delta[side].append((maker_order_id, price, 0)) - del self._l3_book[pair].book[side][price][maker_order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - else: - self.order_map[maker_order_id] = (price, new_size) - self._l3_book[pair].book[side][price][maker_order_id] = new_size - delta[side].append((maker_order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=self.seq_no[pair]) - - order_type = self.order_type_map.get(msg['taker_order_id']) + order_type = 'market' t = Trade( self.id, - self.exchange_symbol_to_std_symbol(msg['product_id']), - SELL if msg['side'] == 'buy' else BUY, + pair, + SELL if msg['side'] == 'SELL' else BUY, Decimal(msg['size']), Decimal(msg['price']), ts, @@ -185,265 +134,73 @@ async def _trades(self, msg: dict, timestamp: float): async def _pair_level2_snapshot(self, msg: dict, timestamp: float, origin_dt, seq_no): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) + bids = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'bid'} + asks = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'ask'} if pair not in self._l2_book: self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth, bids={}, asks={}) await self._pair_level2_update(msg, timestamp, origin_dt, seq_no) - async def _pair_level2_update(self, msg: dict, timestamp: float, origin_dt, seq_no): + async def _pair_level2_update(self, msg: dict, timestamp: float, ts: datetime): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(origin_dt) - delta = {BID: [], ASK: []} - for update_dict in msg['updates']: - side, _time, price, amount = update_dict.values() - side = BID if side == 'bid' else ASK - price = Decimal(price) - amount = Decimal(amount) + for update in msg['updates']: + side = BID if update['side'] == 'bid' else ASK + price = Decimal(update['price_level']) + amount = Decimal(update['new_quantity']) if amount == 0: - try: + if price in self._l2_book[pair].book[side]: del self._l2_book[pair].book[side][price] delta[side].append((price, 0)) - except KeyError: - pass else: self._l2_book[pair].book[side][price] = amount delta[side].append((price, amount)) await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta, sequence_number=seq_no) - async def _book_snapshot(self, pairs: list): - # Coinbase needs some time to send messages to us - # before we request the snapshot. If we don't sleep - # the snapshot seq no could be much earlier than - # the subsequent messages, causing a seq no mismatch. - await asyncio.sleep(2) - - urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs] - - results = [] - for url in urls: - ret = await self.http_conn.read(url) - results.append(ret) - # rate limit - 3 per second - await asyncio.sleep(0.3) - - timestamp = time.time() - for res, pair in zip(results, pairs): - orders = json.loads(res, parse_float=Decimal) - npair = self.exchange_symbol_to_std_symbol(pair) - self._l3_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth) - self.seq_no[npair] = orders['sequence'] - for side in (BID, ASK): - for price, size, order_id in orders[side + 's']: - price = Decimal(price) - size = Decimal(size) - if price in self._l3_book[npair].book[side]: - self._l3_book[npair].book[side][price][order_id] = size - else: - self._l3_book[npair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - await self.book_callback(L3_BOOK, self._l3_book[npair], timestamp, raw=orders) - - async def _open(self, msg: dict, timestamp: float): - if not self.keep_l3_book: - return - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['remaining_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - order_id = msg['order_id'] - ts = self.timestamp_normalize(msg['time']) - - if price in self._l3_book[pair].book[side]: - self._l3_book[pair].book[side][price][order_id] = size - else: - self._l3_book[pair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - - delta[side].append((order_id, price, size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=msg['sequence']) - - async def _done(self, msg: dict, timestamp: float): - """ - per Coinbase API Docs: - - A done message will be sent for received orders which are fully filled or canceled due - to self-trade prevention. There will be no open message for such orders. Done messages - for orders which are not on the book should be ignored when maintaining a real-time order book. - """ - if 'price' not in msg: - # market order life cycle: received -> done - self.order_type_map.pop(msg['order_id'], None) - self.order_map.pop(msg['order_id'], None) - return - - order_id = msg['order_id'] - self.order_type_map.pop(order_id, None) - if order_id not in self.order_map: - return - - del self.order_map[order_id] - if self.keep_l3_book: - delta = {BID: [], ASK: []} - - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(msg['time']) - - del self._l3_book[pair].book[side][price][order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - delta[side].append((order_id, price, 0)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - - async def _received(self, msg: dict, timestamp: float): - """ - per Coinbase docs: - A valid order has been received and is now active. This message is emitted for every single - valid order as soon as the matching engine receives it whether it fills immediately or not. - - This message is the only time we receive the order type (limit vs market) for a given order, - so we keep it in a map by order ID. - """ - order_id = msg["order_id"] - order_type = msg["order_type"] - self.order_type_map[order_id] = order_type - - async def _change(self, msg: dict, timestamp: float): - """ - Like done, these updates can be sent for orders that are not in the book. Per the docs: - - Not all done or change messages will result in changing the order book. These messages will - be sent for received orders which are not yet on the order book. Do not alter - the order book for such messages, otherwise your order book will be incorrect. - """ - if not self.keep_l3_book: - return - - delta = {BID: [], ASK: []} - - if 'price' not in msg or not msg['price']: - return - - order_id = msg['order_id'] - if order_id not in self.order_map: - return - - ts = self.timestamp_normalize(msg['time']) - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - new_size = Decimal(msg['new_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - - self._l3_book[pair].book[side][price][order_id] = new_size - self.order_map[order_id] = (price, new_size) - - delta[side].append((order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float): # PERF perf_start(self.id, 'msg') msg = json.loads(msg, parse_float=Decimal) - if self.seq_no: - if 'product_id' in msg and 'sequence' in msg: - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - if not self.seq_no.get(pair, None): - return - if msg['sequence'] <= self.seq_no[pair]: - return - if msg['sequence'] != self.seq_no[pair] + 1: - LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id, pair, msg['sequence'], self.seq_no[pair] + 1) - LOG.warning("%s: Resetting data for %s", self.id, pair) - self.__reset(symbol=pair) - await self._book_snapshot([pair]) - return - - self.seq_no[pair] = msg['sequence'] - - if 'channel' in msg: - if msg['channel'] == 'ticker': - await self._ticker(msg, timestamp) - elif msg['channel'] == 'match' or msg['channel'] == 'last_match': - await self._book_update(msg, timestamp) - elif msg['channel'] == 'l2_data' and msg['events'][0]['type'] == 'snapshot' and len(msg['events']) == 1: - await self._pair_level2_snapshot(msg['events'][0], timestamp, msg['timestamp'], msg['sequence_num']) - elif msg['channel'] == 'l2_data' and msg['events'][0]['type'] == 'update' and len(msg['events']) == 1: - await self._pair_level2_update(msg['events'][0], timestamp, msg['timestamp'], msg['sequence_num']) - elif msg['channel'] == 'market_trades' and msg['events'][0]['type'] == 'snapshot' and len(msg['events']) == 1: - pass - elif msg['channel'] == 'market_trades' and msg['events'][0]['type'] == 'update' and len(msg['events']) == 1: - await self._trades(msg['events'][0]['trades'], timestamp) - elif msg['channel'] == 'open': - await self._open(msg, timestamp) - elif msg['channel'] == 'done': - await self._done(msg, timestamp) - elif msg['channel'] == 'change': - await self._change(msg, timestamp) - elif msg['channel'] == 'received': - await self._received(msg, timestamp) - elif msg['channel'] == 'activate': - pass - elif msg['channel'] == 'subscriptions': - pass - else: - LOG.warning("%s: Invalid message channel %s", self.id, msg) - # PERF perf_end(self.id, 'msg') - # PERF perf_log(self.id, 'msg') + if 'channel' in msg and 'events' in msg: + for event in msg['events']: + if msg['channel'] == 'market_trades': + if event.get('type') == 'update': + for trade in event['trades']: + await self._trade_update(trade, timestamp) + else: + pass # TODO: do we want to implement trades snapshots? + elif msg['channel'] == 'l2_data': + if event.get('type') == 'update': + await self._pair_level2_update(event, timestamp, msg['timestamp']) + elif event.get('type') == 'snapshot': + await self._pair_level2_snapshot(event, timestamp) + elif msg['channel'] == 'subscriptions': + pass + else: + LOG.warning("%s: Invalid message type %s", self.id, msg) + # PERF perf_end(self.id, 'msg') + # PERF perf_log(self.id, 'msg') async def subscribe(self, conn: AsyncConnection): self.__reset() - - jwt_string = self._build_jwt(self.key_id, self.key_secret) - for chan in self.subscription: - normalized_chan = self.exchange_channel_to_std(chan) - if self.is_authenticated_channel(normalized_chan): - data = json.dumps({"type": "subscribe", - "product_ids": self.subscription[chan], - "channel": chan, - "jwt": jwt_string, - "timestamp": int(time.time()), - }) - await conn.write(data) - - else: - await conn.write(json.dumps({"type": "subscribe", - "product_ids": self.subscription[chan], - "channels": [chan] - })) - - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - await self._book_snapshot(self.subscription[chan]) - - @classmethod - def is_authenticated_channel(cls, channel: str) -> bool: - return channel in (ORDER_INFO, FILLS, TRANSACTIONS, BALANCES, POSITIONS, L2_BOOK, L3_BOOK, TRADES) - - @staticmethod - def _build_jwt(key_name, key_secret, service = 'public_websocket_api'): - private_key_bytes = key_secret.encode('utf-8') - private_key = serialization.load_pem_private_key(private_key_bytes, password=None) - - jwt_payload = { - 'sub': key_name, - 'iss': "coinbase-cloud", - 'nbf': int(time.time()), - 'exp': int(time.time()) + 60, - 'aud': [service], - } - - jwt_token = jwt.encode( - jwt_payload, - private_key, - algorithm='ES256', - headers={'kid': key_name, 'nonce': str(int(time.time()))}, - ) - - return jwt_token + all_pairs = list() + + async def _subscribe(chan: str, product_ids: list): + params = {"type": "subscribe", + "product_ids": product_ids, + "channel": chan + } + private_params = get_private_parameters(self.config, chan, product_ids) + if private_params: + params = {**params, **private_params} + await conn.write(json.dumps(params)) + + for channel in self.subscription: + all_pairs += self.subscription[channel] + await _subscribe(channel, self.subscription[channel]) + all_pairs = list(dict.fromkeys(all_pairs)) + await _subscribe('heartbeat', all_pairs) + # Implementing heartbeat as per Best Practices doc: https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-best-practices