Skip to content

Commit

Permalink
[python] cleaning up with feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
davidiw committed May 18, 2023
1 parent 4427375 commit 2cdb8fd
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 121 deletions.
121 changes: 59 additions & 62 deletions ecosystem/python/sdk/aptos_sdk/account_sequence_number.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,116 +13,113 @@ class AccountSequenceNumber:
"""
A managed wrapper around sequence numbers that implements the trivial flow control used by the
Aptos faucet:
* Submit up to 50 transactions per account in parallel with a timeout of 20 seconds
* If local assumes 50 are in flight, determine the actual committed state from the network
* If there are less than 50 due to some being committed, adjust the window
* If 50 are in flight Wait .1 seconds before re-evaluating
* Submit up to 100 transactions per account in parallel with a timeout of 20 seconds
* If local assumes 100 are in flight, determine the actual committed state from the network
* If there are less than 100 due to some being committed, adjust the window
* If 100 are in flight Wait .1 seconds before re-evaluating
* If ever waiting more than 30 seconds restart the sequence number to the current on-chain state
Assumptions:
* Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise.
* They are initialized to the current on-chain state, so if there are already transactions in
flight, they make take some time to reset.
flight, they may take some time to reset.
* Accounts are automatically initialized if not explicitly
Notes:
* This is co-routine safe, that is many async tasks can be reading from this concurrently.
* The state of an account cannot be used across multiple AccountSequenceNumber services.
* The synchronize method will create a barrier that prevents additional next_sequence_number
calls until it is complete.
* This only manages the distribution of sequence numbers it does not help handle transaction
failures.
* If a transaction fails, you should call synchronize and wait for timeouts.
* Mempool limits the number of transactions per account to 100, hence why we chose 100.
"""

client: RestClient
account: AccountAddress
lock = asyncio.Lock
_client: RestClient
_account: AccountAddress
_lock: asyncio.Lock

maximum_in_flight: int = 100
maximum_wait_time = 30
sleep_time = 0.01
_maximum_in_flight: int = 100
_maximum_wait_time: int = 30
_sleep_time: float = 0.01

last_committed_number: Optional[int]
current_number: Optional[int]
_last_committed_number: Optional[int]
_current_number: Optional[int]

def __init__(self, client: RestClient, account: AccountAddress):
self.client = client
self.account = account
self.lock = asyncio.Lock()
self._client = client
self._account = account
self._lock = asyncio.Lock()

self.last_uncommitted_number = None
self.current_number = None
self._last_uncommitted_number = None
self._current_number = None

async def next_sequence_number(self, block: bool = True) -> Optional[int]:
"""
Returns the next sequence number available on this account. This leverages a lock to
guarantee first-in, first-out ordering of requests.
"""
await self.lock.acquire()
try:
if self.last_uncommitted_number is None or self.current_number is None:
await self.initialize()
async with self._lock:
if self._last_uncommitted_number is None or self._current_number is None:
await self._initialize()
# If there are more than self._maximum_in_flight in flight, wait for a slot.
# Or at least check to see if there is a slot and exit if in non-blocking mode.
if (
self.current_number - self.last_uncommitted_number
>= self.maximum_in_flight
self._current_number - self._last_uncommitted_number
>= self._maximum_in_flight
):
await self.__update()
await self._update()
start_time = time.time()
while (
self.current_number - self.last_uncommitted_number
>= self.maximum_in_flight
self._current_number - self._last_uncommitted_number
>= self._maximum_in_flight
):
if not block:
return None
await asyncio.sleep(self.sleep_time)
if time.time() - start_time > self.maximum_wait_time:

await asyncio.sleep(self._sleep_time)
if time.time() - start_time > self._maximum_wait_time:
logging.warn(
f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address().hex()}"
f"Waited over {self._maximum_wait_time} seconds for a transaction to commit, resyncing {self._account}"
)
await self.initialize()
await self._initialize()
else:
await self.__update()
next_number = self.current_number
self.current_number += 1
finally:
self.lock.release()
await self._update()

next_number = self._current_number
self._current_number += 1
return next_number

async def initialize(self):
async def _initialize(self):
"""Optional initializer. called by next_sequence_number if not called prior."""
self.current_number = await self.__current_sequence_number()
self.last_uncommitted_number = self.current_number
self._current_number = await self._current_sequence_number()
self._last_uncommitted_number = self._current_number

async def synchronize(self):
"""
Poll the network until all submitted transactions have either been committed or until
the maximum wait time has elapsed. This will prevent any calls to next_sequence_number
until this called has returned.
"""
if self.last_uncommitted_number == self.current_number:
return

