Skip to content

Commit

Permalink
Removed seq_no as per doc: Subscribe to the level2 channel to guarant…
Browse files Browse the repository at this point in the history
…ee that messages are delivered and your order book is in sync.
  • Loading branch information
thomasbs17 committed Jan 17, 2024
1 parent 4997c32 commit f039f69
Showing 1 changed file with 0 additions and 63 deletions.
63 changes: 0 additions & 63 deletions cryptofeed/exchanges/coinbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
import asyncio
import datetime
import hashlib
import hmac
Expand Down Expand Up @@ -84,20 +83,9 @@ def symbols(cls, config: dict = None, refresh=False) -> list:

def __init__(self, callbacks=None, **kwargs):
super().__init__(callbacks=callbacks, **kwargs)
# we only keep track of the L3 order book if we have at least one subscribed order-book callback.
# use case: subscribing to the L3 book plus Trade type gives you order_type information (see _received below),
# and we don't need to do the rest of the book-keeping unless we have an active callback
self.__reset()

def __reset(self):
self.order_map = {}
self.order_type_map = {}
self.seq_no = None
# sequence number validation only works when the FULL data stream is enabled
chan = self.std_channel_to_exchange(L2_BOOK)
if chan in self.subscription:
pairs = self.subscription[chan]
self.seq_no = {pair: None for pair in pairs}
self._l2_book = {}

async def _ticker(self, msg: dict, timestamp: float):
Expand Down Expand Up @@ -167,62 +155,11 @@ async def _pair_level2_update(self, msg: dict, timestamp: float, ts: datetime):

await self.book_callback(L2_BOOK, self._l2_book[pair], timestamp, timestamp=ts, raw=msg, delta=delta)

async def _book_snapshot(self, pairs: list):
# Coinbase needs some time to send messages to us
# before we request the snapshot. If we don't sleep
# the snapshot seq no could be much earlier than
# the subsequent messages, causing a seq no mismatch.
await asyncio.sleep(2)

urls = [self.rest_endpoints[0].route('l3book', self.sandbox).format(pair) for pair in pairs]

results = []
headers = get_private_parameters(self.config, rest_api=True, endpoint='product_book')
for url in urls:
ret = await self.http_conn.read(url, header=headers)
results.append(ret)
# rate limit - 3 per second
await asyncio.sleep(0.3)

timestamp = time.time()
for res, pair in zip(results, pairs):
orders = json.loads(res, parse_float=Decimal)
npair = self.exchange_symbol_to_std_symbol(pair)
self._l2_book[npair] = OrderBook(self.id, pair, max_depth=self.max_depth)
# self.seq_no[npair] = orders['sequence'] # TODO: no longer available post transition to Coinbase Advanced Trade
for side in (BID, ASK):
for order in orders['pricebook'][side + 's']:
price = Decimal(order['price'])
size = Decimal(order['size'])
order_id = f'{side}@{size}@{price}'
if price in self._l2_book[npair].book[side]:
self._l2_book[npair].book[side][price][order_id] = size
else:
self._l2_book[npair].book[side][price] = {order_id: size}
self.order_map[order_id] = (price, size)
await self.book_callback(L2_BOOK, self._l2_book[npair], timestamp, raw=orders)

async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float):
# PERF perf_start(self.id, 'msg')
msg = json.loads(msg, parse_float=Decimal)
if 'channel' in msg and 'events' in msg:
for event in msg['events']:
if self.seq_no and 'product_id' in event and 'sequence_num' in msg:
pair = self.exchange_symbol_to_std_symbol(event['product_id'])
if not self.seq_no.get(pair, None):
return
if msg['sequence_num'] <= self.seq_no[pair]:
return
if msg['sequence_num'] != self.seq_no[pair] + 1:
LOG.warning("%s: Missing sequence number detected for %s. Received %d, expected %d", self.id,
pair, msg['sequence_num'], self.seq_no[pair] + 1)
LOG.warning("%s: Resetting data for %s", self.id, pair)
self.__reset()
await self._book_snapshot([pair])
return

self.seq_no[pair] = msg['sequence_num']

if msg['channel'] == 'market_trades':
if event.get('type') == 'update':
for trade in event['trades']:
Expand Down

0 comments on commit f039f69

Please sign in to comment.