-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
96 lines (81 loc) · 4.16 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from datetime import timedelta
from typing import List, Dict
from asyncio import gather
from aiohttp import ClientSession
from memoize.wrapper import memoize
from memoize.configuration import MutableCacheConfiguration, DefaultInMemoryCacheConfiguration
from memoize.key import EncodedMethodNameAndArgsKeyExtractor
from entities import Blockchain, Transaction, HeightPaginatedResponse
from blockchains import BLOCKCHAINS
from providers.abstract import AbstractProvider
from providers.blockchair import BlockChairProvider
from providers.bitgo import BitgoFeeProvider
from providers.etherscan import EtherscanProvider
from providers.ripple import RippleProvider
from providers.tezos import TezosProvider
def blockchain_cache_config():
    """Build a new memoize cache configuration for blockchain lookups.

    Every call constructs fresh configuration objects, so each decorated
    method that calls this gets its own configuration instance.

    The in-memory defaults are created with a capacity of 1000, a 60 second
    method timeout, and both ``update_after`` and ``expire_after`` set to
    10 seconds.  The key extractor is created with
    ``skip_first_arg_as_self=True``.
    NOTE(review): presumably that flag keeps the bound instance out of the
    cache key -- confirm against the memoize library documentation.

    Returns:
        Whatever ``MutableCacheConfiguration.set_key_extractor`` returns;
        the callers in this file pass it as ``configuration=`` to
        ``memoize``.
    """
    in_memory_defaults = DefaultInMemoryCacheConfiguration(
        capacity=1000,
        method_timeout=timedelta(seconds=60),
        update_after=timedelta(seconds=10),
        expire_after=timedelta(seconds=10),
    )
    mutable_config = MutableCacheConfiguration.initialized_with(in_memory_defaults)
    key_extractor = EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=True)
    return mutable_config.set_key_extractor(key_extractor)
class Client(ClientSession):
    """HTTP session that routes blockchain queries to per-chain providers.

    The instance passes itself as the session argument to every provider
    call, so providers issue their requests through this object.
    """

    # Maps a blockchain id (e.g. 'bitcoin-mainnet') to the provider serving it.
    provider_map: Dict[str, AbstractProvider]

    def __init__(self, *args, **kwargs):
        """Create the session and register a provider for each supported chain.

        All positional and keyword arguments are forwarded unchanged to the
        base class constructor.
        """
        super().__init__(*args, **kwargs)
        bitgo = BitgoFeeProvider()
        # One BlockChair provider instance is shared by all the chains it serves.
        blockchair = BlockChairProvider(bitgo)
        self.provider_map = {
            'bitcoin-mainnet': blockchair,
            'bitcoin-testnet': blockchair,
            'bitcoincash-mainnet': blockchair,
            'litecoin-mainnet': blockchair,
            'dogecoin-mainnet': blockchair,
            'ethereum-mainnet': EtherscanProvider(),
            'ripple-mainnet': RippleProvider(),
            'tezos-mainnet': TezosProvider(),
        }

    def _get_provider(self, blockchain_id: str) -> AbstractProvider:
        """Return the provider registered for ``blockchain_id``.

        Raises:
            ValueError: if no provider is registered for ``blockchain_id``.
        """
        try:
            return self.provider_map[blockchain_id]
        except KeyError:
            # Callers expect ValueError; the KeyError itself adds nothing.
            raise ValueError(f'Unsupported chain: {blockchain_id}') from None

    @memoize(configuration=blockchain_cache_config())
    async def get_blockchain(self, chain_id: str) -> Blockchain:
        """Fetch the blockchain data for a single chain (memoized).

        Raises:
            ValueError: if ``chain_id`` has no registered provider.
        """
        # Trace line; presumably only reached when the memoized value is not
        # served from cache -- confirm against the memoize library.
        print('rendering blockchain')
        provider = self._get_provider(chain_id)
        return await provider.get_blockchain_data(self, chain_id)

    @memoize(configuration=blockchain_cache_config())
    async def get_blockchains(self, testnet: bool) -> List[Blockchain]:
        """Fetch blockchain data for every configured chain of one kind (memoized).

        Args:
            testnet: when truthy, only chains whose ``is_mainnet`` entry is
                falsy are queried; otherwise only chains whose ``is_mainnet``
                entry is truthy are queried.

        Returns:
            The provider results, in ``BLOCKCHAINS`` order.

        Raises:
            ValueError: if a selected chain has no registered provider.
        """
        print('rendering blockchains')
        selected_ids = [
            chain['id']
            for chain in BLOCKCHAINS
            if bool(chain['is_mainnet']) != bool(testnet)
        ]
        # Resolve every provider before creating any awaitable, so that an
        # unsupported chain id raises without leaving earlier awaitables
        # created but never awaited.
        providers = [self._get_provider(chain_id) for chain_id in selected_ids]
        results = await gather(*(
            provider.get_blockchain_data(self, chain_id)
            for provider, chain_id in zip(providers, selected_ids)
        ))
        return list(results)

    async def get_transactions(self, addresses: List[str], blockchain_id: str, start_height: int, end_height: int,
                               max_page_size: int, include_raw: bool) -> HeightPaginatedResponse[Transaction]:
        """Fetch and merge the transactions of several addresses on one chain.

        Each address is queried concurrently through the chain's provider and
        the per-address pages are concatenated in ``addresses`` order.

        Args:
            addresses: addresses to query.
            blockchain_id: id of the chain; must have a registered provider.
            start_height: forwarded to the provider as ``start_height``.
            end_height: forwarded to the provider as ``end_height``.
            max_page_size: accepted for interface compatibility; not used by
                this method.
            include_raw: accepted for interface compatibility; not used by
                this method.

        Returns:
            A single response holding all transactions.  When any per-address
            response reports more data with a next height set, ``has_more``
            is True, ``next_start_height`` is the lowest reported next start
            height and ``next_end_height`` is the highest reported next end
            height (each None if no response reported it).

        Raises:
            ValueError: if ``blockchain_id`` has no registered provider.
        """
        provider = self._get_provider(blockchain_id)
        pages = await gather(*(
            provider.get_address_transactions(session=self, chain_id=blockchain_id,
                                              address=address, start_height=start_height,
                                              end_height=end_height)
            for address in addresses
        ))

        all_txns = [txn for page in pages for txn in page.contents]

        # Only pages that report more data contribute to the next window.
        # Missing (None) bounds are skipped so that a page carrying just one
        # of the two bounds cannot trigger a None-vs-int comparison.
        next_starts = [page.next_start_height for page in pages
                       if page.has_more and page.next_start_height is not None]
        next_ends = [page.next_end_height for page in pages
                     if page.has_more and page.next_end_height is not None]

        merged = HeightPaginatedResponse(contents=all_txns, has_more=False)
        if next_starts or next_ends:
            merged.has_more = True
            merged.next_start_height = min(next_starts) if next_starts else None
            merged.next_end_height = max(next_ends) if next_ends else None
        return merged