await self.lock.acquire()
try:
await self.__update()
async with self._lock:
await self._update()
start_time = time.time()
while self.last_uncommitted_number != self.current_number:
print(f"{self.last_uncommitted_number} {self.current_number}")
if time.time() - start_time > self.maximum_wait_time:
while self._last_uncommitted_number != self._current_number:
if time.time() - start_time > self._maximum_wait_time:
logging.warn(
f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address}"
f"Waited over {self._maximum_wait_time} seconds for a transaction to commit, resyncing {self._account}"
)
await self.initialize()
await self._initialize()
else:
await asyncio.sleep(self.sleep_time)
await self.__update()
finally:
self.lock.release()
await asyncio.sleep(self._sleep_time)
await self._update()

async def __update(self):
self.last_uncommitted_number = await self.__current_sequence_number()
return self.last_uncommitted_number
async def _update(self):
self._last_uncommitted_number = await self._current_sequence_number()
return self._last_uncommitted_number

async def __current_sequence_number(self) -> int:
return await self.client.account_sequence_number(self.account)
async def _current_sequence_number(self) -> int:
return await self._client.account_sequence_number(self._account)


import unittest
Expand Down Expand Up @@ -158,7 +155,7 @@ async def test_common_path(self):
)
patcher.start()

for seq_num in range(AccountSequenceNumber.maximum_in_flight):
for seq_num in range(AccountSequenceNumber._maximum_in_flight):
last_seq_num = await account_sequence_number.next_sequence_number()
self.assertEqual(last_seq_num, seq_num + 5)

Expand All @@ -173,6 +170,6 @@ async def test_common_path(self):
)
patcher.start()

self.assertNotEqual(account_sequence_number.current_number, last_seq_num)
self.assertNotEqual(account_sequence_number._current_number, last_seq_num)
await account_sequence_number.synchronize()
self.assertEqual(account_sequence_number.current_number, next_sequence_number)
self.assertEqual(account_sequence_number._current_number, next_sequence_number)
9 changes: 6 additions & 3 deletions ecosystem/python/sdk/aptos_sdk/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ClientConfig:
expiration_ttl: int = 600
gas_unit_price: int = 100
max_gas_amount: int = 100_000
transaction_wait_in_seconds: int = 20
transaction_wait_in_seconds: int = 60
http2: bool = False


Expand All @@ -45,12 +45,15 @@ class RestClient:
def __init__(self, base_url: str, client_config: ClientConfig = ClientConfig()):
self.base_url = base_url
# Default limits
limits = httpx.Limits()
limits = httpx.Limits(max_connections=20)
# Default timeouts but do not set a pool timeout, since the idea is that jobs will wait as
# long as progress is being made.
timeout = httpx.Timeout(60.0, pool=None)
self.client = httpx.AsyncClient(
http2=client_config.http2, limits=limits, timeout=timeout
http2=client_config.http2,
limits=limits,
timeout=timeout,
http1=False,
)
self.client_config = client_config
self._chain_id = None
Expand Down
15 changes: 8 additions & 7 deletions ecosystem/python/sdk/aptos_sdk/transaction_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
self._outstanding_transactions = asyncio.Queue()
self._processed_transactions = asyncio.Queue()

def account(self) -> AccountAddress:
def address(self) -> AccountAddress:
return self._account.address()

async def _submit_transactions_task(self):
Expand All @@ -84,22 +84,23 @@ async def _submit_transactions_task(self):
async def _process_transactions_task(self):
try:
while True:
# Always start waiting for one
# Always start waiting for one, that way we can acquire a batch in the loop below.
(
txn_awaitable,
txn_hash_awaitable,
sequence_number,
) = await self._outstanding_transactions.get()
awaitables = [txn_awaitable]
awaitables = [txn_hash_awaitable]
sequence_numbers = [sequence_number]

# Only acquire if there are more
# Now acquire our batch.
while not self._outstanding_transactions.empty():
(
txn_awaitable,
txn_hash_awaitable,
sequence_number,
) = await self._outstanding_transactions.get()
awaitables.append(txn_awaitable)
awaitables.append(txn_hash_awaitable)
sequence_numbers.append(sequence_number)

outputs = await asyncio.gather(*awaitables, return_exceptions=True)

for (output, sequence_number) in zip(outputs, sequence_numbers):
Expand Down
6 changes: 0 additions & 6 deletions ecosystem/python/sdk/examples/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,3 @@
"APTOS_FAUCET_URL",
"https://faucet.devnet.aptoslabs.com",
) # <:!:section_1

NODE_URL = os.getenv("APTOS_NODE_URL", "http://127.0.0.1:8080/v1")
FAUCET_URL = os.getenv(
"APTOS_FAUCET_URL",
"http://127.0.0.1:8081",
) # <:!:section_1
Loading

0 comments on commit 2cdb8fd

Please sign in to comment.