From 34ae8775db08198f71ac0b6b9ec3ed7fd9e075fc Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Mon, 15 Jan 2024 23:51:27 +0000 Subject: [PATCH 01/10] Updated Coinbase: from Coinbase Pro to Advanced Trade --- cryptofeed/exchanges/coinbase.py | 349 +++++++++---------------------- 1 file changed, 98 insertions(+), 251 deletions(-) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index f3ccf89fa..7a0901e3a 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -5,6 +5,8 @@ associated with this software. ''' import asyncio +import hashlib +import hmac import logging import time from decimal import Decimal @@ -14,26 +16,25 @@ from yapic import json from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint -from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES +from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES, CANDLES, ORDERS from cryptofeed.feed import Feed from cryptofeed.symbols import Symbol from cryptofeed.exchanges.mixins.coinbase_rest import CoinbaseRestMixin from cryptofeed.types import OrderBook, Ticker, Trade - LOG = logging.getLogger('feedhandler') class Coinbase(Feed, CoinbaseRestMixin): id = COINBASE - websocket_endpoints = [WebsocketEndpoint('wss://ws-feed.pro.coinbase.com', options={'compression': None})] - rest_endpoints = [RestEndpoint('https://api.pro.coinbase.com', routes=Routes('/products', l3book='/products/{}/book?level=3'))] + websocket_endpoints = [WebsocketEndpoint('wss://advanced-trade-ws.coinbase.com', options={'compression': None})] + rest_endpoints = [ + RestEndpoint('https://api.pro.coinbase.com', routes=Routes('/products', l3book='/products/{}/book?level=3'))] + # TODO: implement candles and user channels websocket_channels = { L2_BOOK: 'level2', - L3_BOOK: 'full', - TRADES: 'matches', - TICKER: 'ticker', + TRADES: 'market_trades', } request_limit = 10 @@ -55,107 +56,40 @@ def __init__(self, callbacks=None, **kwargs): # we only keep track of the L3 order book if we have at least one subscribed order-book callback. # use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below), # and we don't need to do the rest of the book-keeping unless we have an active callback - self.keep_l3_book = False - if callbacks and L3_BOOK in callbacks: - self.keep_l3_book = True self.__reset() def __reset(self): self.order_map = {} self.order_type_map = {} self.seq_no = None - # sequence number validation only works when the FULL data stream is enabled - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - pairs = self.subscription[chan] - self.seq_no = {pair: None for pair in pairs} - self._l3_book = {} self._l2_book = {} async def _ticker(self, msg: dict, timestamp: float): + # TODO: The ticker endpoint payload has been updated and no longer includes best ask and bid. + # Do we want to: + # 1. get rid of that callback + # 2. implement ticker based on l2 book sub + # 3. implement a new ticker callback (will be different from other exchanges) + raise NotImplementedError + + async def _trade_update(self, msg: dict, timestamp: float): ''' { - 'type': 'ticker', - 'sequence': 5928281084, - 'product_id': 'BTC-USD', - 'price': '8500.01000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.1293778', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93594133', - 'best_bid': '8500', - 'best_ask': '8500.01' - } - - { - 'type': 'ticker', - 'sequence': 5928281348, - 'product_id': 'BTC-USD', - 'price': '8500.00000000', - 'open_24h': '8217.24000000', - 'volume_24h': '4529.13179472', - 'low_24h': '8172.00000000', - 'high_24h': '8600.00000000', - 'volume_30d': '329178.93835825', - 'best_bid': '8500', - 'best_ask': '8500.01', - 'side': 'sell', - 'time': '2018-05-21T00:30:11.587000Z', - 'trade_id': 43736677, - 'last_size': '0.00241692' - } - ''' - - ts = self.timestamp_normalize(msg['time']) if 'time' in msg else None - await self.callback(TICKER, Ticker(self.id, self.exchange_symbol_to_std_symbol(msg['product_id']), Decimal(msg['best_bid']), Decimal(msg['best_ask']), ts, raw=msg), timestamp) - - async def _book_update(self, msg: dict, timestamp: float): - ''' - { - 'type': 'match', or last_match 'trade_id': 43736593 - 'maker_order_id': '2663b65f-b74e-4513-909d-975e3910cf22', - 'taker_order_id': 'd058d737-87f1-4763-bbb4-c2ccf2a40bde', - 'side': 'buy', + 'side': 'BUY' or 'SELL', 'size': '0.01235647', 'price': '8506.26000000', 'product_id': 'BTC-USD', - 'sequence': 5928276661, 'time': '2018-05-21T00:26:05.585000Z' } ''' pair = self.exchange_symbol_to_std_symbol(msg['product_id']) ts = self.timestamp_normalize(msg['time']) - - if self.keep_l3_book and 'full' in self.subscription and pair in self.subscription['full']: - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['size']) - maker_order_id = msg['maker_order_id'] - - _, new_size = self.order_map[maker_order_id] - new_size -= size - if new_size <= 0: - del self.order_map[maker_order_id] - self.order_type_map.pop(maker_order_id, None) - delta[side].append((maker_order_id, price, 0)) - del self._l3_book[pair].book[side][price][maker_order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - else: - self.order_map[maker_order_id] = (price, new_size) - self._l3_book[pair].book[side][price][maker_order_id] = new_size - delta[side].append((maker_order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=self.seq_no[pair]) - - order_type = self.order_type_map.get(msg['taker_order_id']) + order_type = 'market' t = Trade( self.id, - self.exchange_symbol_to_std_symbol(msg['product_id']), - SELL if msg['side'] == 'buy' else BUY, + pair, + SELL if msg['side'] == 'SELL' else BUY, Decimal(msg['size']), Decimal(msg['price']), ts, @@ -167,8 +101,10 @@ async def _book_update(self, msg: dict, timestamp: float): async def _pair_level2_snapshot(self, msg: dict, timestamp: float): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - bids = {Decimal(price): Decimal(amount) for price, amount in msg['bids']} - asks = {Decimal(price): Decimal(amount) for price, amount in msg['asks']} + bids = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'bid'} + asks = {Decimal(update['price_level']): Decimal(update['new_quantity']) for update in msg['updates'] if + update['side'] == 'ask'} if pair not in self._l2_book: self._l2_book[pair] = OrderBook(self.id, pair, max_depth=self.max_depth, bids=bids, asks=asks) else: @@ -179,21 +115,22 @@ async def _pair_level2_snapshot(self, msg: dict, timestamp: float): async def _pair_level2_update(self, msg: dict, timestamp: float): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(msg['time']) delta = {BID: [], ASK: []} - for side, price, amount in msg['changes']: - side = BID if side == 'buy' else ASK - price = Decimal(price) - amount = Decimal(amount) + for update in msg['updates']: + ts = self.timestamp_normalize(update['event_time']) + side = BID if update['side'] == 'bid' else ASK + price = Decimal(update['price_level']) + amount = Decimal(update['new_quantity']) if amount == 0: - del self._l2_book[pair].book[side][price] - delta[side].append((price, 0)) + if price in self._l2_book[pair].book[side]: + del self._l2_book[pair].book[side][price] + delta[side].append((price, 0)) else: self._l2_book[pair].book[side][price] = amount delta[side].append((price, amount)) - await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) + await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) async def _book_snapshot(self, pairs: list): # Coinbase needs some time to send messages to us @@ -202,6 +139,7 @@ async def _book_snapshot(self, pairs: list): # the subsequent messages, causing a seq no mismatch. await asyncio.sleep(2) + # TODO: not yet updated urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs] results = [] @@ -228,164 +166,73 @@ async def _book_snapshot(self, pairs: list): self.order_map[order_id] = (price, size) await self.book_callback(L3_BOOK, self._l3_book[npair], timestamp, raw=orders) - async def _open(self, msg: dict, timestamp: float): - if not self.keep_l3_book: - return - delta = {BID: [], ASK: []} - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - size = Decimal(msg['remaining_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - order_id = msg['order_id'] - ts = self.timestamp_normalize(msg['time']) - - if price in self._l3_book[pair].book[side]: - self._l3_book[pair].book[side][price][order_id] = size - else: - self._l3_book[pair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - - delta[side].append((order_id, price, size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, timestamp=ts, delta=delta, raw=msg, sequence_number=msg['sequence']) - - async def _done(self, msg: dict, timestamp: float): - """ - per Coinbase API Docs: - - A done message will be sent for received orders which are fully filled or canceled due - to self-trade prevention. There will be no open message for such orders. Done messages - for orders which are not on the book should be ignored when maintaining a real-time order book. - """ - if 'price' not in msg: - # market order life cycle: received -> done - self.order_type_map.pop(msg['order_id'], None) - self.order_map.pop(msg['order_id'], None) - return - - order_id = msg['order_id'] - self.order_type_map.pop(order_id, None) - if order_id not in self.order_map: - return - - del self.order_map[order_id] - if self.keep_l3_book: - delta = {BID: [], ASK: []} - - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - ts = self.timestamp_normalize(msg['time']) - - del self._l3_book[pair].book[side][price][order_id] - if len(self._l3_book[pair].book[side][price]) == 0: - del self._l3_book[pair].book[side][price] - delta[side].append((order_id, price, 0)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - - async def _received(self, msg: dict, timestamp: float): - """ - per Coinbase docs: - A valid order has been received and is now active. This message is emitted for every single - valid order as soon as the matching engine receives it whether it fills immediately or not. - - This message is the only time we receive the order type (limit vs market) for a given order, - so we keep it in a map by order ID. - """ - order_id = msg["order_id"] - order_type = msg["order_type"] - self.order_type_map[order_id] = order_type - - async def _change(self, msg: dict, timestamp: float): - """ - Like done, these updates can be sent for orders that are not in the book. Per the docs: - - Not all done or change messages will result in changing the order book. These messages will - be sent for received orders which are not yet on the order book. Do not alter - the order book for such messages, otherwise your order book will be incorrect. - - {'price': '16556.88', 'old_size': '0.24076471', 'new_size': '0.04076471', 'order_id': '9675d63e-0432-413d-a3f3-f30d7df39614', 'reason': 'STP', 'type': 'change', 'side': 'buy', 'product_id': 'BTC-USD', 'time': datetime.datetime(2022, 11, 24, 0, 35, 28, 904847, tzinfo=datetime.timezone.utc), 'sequence': 50703787284} - """ - if not self.keep_l3_book: - return - - delta = {BID: [], ASK: []} - - if 'price' not in msg or not msg['price']: - return - - order_id = msg['order_id'] - if order_id not in self.order_map: - return - - ts = self.timestamp_normalize(msg['time']) - price = Decimal(msg['price']) - side = ASK if msg['side'] == 'sell' else BID - new_size = Decimal(msg['new_size']) - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - - self._l3_book[pair].book[side][price][order_id] = new_size - self.order_map[order_id] = (price, new_size) - - delta[side].append((order_id, price, new_size)) - - await self.book_callback(L3_BOOK, self._l3_book[pair], timestamp, delta=delta, timestamp=ts, raw=msg, sequence_number=msg['sequence']) - async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float): # PERF perf_start(self.id, 'msg') msg = json.loads(msg, parse_float=Decimal) - if self.seq_no: - if 'product_id' in msg and 'sequence' in msg: - pair = self.exchange_symbol_to_std_symbol(msg['product_id']) - if not self.seq_no.get(pair, None): - return - if msg['sequence'] <= self.seq_no[pair]: - return - if msg['sequence'] != self.seq_no[pair] + 1: - LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id, pair, msg['sequence'], self.seq_no[pair] + 1) - LOG.warning("%s: Resetting data for %s", self.id, pair) - self.__reset(symbol=pair) - await self._book_snapshot([pair]) - return - - self.seq_no[pair] = msg['sequence'] - - if 'type' in msg: - if msg['type'] == 'ticker': - await self._ticker(msg, timestamp) - elif msg['type'] == 'match' or msg['type'] == 'last_match': - await self._book_update(msg, timestamp) - elif msg['type'] == 'snapshot': - await self._pair_level2_snapshot(msg, timestamp) - elif msg['type'] == 'l2update': - await self._pair_level2_update(msg, timestamp) - elif msg['type'] == 'open': - await self._open(msg, timestamp) - elif msg['type'] == 'done': - await self._done(msg, timestamp) - elif msg['type'] == 'change': - await self._change(msg, timestamp) - elif msg['type'] == 'received': - await self._received(msg, timestamp) - elif msg['type'] == 'activate': - pass - elif msg['type'] == 'subscriptions': - pass - else: - LOG.warning("%s: Invalid message type %s", self.id, msg) - # PERF perf_end(self.id, 'msg') - # PERF perf_log(self.id, 'msg') + if 'channel' in msg and 'events' in msg: + for event in msg['events']: + if self.seq_no and 'product_id' in event and 'sequence_num' in msg: + pair = self.exchange_symbol_to_std_symbol(event['product_id']) + if not self.seq_no.get(pair, None): + return + if msg['sequence_num'] <= self.seq_no[pair]: + return + if msg['sequence_num'] != self.seq_no[pair] + 1: + LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id, + pair, msg['sequence_num'], self.seq_no[pair] + 1) + LOG.warning("%s: Resetting data for %s", self.id, pair) + self.__reset() + await self._book_snapshot([pair]) + return + + self.seq_no[pair] = msg['sequence_num'] + + if msg['channel'] == 'market_trades': + if event.get('type') == 'update': + for trade in event['trades']: + await self._trade_update(trade, timestamp) + else: + pass # TODO: do we want to implement trades snapshots? + elif msg['channel'] == 'l2_data': + if event.get('type') == 'update': + await self._pair_level2_update(event, timestamp) + elif event.get('type') == 'snapshot': + await self._pair_level2_snapshot(event, timestamp) + elif msg['channel'] == 'subscriptions': + pass + else: + LOG.warning("%s: Invalid message type %s", self.id, msg) + # PERF perf_end(self.id, 'msg') + # PERF perf_log(self.id, 'msg') + + async def get_private_parameters(self, chan: str, product_ids_str: list) -> dict: + timestamp = str(int(time.time())) + product_ids_str = ",".join(product_ids_str) + message = f"{timestamp}{chan}{product_ids_str}" + signature = hmac.new( + self.config["coinbase"]["key_secret"].encode("utf-8"), + message.encode("utf-8"), + digestmod=hashlib.sha256, + ).hexdigest() + return dict(api_key=self.config["coinbase"]["key_id"], timestamp=timestamp, signature=signature) async def subscribe(self, conn: AsyncConnection): self.__reset() - - for chan in self.subscription: - await conn.write(json.dumps({"type": "subscribe", - "product_ids": self.subscription[chan], - "channels": [chan] - })) - - chan = self.std_channel_to_exchange(L3_BOOK) - if chan in self.subscription: - await self._book_snapshot(self.subscription[chan]) + all_pairs = list() + + async def _subscribe(chan: str, product_ids: list): + params = {"type": "subscribe", + "product_ids": product_ids, + "channel": chan + } + private_params = await self.get_private_parameters(chan, product_ids) + if private_params: + params = {**params, **private_params} + await conn.write(json.dumps(params)) + + for channel in self.subscription: + all_pairs += self.subscription[channel] + await _subscribe(channel, self.subscription[channel]) + all_pairs = list(dict.fromkeys(all_pairs)) + await _subscribe('heartbeat', all_pairs) + # Implementing heartbase as per Best Practices doc: https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-best-practices From 116bc2e68ec39df645170619aa4f7b591af26dee Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Mon, 15 Jan 2024 23:54:27 +0000 Subject: [PATCH 02/10] updated AUTHORS.md --- AUTHORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS.md b/AUTHORS.md index 0f26272f0..6479f20fe 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -14,3 +14,4 @@ Cryptofeed was originally created by Bryant Moscon, but many others have contrib * [O. Janche](https://github.com/toyan) - * [Bastien Enjalbert](https://github.com/bastienjalbert) - * [Jonggyun Kim](https://github.com/gyunt) - +* [Thomas Bouamoud](https://github.com/thomasbs17) - \ No newline at end of file From 0520120c41b7ed3e269a46f76e59f78c678a27f0 Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 01:11:53 +0000 Subject: [PATCH 03/10] finalized Coinbase updates --- CHANGES.md | 3 ++ cryptofeed/exchange.py | 6 +-- cryptofeed/exchanges/coinbase.py | 76 ++++++++++++++++++++------------ 3 files changed, 54 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b6eda98fb..7b80eee8c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ ## Changelog +### 2.5.0 + * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trades + ### 2.4.1 * Bugfix: Handle empty nextFundingRate in OKX diff --git a/cryptofeed/exchange.py b/cryptofeed/exchange.py index a698db805..6917fd7a4 100644 --- a/cryptofeed/exchange.py +++ b/cryptofeed/exchange.py @@ -87,7 +87,7 @@ def _symbol_endpoint_prepare(cls, ep: RestEndpoint) -> Union[List[str], str]: return ep.route('instruments') @classmethod - def symbol_mapping(cls, refresh=False) -> Dict: + def symbol_mapping(cls, refresh=False, headers: dict = None) -> Dict: if Symbols.populated(cls.id) and not refresh: return Symbols.get(cls.id)[0] try: @@ -97,10 +97,10 @@ def symbol_mapping(cls, refresh=False) -> Dict: if isinstance(addr, list): for ep in addr: LOG.debug("%s: reading symbol information from %s", cls.id, ep) - data.append(cls.http_sync.read(ep, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(ep, json=True, headers=headers, uuid=cls.id)) else: LOG.debug("%s: reading symbol information from %s", cls.id, addr) - data.append(cls.http_sync.read(addr, json=True, uuid=cls.id)) + data.append(cls.http_sync.read(addr, json=True, headers=headers, uuid=cls.id)) syms, info = cls._parse_symbol_data(data if len(data) > 1 else data[0]) Symbols.set(cls.id, syms, info) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index 7a0901e3a..09342ce5d 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -15,6 +15,7 @@ from yapic import json +from cryptofeed.config import Config from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES, CANDLES, ORDERS from cryptofeed.feed import Feed @@ -25,11 +26,33 @@ LOG = logging.getLogger('feedhandler') +def get_private_parameters(config: Config, chan: str = None, product_ids_str: list = None, + rest_api: bool = False, endpoint: str = None) -> dict: + timestamp = str(int(time.time())) + if rest_api: + base_endpoint = '/api/v3/brokerage/' + endpoint = base_endpoint + endpoint + message = f'{timestamp}GET{endpoint}' + else: + product_ids_str = ",".join(product_ids_str) + message = f"{timestamp}{chan}{product_ids_str}" + signature = hmac.new( + config["coinbase"]["key_secret"].encode("utf-8"), + message.encode("utf-8"), + digestmod=hashlib.sha256, + ).hexdigest() + if rest_api: + return {'CB-ACCESS-KEY': config["coinbase"]["key_id"], 'CB-ACCESS-TIMESTAMP': timestamp, + 'CB-ACCESS-SIGN': signature} + else: + return {'api_key': config["coinbase"]["key_id"], 'timestamp': timestamp, 'signature': signature} + + class Coinbase(Feed, CoinbaseRestMixin): id = COINBASE websocket_endpoints = [WebsocketEndpoint('wss://advanced-trade-ws.coinbase.com', options={'compression': None})] rest_endpoints = [ - RestEndpoint('https://api.pro.coinbase.com', routes=Routes('/products', l3book='/products/{}/book?level=3'))] + RestEndpoint('https://api.coinbase.com/api/v3/brokerage', routes=Routes('/products', l3book='/product_book?product_id={}'))] # TODO: implement candles and user channels websocket_channels = { @@ -43,14 +66,21 @@ def _parse_symbol_data(cls, data: list) -> Tuple[Dict, Dict]: ret = {} info = defaultdict(dict) - for entry in data: - base, quote = entry['id'].split("-") - sym = Symbol(base, quote) + for entry in data['products']: + sym = Symbol(entry['base_currency_id'], entry['quote_currency_id']) info['tick_size'][sym.normalized] = entry['quote_increment'] info['instrument_type'][sym.normalized] = sym.type - ret[sym.normalized] = entry['id'] + ret[sym.normalized] = entry['product_id'] return ret, info + @classmethod + def symbols(cls, config: dict = None, refresh=False) -> list: + config = Config(config) + if 'coinbase' not in config or 'key_id' not in config['coinbase'] or 'key_secret' not in config['coinbase']: + raise ValueError('You must provide key_id and key_secret in config to retrieve symbols from Coinbase.') + headers = get_private_parameters(config, rest_api=True, endpoint='products') + return list(cls.symbol_mapping(refresh=refresh, headers=headers).keys()) + def __init__(self, callbacks=None, **kwargs): super().__init__(callbacks=callbacks, **kwargs) # we only keep track of the L3 order book if we have at least one subscribed order-book callback. @@ -139,12 +169,12 @@ async def _book_snapshot(self, pairs: list): # the subsequent messages, causing a seq no mismatch. await asyncio.sleep(2) - # TODO: not yet updated urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs] results = [] + headers = get_private_parameters(self.config, rest_api=True, endpoint='product_book') for url in urls: - ret = await self.http_conn.read(url) + ret = await self.http_conn.read(url, header=headers) results.append(ret) # rate limit - 3 per second await asyncio.sleep(0.3) @@ -153,18 +183,19 @@ async def _book_snapshot(self, pairs: list): for res, pair in zip(results, pairs): orders = json.loads(res, parse_float=Decimal) npair = self.exchange_symbol_to_std_symbol(pair) - self._l3_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth) - self.seq_no[npair] = orders['sequence'] + self._l2_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth) + # self.seq_no[npair] = orders['sequence'] # TODO: no longer available post transition to Coinbase Advanced Trade for side in (BID, ASK): - for price, size, order_id in orders[side + 's']: - price = Decimal(price) - size = Decimal(size) - if price in self._l3_book[npair].book[side]: - self._l3_book[npair].book[side][price][order_id] = size + for order in orders['pricebook'][side + 's']: + price = Decimal(order['price']) + size = Decimal(order['size']) + order_id = f'{side}@{size}@{price}' + if price in self._l2_book[npair].book[side]: + self._l2_book[npair].book[side][price][order_id] = size else: - self._l3_book[npair].book[side][price] = {order_id: size} + self._l2_book[npair].book[side][price] = {order_id: size} self.order_map[order_id] = (price, size) - await self.book_callback(L3_BOOK, self._l3_book[npair], timestamp, raw=orders) + await self.book_callback(L2_BOOK, self._l2_book[npair], timestamp, raw=orders) async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float): # PERF perf_start(self.id, 'msg') @@ -205,17 +236,6 @@ async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: floa # PERF perf_end(self.id, 'msg') # PERF perf_log(self.id, 'msg') - async def get_private_parameters(self, chan: str, product_ids_str: list) -> dict: - timestamp = str(int(time.time())) - product_ids_str = ",".join(product_ids_str) - message = f"{timestamp}{chan}{product_ids_str}" - signature = hmac.new( - self.config["coinbase"]["key_secret"].encode("utf-8"), - message.encode("utf-8"), - digestmod=hashlib.sha256, - ).hexdigest() - return dict(api_key=self.config["coinbase"]["key_id"], timestamp=timestamp, signature=signature) - async def subscribe(self, conn: AsyncConnection): self.__reset() all_pairs = list() @@ -225,7 +245,7 @@ async def _subscribe(chan: str, product_ids: list): "product_ids": product_ids, "channel": chan } - private_params = await self.get_private_parameters(chan, product_ids) + private_params = get_private_parameters(self.config, chan, product_ids) if private_params: params = {**params, **private_params} await conn.write(json.dumps(params)) From 7122f6acec053d7bed9f7adcd78d07fb73de003f Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 01:40:25 +0000 Subject: [PATCH 04/10] flake8 --- cryptofeed/exchanges/coinbase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index 09342ce5d..2f88fcc45 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -17,11 +17,11 @@ from cryptofeed.config import Config from cryptofeed.connection import AsyncConnection, RestEndpoint, Routes, WebsocketEndpoint -from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, L3_BOOK, SELL, TICKER, TRADES, CANDLES, ORDERS +from cryptofeed.defines import BID, ASK, BUY, COINBASE, L2_BOOK, SELL, TRADES from cryptofeed.feed import Feed from cryptofeed.symbols import Symbol from cryptofeed.exchanges.mixins.coinbase_rest import CoinbaseRestMixin -from cryptofeed.types import OrderBook, Ticker, Trade +from cryptofeed.types import OrderBook, Trade LOG = logging.getLogger('feedhandler') From 0420a4545d9e5f07c58950a22d091bd7e7a82269 Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 02:23:12 +0000 Subject: [PATCH 05/10] bug and typo fix --- CHANGES.md | 2 +- cryptofeed/exchanges/coinbase.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7b80eee8c..ca6208365 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,7 +1,7 @@ ## Changelog ### 2.5.0 - * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trades + * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade ### 2.4.1 * Bugfix: Handle empty nextFundingRate in OKX diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index 2f88fcc45..31d4ffc68 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -146,8 +146,8 @@ async def _pair_level2_snapshot(self, msg: dict, timestamp: float): async def _pair_level2_update(self, msg: dict, timestamp: float): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) delta = {BID: [], ASK: []} + ts = self.timestamp_normalize(msg['timestamp']) for update in msg['updates']: - ts = self.timestamp_normalize(update['event_time']) side = BID if update['side'] == 'bid' else ASK price = Decimal(update['price_level']) amount = Decimal(update['new_quantity']) @@ -160,7 +160,7 @@ async def _pair_level2_update(self, msg: dict, timestamp: float): self._l2_book[pair].book[side][price] = amount delta[side].append((price, amount)) - await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) + await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) async def _book_snapshot(self, pairs: list): # Coinbase needs some time to send messages to us From a88c95f1702836f032506a7e6dd2c0a12e5ebe1d Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 02:37:00 +0000 Subject: [PATCH 06/10] bug fix --- cryptofeed/exchanges/coinbase.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index 31d4ffc68..2b4fec678 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -5,6 +5,7 @@ associated with this software. ''' import asyncio +import datetime import hashlib import hmac import logging @@ -143,10 +144,9 @@ async def _pair_level2_snapshot(self, msg: dict, timestamp: float): await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, raw=msg) - async def _pair_level2_update(self, msg: dict, timestamp: float): + async def _pair_level2_update(self, msg: dict, timestamp: float, ts: datetime): pair = self.exchange_symbol_to_std_symbol(msg['product_id']) delta = {BID: [], ASK: []} - ts = self.timestamp_normalize(msg['timestamp']) for update in msg['updates']: side = BID if update['side'] == 'bid' else ASK price = Decimal(update['price_level']) @@ -226,7 +226,7 @@ async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: floa pass # TODO: do we want to implement trades snapshots? elif msg['channel'] == 'l2_data': if event.get('type') == 'update': - await self._pair_level2_update(event, timestamp) + await self._pair_level2_update(event, timestamp, msg['timestamp']) elif event.get('type') == 'snapshot': await self._pair_level2_snapshot(event, timestamp) elif msg['channel'] == 'subscriptions': From 4997c32d02e3cdebf0d37501860a66db093878a4 Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 12:00:46 +0000 Subject: [PATCH 07/10] bug fix --- cryptofeed/exchanges/coinbase.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index 2b4fec678..a0c8d0ed8 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -93,6 +93,11 @@ def __reset(self): self.order_map = {} self.order_type_map = {} self.seq_no = None + # sequence number validation only works when the FULL data stream is enabled + chan = self.std_channel_to_exchange(L2_BOOK) + if chan in self.subscription: + pairs = self.subscription[chan] + self.seq_no = {pair: None for pair in pairs} self._l2_book = {} async def _ticker(self, msg: dict, timestamp: float): @@ -255,4 +260,4 @@ async def _subscribe(chan: str, product_ids: list): await _subscribe(channel, self.subscription[channel]) all_pairs = list(dict.fromkeys(all_pairs)) await _subscribe('heartbeat', all_pairs) - # Implementing heartbase as per Best Practices doc: https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-best-practices + # Implementing heartbeat as per Best Practices doc: https://docs.cloud.coinbase.com/advanced-trade-api/docs/ws-best-practices From f039f69d6e828ddc3c4b0f80a46480dc1e607e85 Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 17 Jan 2024 22:14:11 +0000 Subject: [PATCH 08/10] Removed seq_no as per doc: Subscribe to the level2 channel to guarantee that messages are delivered and your order book is in sync. --- cryptofeed/exchanges/coinbase.py | 63 -------------------------------- 1 file changed, 63 deletions(-) diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index a0c8d0ed8..bf2f41b0e 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -4,7 +4,6 @@ Please see the LICENSE file for the terms and conditions associated with this software. ''' -import asyncio import datetime import hashlib import hmac @@ -84,20 +83,9 @@ def symbols(cls, config: dict = None, refresh=False) -> list: def __init__(self, callbacks=None, **kwargs): super().__init__(callbacks=callbacks, **kwargs) - # we only keep track of the L3 order book if we have at least one subscribed order-book callback. - # use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below), - # and we don't need to do the rest of the book-keeping unless we have an active callback self.__reset() def __reset(self): - self.order_map = {} - self.order_type_map = {} - self.seq_no = None - # sequence number validation only works when the FULL data stream is enabled - chan = self.std_channel_to_exchange(L2_BOOK) - if chan in self.subscription: - pairs = self.subscription[chan] - self.seq_no = {pair: None for pair in pairs} self._l2_book = {} async def _ticker(self, msg: dict, timestamp: float): @@ -167,62 +155,11 @@ async def _pair_level2_update(self, msg: dict, timestamp: float, ts: datetime): await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta) - async def _book_snapshot(self, pairs: list): - # Coinbase needs some time to send messages to us - # before we request the snapshot. If we don't sleep - # the snapshot seq no could be much earlier than - # the subsequent messages, causing a seq no mismatch. - await asyncio.sleep(2) - - urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs] - - results = [] - headers = get_private_parameters(self.config, rest_api=True, endpoint='product_book') - for url in urls: - ret = await self.http_conn.read(url, header=headers) - results.append(ret) - # rate limit - 3 per second - await asyncio.sleep(0.3) - - timestamp = time.time() - for res, pair in zip(results, pairs): - orders = json.loads(res, parse_float=Decimal) - npair = self.exchange_symbol_to_std_symbol(pair) - self._l2_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth) - # self.seq_no[npair] = orders['sequence'] # TODO: no longer available post transition to Coinbase Advanced Trade - for side in (BID, ASK): - for order in orders['pricebook'][side + 's']: - price = Decimal(order['price']) - size = Decimal(order['size']) - order_id = f'{side}@{size}@{price}' - if price in self._l2_book[npair].book[side]: - self._l2_book[npair].book[side][price][order_id] = size - else: - self._l2_book[npair].book[side][price] = {order_id: size} - self.order_map[order_id] = (price, size) - await self.book_callback(L2_BOOK, self._l2_book[npair], timestamp, raw=orders) - async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float): # PERF perf_start(self.id, 'msg') msg = json.loads(msg, parse_float=Decimal) if 'channel' in msg and 'events' in msg: for event in msg['events']: - if self.seq_no and 'product_id' in event and 'sequence_num' in msg: - pair = self.exchange_symbol_to_std_symbol(event['product_id']) - if not self.seq_no.get(pair, None): - return - if msg['sequence_num'] <= self.seq_no[pair]: - return - if msg['sequence_num'] != self.seq_no[pair] + 1: - LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id, - pair, msg['sequence_num'], self.seq_no[pair] + 1) - LOG.warning("%s: Resetting data for %s", self.id, pair) - self.__reset() - await self._book_snapshot([pair]) - return - - self.seq_no[pair] = msg['sequence_num'] - if msg['channel'] == 'market_trades': if event.get('type') == 'update': for trade in event['trades']: From 12994cf7606150554f12e16105a05570c4990c3a Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Wed, 24 Jan 2024 18:42:34 +0000 Subject: [PATCH 09/10] updates based on PR comments --- CHANGES.md | 2 +- cryptofeed/exchanges/coinbase.py | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ca6208365..f23c7c7ba 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,6 @@ ## Changelog -### 2.5.0 +### 2.4.2 * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade ### 2.4.1 diff --git a/cryptofeed/exchanges/coinbase.py b/cryptofeed/exchanges/coinbase.py index bf2f41b0e..3cf63882a 100644 --- a/cryptofeed/exchanges/coinbase.py +++ b/cryptofeed/exchanges/coinbase.py @@ -88,14 +88,6 @@ def __init__(self, callbacks=None, **kwargs): def __reset(self): self._l2_book = {} - async def _ticker(self, msg: dict, timestamp: float): - # TODO: The ticker endpoint payload has been updated and no longer includes best ask and bid. - # Do we want to: - # 1. get rid of that callback - # 2. implement ticker based on l2 book sub - # 3. implement a new ticker callback (will be different from other exchanges) - raise NotImplementedError - async def _trade_update(self, msg: dict, timestamp: float): ''' { From 2ef127694dcb08c07cdc6496dacce7bf3ecf78e7 Mon Sep 17 00:00:00 2001 From: Thomas Bouamoud Date: Fri, 26 Jan 2024 02:51:43 +0000 Subject: [PATCH 10/10] updated CHANGES.md based on PR comment --- CHANGES.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f23c7c7ba..4fa3390fa 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,10 +1,8 @@ ## Changelog -### 2.4.2 - * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade - ### 2.4.1 * Bugfix: Handle empty nextFundingRate in OKX + * Update: transitioned from Coinbase Pro (retired) to Coinbase Advanced Trade ### 2.4.0 (2024-01-07) * Update: Fix tests