From 2cdb8fd158986b15544e5b8c675dadd29700c373 Mon Sep 17 00:00:00 2001 From: David Wolinsky Date: Tue, 2 May 2023 23:07:41 -0700 Subject: [PATCH] [python] cleaning up with feedback --- .../sdk/aptos_sdk/account_sequence_number.py | 121 ++++++----- .../python/sdk/aptos_sdk/async_client.py | 9 +- .../sdk/aptos_sdk/transaction_worker.py | 15 +- ecosystem/python/sdk/examples/common.py | 6 - .../sdk/examples/transaction-batching.py | 193 ++++++++++++++---- 5 files changed, 223 insertions(+), 121 deletions(-) diff --git a/ecosystem/python/sdk/aptos_sdk/account_sequence_number.py b/ecosystem/python/sdk/aptos_sdk/account_sequence_number.py index 266a25047d708..72f8a9c15e191 100644 --- a/ecosystem/python/sdk/aptos_sdk/account_sequence_number.py +++ b/ecosystem/python/sdk/aptos_sdk/account_sequence_number.py @@ -13,83 +13,87 @@ class AccountSequenceNumber: """ A managed wrapper around sequence numbers that implements the trivial flow control used by the Aptos faucet: - * Submit up to 50 transactions per account in parallel with a timeout of 20 seconds - * If local assumes 50 are in flight, determine the actual committed state from the network - * If there are less than 50 due to some being committed, adjust the window - * If 50 are in flight Wait .1 seconds before re-evaluating + * Submit up to 100 transactions per account in parallel with a timeout of 20 seconds + * If local assumes 100 are in flight, determine the actual committed state from the network + * If there are less than 100 due to some being committed, adjust the window + * If 100 are in flight Wait .1 seconds before re-evaluating * If ever waiting more than 30 seconds restart the sequence number to the current on-chain state Assumptions: * Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise. * They are initialized to the current on-chain state, so if there are already transactions in - flight, they make take some time to reset. + flight, they may take some time to reset. * Accounts are automatically initialized if not explicitly Notes: * This is co-routine safe, that is many async tasks can be reading from this concurrently. + * The state of an account cannot be used across multiple AccountSequenceNumber services. * The synchronize method will create a barrier that prevents additional next_sequence_number calls until it is complete. * This only manages the distribution of sequence numbers it does not help handle transaction failures. + * If a transaction fails, you should call synchronize and wait for timeouts. + * Mempool limits the number of transactions per account to 100, hence why we chose 100. """ - client: RestClient - account: AccountAddress - lock = asyncio.Lock + _client: RestClient + _account: AccountAddress + _lock: asyncio.Lock - maximum_in_flight: int = 100 - maximum_wait_time = 30 - sleep_time = 0.01 + _maximum_in_flight: int = 100 + _maximum_wait_time: int = 30 + _sleep_time: float = 0.01 - last_committed_number: Optional[int] - current_number: Optional[int] + _last_committed_number: Optional[int] + _current_number: Optional[int] def __init__(self, client: RestClient, account: AccountAddress): - self.client = client - self.account = account - self.lock = asyncio.Lock() + self._client = client + self._account = account + self._lock = asyncio.Lock() - self.last_uncommitted_number = None - self.current_number = None + self._last_uncommitted_number = None + self._current_number = None async def next_sequence_number(self, block: bool = True) -> Optional[int]: """ Returns the next sequence number available on this account. This leverages a lock to guarantee first-in, first-out ordering of requests. """ - await self.lock.acquire() - try: - if self.last_uncommitted_number is None or self.current_number is None: - await self.initialize() + async with self._lock: + if self._last_uncommitted_number is None or self._current_number is None: + await self._initialize() + # If there are more than self._maximum_in_flight in flight, wait for a slot. + # Or at least check to see if there is a slot and exit if in non-blocking mode. if ( - self.current_number - self.last_uncommitted_number - >= self.maximum_in_flight + self._current_number - self._last_uncommitted_number + >= self._maximum_in_flight ): - await self.__update() + await self._update() start_time = time.time() while ( - self.current_number - self.last_uncommitted_number - >= self.maximum_in_flight + self._current_number - self._last_uncommitted_number + >= self._maximum_in_flight ): if not block: return None - await asyncio.sleep(self.sleep_time) - if time.time() - start_time > self.maximum_wait_time: + + await asyncio.sleep(self._sleep_time) + if time.time() - start_time > self._maximum_wait_time: logging.warn( - f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address().hex()}" + f"Waited over {self._maximum_wait_time} seconds for a transaction to commit, resyncing {self._account}" ) - await self.initialize() + await self._initialize() else: - await self.__update() - next_number = self.current_number - self.current_number += 1 - finally: - self.lock.release() + await self._update() + + next_number = self._current_number + self._current_number += 1 return next_number - async def initialize(self): + async def _initialize(self): """Optional initializer. called by next_sequence_number if not called prior.""" - self.current_number = await self.__current_sequence_number() - self.last_uncommitted_number = self.current_number + self._current_number = await self._current_sequence_number() + self._last_uncommitted_number = self._current_number async def synchronize(self): """ @@ -97,32 +101,25 @@ async def synchronize(self): the maximum wait time has elapsed. This will prevent any calls to next_sequence_number until this called has returned. """ - if self.last_uncommitted_number == self.current_number: - return - - await self.lock.acquire() - try: - await self.__update() + async with self._lock: + await self._update() start_time = time.time() - while self.last_uncommitted_number != self.current_number: - print(f"{self.last_uncommitted_number} {self.current_number}") - if time.time() - start_time > self.maximum_wait_time: + while self._last_uncommitted_number != self._current_number: + if time.time() - start_time > self._maximum_wait_time: logging.warn( - f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address}" + f"Waited over {self._maximum_wait_time} seconds for a transaction to commit, resyncing {self._account}" ) - await self.initialize() + await self._initialize() else: - await asyncio.sleep(self.sleep_time) - await self.__update() - finally: - self.lock.release() + await asyncio.sleep(self._sleep_time) + await self._update() - async def __update(self): - self.last_uncommitted_number = await self.__current_sequence_number() - return self.last_uncommitted_number + async def _update(self): + self._last_uncommitted_number = await self._current_sequence_number() + return self._last_uncommitted_number - async def __current_sequence_number(self) -> int: - return await self.client.account_sequence_number(self.account) + async def _current_sequence_number(self) -> int: + return await self._client.account_sequence_number(self._account) import unittest @@ -158,7 +155,7 @@ async def test_common_path(self): ) patcher.start() - for seq_num in range(AccountSequenceNumber.maximum_in_flight): + for seq_num in range(AccountSequenceNumber._maximum_in_flight): last_seq_num = await account_sequence_number.next_sequence_number() self.assertEqual(last_seq_num, seq_num + 5) @@ -173,6 +170,6 @@ async def test_common_path(self): ) patcher.start() - self.assertNotEqual(account_sequence_number.current_number, last_seq_num) + self.assertNotEqual(account_sequence_number._current_number, last_seq_num) await account_sequence_number.synchronize() - self.assertEqual(account_sequence_number.current_number, next_sequence_number) + self.assertEqual(account_sequence_number._current_number, next_sequence_number) diff --git a/ecosystem/python/sdk/aptos_sdk/async_client.py b/ecosystem/python/sdk/aptos_sdk/async_client.py index bcc90cea75552..b33c9d51f390b 100644 --- a/ecosystem/python/sdk/aptos_sdk/async_client.py +++ b/ecosystem/python/sdk/aptos_sdk/async_client.py @@ -30,7 +30,7 @@ class ClientConfig: expiration_ttl: int = 600 gas_unit_price: int = 100 max_gas_amount: int = 100_000 - transaction_wait_in_seconds: int = 20 + transaction_wait_in_seconds: int = 60 http2: bool = False @@ -45,12 +45,15 @@ class RestClient: def __init__(self, base_url: str, client_config: ClientConfig = ClientConfig()): self.base_url = base_url # Default limits - limits = httpx.Limits() + limits = httpx.Limits(max_connections=20) # Default timeouts but do not set a pool timeout, since the idea is that jobs will wait as # long as progress is being made. timeout = httpx.Timeout(60.0, pool=None) self.client = httpx.AsyncClient( - http2=client_config.http2, limits=limits, timeout=timeout + http2=client_config.http2, + limits=limits, + timeout=timeout, + http1=False, ) self.client_config = client_config self._chain_id = None diff --git a/ecosystem/python/sdk/aptos_sdk/transaction_worker.py b/ecosystem/python/sdk/aptos_sdk/transaction_worker.py index ce0a7648a22d2..22ec9ca87ac72 100644 --- a/ecosystem/python/sdk/aptos_sdk/transaction_worker.py +++ b/ecosystem/python/sdk/aptos_sdk/transaction_worker.py @@ -57,7 +57,7 @@ def __init__( self._outstanding_transactions = asyncio.Queue() self._processed_transactions = asyncio.Queue() - def account(self) -> AccountAddress: + def address(self) -> AccountAddress: return self._account.address() async def _submit_transactions_task(self): @@ -84,22 +84,23 @@ async def _submit_transactions_task(self): async def _process_transactions_task(self): try: while True: - # Always start waiting for one + # Always start waiting for one, that way we can acquire a batch in the loop below. ( - txn_awaitable, + txn_hash_awaitable, sequence_number, ) = await self._outstanding_transactions.get() - awaitables = [txn_awaitable] + awaitables = [txn_hash_awaitable] sequence_numbers = [sequence_number] - # Only acquire if there are more + # Now acquire our batch. while not self._outstanding_transactions.empty(): ( - txn_awaitable, + txn_hash_awaitable, sequence_number, ) = await self._outstanding_transactions.get() - awaitables.append(txn_awaitable) + awaitables.append(txn_hash_awaitable) sequence_numbers.append(sequence_number) + outputs = await asyncio.gather(*awaitables, return_exceptions=True) for (output, sequence_number) in zip(outputs, sequence_numbers): diff --git a/ecosystem/python/sdk/examples/common.py b/ecosystem/python/sdk/examples/common.py index 8272f249de443..b81711d057ecf 100644 --- a/ecosystem/python/sdk/examples/common.py +++ b/ecosystem/python/sdk/examples/common.py @@ -9,9 +9,3 @@ "APTOS_FAUCET_URL", "https://faucet.devnet.aptoslabs.com", ) # <:!:section_1 - -NODE_URL = os.getenv("APTOS_NODE_URL", "http://127.0.0.1:8080/v1") -FAUCET_URL = os.getenv( - "APTOS_FAUCET_URL", - "http://127.0.0.1:8081", -) # <:!:section_1 diff --git a/ecosystem/python/sdk/examples/transaction-batching.py b/ecosystem/python/sdk/examples/transaction-batching.py index 714707d2f19ff..3f9472e271bff 100644 --- a/ecosystem/python/sdk/examples/transaction-batching.py +++ b/ecosystem/python/sdk/examples/transaction-batching.py @@ -1,16 +1,19 @@ # Copyright © Aptos Foundation # SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + import asyncio -import sys +import logging import time -import typing from multiprocessing import Pipe, Process from multiprocessing.connection import Connection +from typing import Any, List from aptos_sdk.account import Account from aptos_sdk.account_address import AccountAddress from aptos_sdk.account_sequence_number import AccountSequenceNumber +from aptos_sdk.aptos_token_client import AptosTokenClient, Property, PropertyMap from aptos_sdk.async_client import ClientConfig, FaucetClient, RestClient from aptos_sdk.bcs import Serializer from aptos_sdk.transaction_worker import TransactionWorker @@ -91,13 +94,13 @@ def __init__(self, node_url: str, account: Account, recipient: AccountAddress): target=Worker.run, args=(conn, node_url, account, recipient) ) - def get(self) -> typing.Any: + def get(self) -> Any: self._conn.recv() def join(self): self._process.join() - def put(self, value: typing.Any): + def put(self, value: Any): self._conn.send(value) def start(self): @@ -130,10 +133,10 @@ def __init__( def run(queue: Pipe, node_url: str, account: Account, recipient: AccountAddress): worker = Worker(queue, node_url, account, recipient) - asyncio.run(worker.arun()) + asyncio.run(worker.async_run()) - async def arun(self): - print(f"hello from {self._account.address()}", flush=True) + async def async_run(self): + print(f"hello from {self._account.address()}") try: self._txn_worker.start() @@ -142,7 +145,7 @@ async def arun(self): await self._txn_generator.increase_transaction_count(num_txns) - print(f"Increase txns from {self._account.address()}", flush=True) + print(f"Increase txns from {self._account.address()}") self._conn.send(True) self._conn.recv() @@ -155,13 +158,14 @@ async def arun(self): exception, ) = await self._txn_worker.next_processed_transaction() if exception: - print( - f"Account {self._txn_worker.account()}, transaction {sequence_number} submission failed: {exception}" + logging.error( + f"Account {self._txn_worker.address()}, transaction {sequence_number} submission failed.", + exc_info=exception, ) else: txn_hashes.append(txn_hash) - print(f"Submit txns from {self._account.address()}", flush=True) + print(f"Submitted txns from {self._account.address()}", flush=True) self._conn.send(True) self._conn.recv() @@ -172,10 +176,13 @@ async def arun(self): print(f"Verified txns from {self._account.address()}", flush=True) self._conn.send(True) except Exception as e: - print(e) - sys.stdout.flush() + logging.error( + "Failed during run.", + exc_info=e, + ) +# This performs a simple p2p transaction async def transfer_transaction( client: RestClient, sender: Account, @@ -199,42 +206,109 @@ async def transfer_transaction( ) -async def main(): - client_config = ClientConfig() - # Toggle to benchmark - client_config.http2 = False - client_config.http2 = True - rest_client = RestClient(NODE_URL, client_config) +# This will create a collection in the first transaction and then create NFTs thereafter. +# Note: Please adjust the sequence number and the name of the collection if run on the same set of +# accounts, otherwise you may end up not creating a collection and failing all transactions. +async def token_transaction( + client: RestClient, + sender: Account, + sequence_number: int, + recipient: AccountAddress, + amount: int, +) -> str: + collection_name = "Funky Alice's" + if sequence_number == 8351: + payload = AptosTokenClient.create_collection_payload( + "Alice's simple collection", + 20000000000, + collection_name, + "https://aptos.dev", + True, + True, + True, + True, + True, + True, + True, + True, + True, + 0, + 1, + ) + else: + payload = AptosTokenClient.mint_token_payload( + collection_name, + "Alice's simple token", + f"token {sequence_number}", + "https://aptos.dev/img/nyan.jpeg", + PropertyMap([Property.string("string", "string value")]), + ) + return await client.create_bcs_signed_transaction(sender, payload, sequence_number) + + +class Accounts: + source: Account + senders: List[Account] + receivers: List[Account] + + def __init__(self, source, senders, receivers): + self.source = source + self.senders = senders + self.receivers = receivers + + def generate(path: str, num_accounts: int) -> Accounts: + source = Account.generate() + source.store(f"{path}/source.txt") + senders = [] + receivers = [] + for idx in range(num_accounts): + senders.append(Account.generate()) + receivers.append(Account.generate()) + senders[-1].store(f"{path}/sender_{idx}.txt") + receivers[-1].store(f"{path}/receiver_{idx}.txt") + return Accounts(source, senders, receivers) + + def load(path: str, num_accounts: int) -> Accounts: + source = Account.load(f"{path}/source.txt") + senders = [] + receivers = [] + for idx in range(num_accounts): + senders.append(Account.load(f"{path}/sender_{idx}.txt")) + receivers.append(Account.load(f"{path}/receiver_{idx}.txt")) + return Accounts(source, senders, receivers) + + +async def fund_from_faucet(rest_client: RestClient, source: Account): faucet_client = FaucetClient(FAUCET_URL, rest_client) - num_accounts = 8 - transactions = 1000 - start = time.time() - - print("Starting...") - - accounts = [] - recipients = [] - - for account in range(num_accounts): - recipients.append(Account.generate()) - accounts.append(Account.generate()) + fund_txns = [] + for _ in range(40): + fund_txns.append(faucet_client.fund_account(source.address(), 100_000_000_000)) + await asyncio.gather(*fund_txns) - last = time.time() - print(f"Accounts generated at {last - start}") - source = Account.generate() - await faucet_client.fund_account(source.address(), 100_000_000 * num_accounts) +async def distribute_portionally( + rest_client: RestClient, + source: Account, + senders: List[Account], + receivers: List[Account], +): balance = int(await rest_client.account_balance(source.address())) + per_node_balance = balance // (len(senders) + 1) + await distribute(rest_client, source, senders, receivers, per_node_balance) - per_node_balance = balance // (num_accounts + 1) - account_sequence_number = AccountSequenceNumber(rest_client, source.address()) - print(f"Initial account funded at {time.time() - start} {time.time() - last}") - last = time.time() +async def distribute( + rest_client: RestClient, + source: Account, + senders: List[Account], + receivers: List[Account], + per_node_amount: int, +): + all_accounts = list(map(lambda account: (account.address(), True), senders)) + all_accounts.extend(map(lambda account: (account.address(), False), receivers)) - all_accounts = list(map(lambda account: (account.address(), True), accounts)) - all_accounts.extend(map(lambda account: (account.address(), False), recipients)) + account_sequence_number = AccountSequenceNumber(rest_client, source.address()) txns = [] txn_hashes = [] @@ -247,7 +321,7 @@ async def main(): txn_hashes.extend(await asyncio.gather(*txns)) txns = [] sequence_number = await account_sequence_number.next_sequence_number() - amount = per_node_balance if fund else 0 + amount = per_node_amount if fund else 0 txn = await transfer_transaction( rest_client, source, sequence_number, account, amount ) @@ -258,6 +332,39 @@ async def main(): await rest_client.wait_for_transaction(txn_hash) await account_sequence_number.synchronize() + +async def main(): + client_config = ClientConfig() + client_config.http2 = True + rest_client = RestClient(NODE_URL, client_config) + + num_accounts = 16 + transactions = 10000 + start = time.time() + + print("Starting...") + + # Generate will create new accounts, load will load existing accounts + all_accounts = Accounts.generate("nodes", num_accounts) + # all_accounts = Accounts.load("nodes", num_accounts) + accounts = all_accounts.senders + receivers = all_accounts.receivers + source = all_accounts.source + + print(f"source: {source.address()}") + + last = time.time() + print(f"Accounts generated / loaded at {last - start}") + + await fund_from_faucet(rest_client, source) + + print(f"Initial account funded at {time.time() - start} {time.time() - last}") + last = time.time() + + balance = await rest_client.account_balance(source.address()) + amount = int(balance * 0.9 / num_accounts) + await distribute(rest_client, source, accounts, receivers, amount) + print(f"Funded all accounts at {time.time() - start} {time.time() - last}") last = time.time() @@ -270,7 +377,7 @@ async def main(): last = time.time() workers = [] - for (account, recipient) in zip(accounts, recipients): + for (account, recipient) in zip(accounts, receivers): workers.append(WorkerContainer(NODE_URL, account, recipient.address())) workers[-1].start()