diff --git a/ecosystem/python/sdk/aptos_sdk/async_client.py b/ecosystem/python/sdk/aptos_sdk/async_client.py index c56e07cbca7be..bcc90cea75552 100644 --- a/ecosystem/python/sdk/aptos_sdk/async_client.py +++ b/ecosystem/python/sdk/aptos_sdk/async_client.py @@ -374,11 +374,19 @@ async def create_multi_agent_bcs_transaction( return SignedTransaction(raw_transaction.inner(), authenticator) async def create_bcs_transaction( - self, sender: Account, payload: TransactionPayload + self, + sender: Account, + payload: TransactionPayload, + sequence_number: Optional[int] = None, ) -> RawTransaction: + sequence_number = ( + sequence_number + if sequence_number is not None + else await self.account_sequence_number(sender.address()) + ) return RawTransaction( sender.address(), - await self.account_sequence_number(sender.address()), + sequence_number, payload, self.client_config.max_gas_amount, self.client_config.gas_unit_price, @@ -387,9 +395,14 @@ async def create_bcs_transaction( ) async def create_bcs_signed_transaction( - self, sender: Account, payload: TransactionPayload + self, + sender: Account, + payload: TransactionPayload, + sequence_number: Optional[int] = None, ) -> SignedTransaction: - raw_transaction = await self.create_bcs_transaction(sender, payload) + raw_transaction = await self.create_bcs_transaction( + sender, payload, sequence_number + ) signature = sender.sign(raw_transaction.keyed()) authenticator = Authenticator( Ed25519Authenticator(sender.public_key(), signature) @@ -419,7 +432,11 @@ async def transfer( # :!:>bcs_transfer async def bcs_transfer( - self, sender: Account, recipient: AccountAddress, amount: int + self, + sender: Account, + recipient: AccountAddress, + amount: int, + sequence_number: Optional[int] = None, ) -> str: transaction_arguments = [ TransactionArgument(recipient, Serializer.struct), @@ -434,7 +451,7 @@ async def bcs_transfer( ) signed_transaction = await self.create_bcs_signed_transaction( - sender, TransactionPayload(payload) + sender, TransactionPayload(payload), sequence_number=sequence_number ) return await self.submit_bcs_transaction(signed_transaction) diff --git a/ecosystem/python/sdk/examples/common.py b/ecosystem/python/sdk/examples/common.py index b81711d057ecf..8272f249de443 100644 --- a/ecosystem/python/sdk/examples/common.py +++ b/ecosystem/python/sdk/examples/common.py @@ -9,3 +9,9 @@ "APTOS_FAUCET_URL", "https://faucet.devnet.aptoslabs.com", ) # <:!:section_1 + +NODE_URL = os.getenv("APTOS_NODE_URL", "http://127.0.0.1:8080/v1") +FAUCET_URL = os.getenv( + "APTOS_FAUCET_URL", + "http://127.0.0.1:8081", +) # <:!:section_1 diff --git a/ecosystem/python/sdk/examples/transaction-batching.py b/ecosystem/python/sdk/examples/transaction-batching.py index 4e5d02cfb9fbc..714707d2f19ff 100644 --- a/ecosystem/python/sdk/examples/transaction-batching.py +++ b/ecosystem/python/sdk/examples/transaction-batching.py @@ -2,26 +2,203 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -import logging +import sys import time +import typing +from multiprocessing import Pipe, Process +from multiprocessing.connection import Connection from aptos_sdk.account import Account from aptos_sdk.account_address import AccountAddress +from aptos_sdk.account_sequence_number import AccountSequenceNumber from aptos_sdk.async_client import ClientConfig, FaucetClient, RestClient -from aptos_sdk.authenticator import Authenticator, Ed25519Authenticator from aptos_sdk.bcs import Serializer +from aptos_sdk.transaction_worker import TransactionWorker from aptos_sdk.transactions import ( EntryFunction, - RawTransaction, SignedTransaction, TransactionArgument, TransactionPayload, ) -from aptos_sdk.type_tag import StructTag, TypeTag from .common import FAUCET_URL, NODE_URL +class TransactionGenerator: + """ + Demonstrate how one might make a harness for submitting transactions. This class just keeps + submitting the same transaction payload. In practice, this could be a queue, where new payloads + accumulate and are consumed by the call to next_transaction. + + Todo: add tracking of transaction status to this and come up with some general logic to retry + or exit upon failure. + """ + + _client: RestClient + _recipient: AccountAddress + _offset: int + _remaining_transactions: int + _waiting_for_more = asyncio.Event + _complete = asyncio.Event + _lock = asyncio.Lock + + def __init__(self, client: RestClient, recipient: AccountAddress): + self._client = client + self._recipient = recipient + self._waiting_for_more = asyncio.Event() + self._waiting_for_more.clear() + self._complete = asyncio.Event() + self._complete.set() + self._lock = asyncio.Lock() + self._remaining_transactions = 0 + + async def next_transaction( + self, sender: Account, sequence_number: int + ) -> SignedTransaction: + while self._remaining_transactions == 0: + await self._waiting_for_more.wait() + + async with self._lock: + self._remaining_transactions -= 1 + if self._remaining_transactions == 0: + self._waiting_for_more.clear() + self._complete.set() + + return await transfer_transaction( + self._client, sender, sequence_number, self._recipient, 0 + ) + + async def increase_transaction_count(self, number: int): + if number <= 0: + return + + async with self._lock: + self._remaining_transactions += number + self._waiting_for_more.set() + self._complete.clear() + + async def wait(self): + await self._complete.wait() + + +class WorkerContainer: + _conn: Connection + _process: Process + + def __init__(self, node_url: str, account: Account, recipient: AccountAddress): + (self._conn, conn) = Pipe() + self._process = Process( + target=Worker.run, args=(conn, node_url, account, recipient) + ) + + def get(self) -> typing.Any: + self._conn.recv() + + def join(self): + self._process.join() + + def put(self, value: typing.Any): + self._conn.send(value) + + def start(self): + self._process.start() + + +class Worker: + _conn: Connection + _rest_client: RestClient + _account: Account + _recipient: AccountAddress + _txn_generator: TransactionGenerator + _txn_worker: TransactionWorker + + def __init__( + self, + conn: Connection, + node_url: str, + account: Account, + recipient: AccountAddress, + ): + self._conn = conn + self._rest_client = RestClient(node_url) + self._account = account + self._recipient = recipient + self._txn_generator = TransactionGenerator(self._rest_client, self._recipient) + self._txn_worker = TransactionWorker( + self._account, self._rest_client, self._txn_generator.next_transaction + ) + + def run(queue: Pipe, node_url: str, account: Account, recipient: AccountAddress): + worker = Worker(queue, node_url, account, recipient) + asyncio.run(worker.arun()) + + async def arun(self): + print(f"hello from {self._account.address()}", flush=True) + try: + self._txn_worker.start() + + self._conn.send(True) + num_txns = self._conn.recv() + + await self._txn_generator.increase_transaction_count(num_txns) + + print(f"Increase txns from {self._account.address()}", flush=True) + self._conn.send(True) + self._conn.recv() + + txn_hashes = [] + while num_txns != 0: + num_txns -= 1 + ( + sequence_number, + txn_hash, + exception, + ) = await self._txn_worker.next_processed_transaction() + if exception: + print( + f"Account {self._txn_worker.account()}, transaction {sequence_number} submission failed: {exception}" + ) + else: + txn_hashes.append(txn_hash) + + print(f"Submit txns from {self._account.address()}", flush=True) + self._conn.send(True) + self._conn.recv() + + for txn_hash in txn_hashes: + await self._rest_client.wait_for_transaction(txn_hash) + + await self._rest_client.close() + print(f"Verified txns from {self._account.address()}", flush=True) + self._conn.send(True) + except Exception as e: + print(e) + sys.stdout.flush() + + +async def transfer_transaction( + client: RestClient, + sender: Account, + sequence_number: int, + recipient: AccountAddress, + amount: int, +) -> str: + transaction_arguments = [ + TransactionArgument(recipient, Serializer.struct), + TransactionArgument(amount, Serializer.u64), + ] + payload = EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + transaction_arguments, + ) + + return await client.create_bcs_signed_transaction( + sender, TransactionPayload(payload), sequence_number + ) + + async def main(): client_config = ClientConfig() # Toggle to benchmark @@ -30,202 +207,113 @@ async def main(): rest_client = RestClient(NODE_URL, client_config) faucet_client = FaucetClient(FAUCET_URL, rest_client) - num_accounts = 5 - read_amplification = 1000 - first_pass = 100 + num_accounts = 8 + transactions = 1000 start = time.time() print("Starting...") accounts = [] - recipient_accounts = [] - for _ in range(num_accounts): + recipients = [] + + for account in range(num_accounts): + recipients.append(Account.generate()) accounts.append(Account.generate()) - recipient_accounts.append(Account.generate()) last = time.time() print(f"Accounts generated at {last - start}") - funds = [] - for account in accounts: - funds.append(faucet_client.fund_account(account.address(), 100_000_000)) - for account in recipient_accounts: - funds.append(faucet_client.fund_account(account.address(), 0)) - await asyncio.gather(*funds) + source = Account.generate() + await faucet_client.fund_account(source.address(), 100_000_000 * num_accounts) + balance = int(await rest_client.account_balance(source.address())) - print(f"Funded accounts at {time.time() - start} {time.time() - last}") + per_node_balance = balance // (num_accounts + 1) + account_sequence_number = AccountSequenceNumber(rest_client, source.address()) + + print(f"Initial account funded at {time.time() - start} {time.time() - last}") last = time.time() - balances = [] - for _ in range(read_amplification): - for account in accounts: - balances.append(rest_client.account_balance(account.address())) - await asyncio.gather(*balances) + all_accounts = list(map(lambda account: (account.address(), True), accounts)) + all_accounts.extend(map(lambda account: (account.address(), False), recipients)) - print(f"Accounts checked at {time.time() - start} {time.time() - last}") - last = time.time() + txns = [] + txn_hashes = [] - account_sequence_numbers = [] - await_account_sequence_numbers = [] - for account in accounts: - account_sequence_number = AccountSequenceNumber(rest_client, account.address()) - await_account_sequence_numbers.append(account_sequence_number.initialize()) - account_sequence_numbers.append(account_sequence_number) - await asyncio.gather(*await_account_sequence_numbers) + for (account, fund) in all_accounts: + sequence_number = await account_sequence_number.next_sequence_number( + block=False + ) + if sequence_number is None: + txn_hashes.extend(await asyncio.gather(*txns)) + txns = [] + sequence_number = await account_sequence_number.next_sequence_number() + amount = per_node_balance if fund else 0 + txn = await transfer_transaction( + rest_client, source, sequence_number, account, amount + ) + txns.append(rest_client.submit_bcs_transaction(txn)) + + txn_hashes.extend(await asyncio.gather(*txns)) + for txn_hash in txn_hashes: + await rest_client.wait_for_transaction(txn_hash) + await account_sequence_number.synchronize() - print(f"Accounts initialized at {time.time() - start} {time.time() - last}") + print(f"Funded all accounts at {time.time() - start} {time.time() - last}") last = time.time() - txn_hashes = [] - for _ in range(first_pass): - for idx in range(num_accounts): - sender = accounts[idx] - recipient = recipient_accounts[idx].address() - sequence_number = await account_sequence_numbers[idx].next_sequence_number() - txn_hash = transfer(rest_client, sender, recipient, sequence_number, 1) - txn_hashes.append(txn_hash) - txn_hashes = await asyncio.gather(*txn_hashes) + balances = [] + for account in accounts: + balances.append(rest_client.account_balance(account.address())) + await asyncio.gather(*balances) - print(f"Transactions submitted at {time.time() - start} {time.time() - last}") + print(f"Accounts checked at {time.time() - start} {time.time() - last}") last = time.time() - wait_for = [] - for txn_hash in txn_hashes: - wait_for.append(account_sequence_number.synchronize()) - await asyncio.gather(*wait_for) + workers = [] + for (account, recipient) in zip(accounts, recipients): + workers.append(WorkerContainer(NODE_URL, account, recipient.address())) + workers[-1].start() - print(f"Transactions committed at {time.time() - start} {time.time() - last}") - last = time.time() + for worker in workers: + worker.get() - await rest_client.close() + print(f"Workers started at {time.time() - start} {time.time() - last}") + last = time.time() + to_take = (transactions // num_accounts) + ( + 1 if transactions % num_accounts != 0 else 0 + ) + remaining_transactions = transactions + for worker in workers: + taking = min(to_take, remaining_transactions) + remaining_transactions -= taking + worker.put(taking) -class AccountSequenceNumber: - """ - A managed wrapper around sequence numbers that implements the trivial flow control used by the - Aptos faucet: - * Submit up to 50 transactions per account in parallel with a timeout of 20 seconds - * If local assumes 50 are in flight, determine the actual committed state from the network - * If there are less than 50 due to some being committed, adjust the window - * If 50 are in flight Wait .1 seconds before re-evaluating - * If ever waiting more than 30 seconds restart the sequence number to the current on-chain state - - Assumptions: - * Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise. - * They are initialized to the current on-chain state, so if there are already transactions in flight, they make take some time to reset. - * Accounts are automatically initialized if not explicitly - * - """ + for worker in workers: + worker.get() - client: RestClient - account: AccountAddress - last_committed_number: int - current_number: int - maximum_in_flight: int = 50 - lock = asyncio.Lock - sleep_time = 0.01 - maximum_wait_time = 30 - - def __init__(self, client: RestClient, account: AccountAddress): - self.client = client - self.account = account - self.last_uncommitted_number = None - self.current_number = None - self.lock = asyncio.Lock() - - async def next_sequence_number(self) -> int: - await self.lock.acquire() - try: - if self.last_uncommitted_number is None or self.current_number is None: - await self.initialize() - - if ( - self.current_number - self.last_uncommitted_number - >= self.maximum_in_flight - ): - await self.__update() - - start_time = time.time() - while ( - self.last_uncommitted_number - self.current_number - >= self.maximum_in_flight - ): - asyncio.sleep(self.sleep_time) - if time.time() - start_time > self.maximum_wait_time: - logging.warn( - f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address()}" - ) - await self.__initialize() - else: - await self.__update() - - next_number = self.current_number - self.current_number += 1 - finally: - self.lock.release() - - return next_number - - async def initialize(self): - self.current_number = await self.__current_sequence_number() - self.last_uncommitted_number = self.current_number - - async def synchronize(self): - if self.last_uncommitted_number == self.current_number: - return + print(f"Transactions submitted at {time.time() - start} {time.time() - last}") + last = time.time() - await self.__update() - start_time = time.time() - while self.last_uncommitted_number != self.current_number: - if time.time() - start_time > self.maximum_wait_time: - logging.warn( - f"Waited over 30 seconds for a transaction to commit, resyncing {self.account.address()}" - ) - await self.__initialize() - else: - await asyncio.sleep(self.sleep_time) - await self.__update() + for worker in workers: + worker.put(True) - async def __update(self): - self.last_uncommitted_number = await self.__current_sequence_number() - return self.last_uncommitted_number + for worker in workers: + worker.get() - async def __current_sequence_number(self) -> int: - return await self.client.account_sequence_number(self.account) + print(f"Transactions processed at {time.time() - start} {time.time() - last}") + last = time.time() + for worker in workers: + worker.put(True) -async def transfer( - client: RestClient, - sender: Account, - recipient: AccountAddress, - sequence_number: int, - amount: int, -): - transaction_arguments = [ - TransactionArgument(recipient, Serializer.struct), - TransactionArgument(amount, Serializer.u64), - ] - payload = EntryFunction.natural( - "0x1::coin", - "transfer", - [TypeTag(StructTag.from_str("0x1::aptos_coin::AptosCoin"))], - transaction_arguments, - ) + for worker in workers: + worker.get() - raw_transaction = RawTransaction( - sender.address(), - sequence_number, - TransactionPayload(payload), - client.client_config.max_gas_amount, - client.client_config.gas_unit_price, - int(time.time()) + client.client_config.expiration_ttl, - await client.chain_id(), - ) + print(f"Transactions verified at {time.time() - start} {time.time() - last}") + last = time.time() - signature = sender.sign(raw_transaction.keyed()) - authenticator = Authenticator(Ed25519Authenticator(sender.public_key(), signature)) - signed_transaction = SignedTransaction(raw_transaction, authenticator) - return await client.submit_bcs_transaction(signed_transaction) + await rest_client.close() if __name__ == "__main__":