From 46ae683658bbd5e4f2a56330d8595d0cdd8fb6cb Mon Sep 17 00:00:00 2001 From: maayan Date: Mon, 17 Jul 2023 13:23:02 -0400 Subject: [PATCH] address comments --- .../typescript/transactions_management.ts | 121 ++++++++++----- .../transactions/account_sequence_number.ts | 81 +++++++--- .../tests/account_sequence_number.test.ts | 2 +- .../tests/transaction_worker.test.ts | 4 +- .../src/transactions/transaction_worker.ts | 144 ++++++++++++------ 5 files changed, 248 insertions(+), 104 deletions(-) diff --git a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts index f36ce0772742eb..2f7d31190f8b87 100644 --- a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts +++ b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts @@ -1,5 +1,20 @@ /** * This example demonstrates how a client can utilize the TransactionWorker class. + * + * The TransactionWorker provides a simple framework for receiving payloads to be processed. It + * acquires an account new sequence number, produces a signed transaction and + * then submits the transaction. In other tasks, it waits for resolution of the submission + * process or get pre-execution validation error and waits for the resolution of the execution process + * or get an execution validation error. + * + * The TransactionWorker constructor accepts + * @param provider - a client provider + * @param sender - the sender account: AptosAccount + * @param maxWaitTime - the max wait time to wait before restarting the local sequence number to the current on-chain state + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * + * Read more about it here {@link https://aptos.dev/guides/transaction-management} */ import { AptosAccount, BCS, TxnBuilderTypes, TransactionWorker, FaucetClient, Provider, Types } from "aptos"; @@ -15,8 +30,9 @@ async function main() { const transactionsCount = 100; const totalTransactions = accountsCount * transactionsCount; + const start = Date.now() / 1000; // current time in seconds + console.log("starting..."); - console.log(new Date().toTimeString()); // create senders and recipients accounts const senders: AptosAccount[] = []; const recipients: AptosAccount[] = []; @@ -24,32 +40,32 @@ async function main() { senders.push(new AptosAccount()); recipients.push(new AptosAccount()); } - console.log(`${senders.length + recipients.length} sender and recipient accounts created`); + let last = Date.now() / 1000; + console.log( + `${senders.length} sender accounts and ${recipients.length} recipient accounts created in ${last - start} seconds`, + ); - // funds sender accounts + // fund sender accounts const funds: Array> = []; for (let i = 0; i < senders.length; i++) { funds.push(faucet.fundAccount(senders[i].address().noPrefix(), 10000000000)); } - // send requests await Promise.all(funds); - console.log(`${funds.length} sender accounts funded`); - for (const acc in senders) { - const curr = senders[acc] as AptosAccount; - console.log(curr.address().hex()); - } + + last = Date.now() / 1000; + console.log(`${funds.length} sender accounts funded in ${last - start} seconds`); // read sender accounts const balances: Array> = []; for (let i = 0; i < senders.length; i++) { balances.push(provider.getAccount(senders[i].address().hex())); } - // send requests await Promise.all(balances); - console.log(`${balances.length} sender account balances checked`); + last = Date.now() / 1000; + console.log(`${balances.length} sender account balances checked in ${last - start} seconds`); // create transactions const payloads: any[] = []; @@ -69,37 +85,72 @@ async function main() { } } - const batchTransactions = (payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) => { - const transactionWorker = new TransactionWorker(provider, sender); - const waitFor: Array> = []; + console.log(`sends ${totalTransactions * senders.length} transactions to chain....`); + // emit batch transactions + for (let i = 0; i < senders.length; i++) { + batchTransactions(payloads, senders[i]); + } + + function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { + const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10); transactionWorker.start(); - transactionWorker.on("transactionsFulfilled", async (data) => { + registerToWorkerEvents(transactionWorker); + + // push transactions to worker queue + for (const payload in payloads) { + transactionWorker.push(payloads[payload]); + } + } + + function registerToWorkerEvents(transactionWorker: TransactionWorker) { + /** + * The callback from an event listener, i.e `data`, is an array with 2 elements + * data[0] - the amount of processed transactions + * data[1] - + * on a success event, is the hash value of the processed transaction + * on a failure event, is the reason for the failure + */ + transactionWorker.on("transactionSent", async (data) => { + // all expected transactions have been sent + if (data[0] === totalTransactions) { + last = Date.now() / 1000; + console.log(`transactions sent in ${last - start} seconds`); + } + }); + + transactionWorker.on("sentFailed", async (data) => { /** - * data is an array with 2 elements - * data[0] = the amount of processed transactions - * data[1] = the hash value of the processed transaction + * transaction sent failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction failure reason */ - waitFor.push(provider.waitForTransaction(data[1], { checkSuccess: true })); - // all expected transactions have been fulfilled + console.log("sentFailed", data); + }); + + transactionWorker.on("transactionExecuted", async (data) => { + // all expected transactions have been executed + console.log(data); if (data[0] === totalTransactions) { - await Promise.all(waitFor); - console.log("transactions submitted"); - console.log(new Date().toTimeString()); + last = Date.now() / 1000; + console.log(`transactions executed in ${last - start} seconds`); await checkAccounts(); } }); - // push transactions to queue - for (const payload in payloads) { - transactionWorker.push(payloads[payload]); - } - }; - - // emit batch transactions - for (let i = 0; i < senders.length; i++) { - batchTransactions(payloads, senders[i]); + transactionWorker.on("executionFailed", async (data) => { + /** + * transaction execution failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction object data + */ + console.log("executionFailed", data); + }); } // check for account's sequence numbers @@ -110,8 +161,8 @@ async function main() { } const res = await Promise.all(waitFor); - console.log(`transactions verified`); - console.log(new Date().toTimeString()); + last = Date.now() / 1000; + console.log(`transactions verified in ${last - start} seconds`); for (const account in res) { const currentAccount = res[account] as Types.AccountData; console.log( @@ -119,7 +170,7 @@ async function main() { ); } // exit for testing porpuses - this would stop the process. most cases we have all transactions - // commited, but in some rare cases we might be stopping it before last couple of transactions commited. + // commited, but in some rare cases we might be stopping it before last couple of transactions have commited. exit(0); }; } diff --git a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts index f039b710adfdae..249626acb67b74 100644 --- a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts +++ b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts @@ -1,8 +1,33 @@ +/** + * A wrapper that handles and manages an account sequence number. + * + * Submit up to `maximumInFlight` transactions per account in parallel with a timeout of `sleepTime` + * If local assumes `maximumInFlight` are in flight, determine the actual committed state from the network + * If there are less than `maximumInFlight` due to some being committed, adjust the window + * If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * If ever waiting more than `maxWaitTime` restart the sequence number to the current on-chain state + * + * Assumptions: + * Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise. + * They are initialized to the current on-chain state, so if there are already transactions in + * flight, they may take some time to reset. + * Accounts are automatically initialized if not explicitly + * + * Notes: + * This is co-routine safe, that is many async tasks can be reading from this concurrently. + * The state of an account cannot be used across multiple AccountSequenceNumber services. + * The synchronize method will create a barrier that prevents additional nextSequenceNumber + * calls until it is complete. + * This only manages the distribution of sequence numbers it does not help handle transaction + * failures. + * If a transaction fails, you should call synchronize and wait for timeouts. + */ + import { AptosAccount } from "../account"; -import { Uint64 } from "../bcs"; import { Provider } from "../providers"; import { sleep } from "../utils"; +// returns `now` time in seconds const now = () => Math.floor(Date.now() / 1000); export class AccountSequenceNumber { @@ -11,38 +36,50 @@ export class AccountSequenceNumber { readonly account: AptosAccount; // sequence number on chain - lastUncommintedNumber: Uint64 | null = null; + lastUncommintedNumber: bigint | null = null; // local sequence number - currentNumber: Uint64 | null = null; + currentNumber: bigint | null = null; + /** + * We want to guarantee that we preserve ordering of workers to requests. + * + * `lock` is used to try to prevent multiple coroutines from accessing a shared resource at the same time, + * which can result in race conditions and data inconsistency. + * This code actually doesn't do it though, since we aren't giving out a slot, it is still somewhat a race condition. + * + * The ideal solution is likely that each thread grabs the next number from a incremental integer. + * When they complete, they increment that number and that entity is able to enter the `lock`. + * That would guarantee ordering. + */ lock = false; - // up to 100 outstanding sequence numbers - maximumInFlight = 100; + maxWaitTime: number; - sleepTime = 10; + maximumInFlight: number; - maxWaitTime = 30; // in seconds + sleepTime: number; - constructor(provider: Provider, account: AptosAccount) { + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number, + maximumInFlight: number, + sleepTime: number, + ) { this.provider = provider; this.account = account; + this.maxWaitTime = maxWaitTime; + this.maximumInFlight = maximumInFlight; + this.sleepTime = sleepTime; } /** - * Returns the next available sequnce number on this account + * Returns the next available sequence number for this account * - * @returns next available sequnce number + * @returns next available sequence number */ async nextSequenceNumber(): Promise { - /* - `lock` is used to prevent multiple coroutines from accessing a shared resource at the same time, - which can result in race conditions and data inconsistency. - This implementation is not as robust as using a proper lock implementation - like `async-mutex` because it relies on busy waiting to acquire the lock, - which can be less efficient and may not work well in all scenarios - */ /* eslint-disable no-await-in-loop */ while (this.lock) { await sleep(this.sleepTime); @@ -73,7 +110,7 @@ export class AccountSequenceNumber { nextNumber = this.currentNumber!; this.currentNumber! += BigInt(1); } catch (e) { - console.error("error", e); + console.error("error in updating this account sequence number with the one on chain", e); } finally { this.lock = false; } @@ -81,7 +118,7 @@ export class AccountSequenceNumber { } /** - * Initializes this account with the sequnce number on chain + * Initializes this account with the sequence number on chain */ async initialize(): Promise { const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); @@ -90,9 +127,9 @@ export class AccountSequenceNumber { } /** - * Updates this account sequnce number with the one on-chain + * Updates this account sequence number with the one on-chain * - * @returns on-chain sequnce number for this account + * @returns on-chain sequence number for this account */ async update(): Promise { const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); @@ -127,7 +164,7 @@ export class AccountSequenceNumber { } } } catch (e) { - console.error("error", e); + console.error("error in updating this account sequence number with the one on chain", e); } finally { this.lock = false; } diff --git a/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts index 3c5193199dfcd3..cb89711c1fc948 100644 --- a/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts +++ b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts @@ -6,7 +6,7 @@ import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); const account = new AptosAccount(); const faucet = getFaucetClient(); -const accountSequenceNumber = new AccountSequenceNumber(provider, account); +const accountSequenceNumber = new AccountSequenceNumber(provider, account, 30, 100, 10); let getAccountSpy: jest.SpyInstance; let lastSeqNumber: bigint | null; diff --git a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts index 8632ab0eef933a..fb2046678a5710 100644 --- a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts +++ b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts @@ -18,7 +18,7 @@ describe("transactionWorker", () => { }); test( - "index", + "submits 5 transactions to chain for a single account", (done) => { // Specify the number of assertions expected expect.assertions(1); @@ -35,7 +35,7 @@ describe("transactionWorker", () => { const payloads = [...Array(5).fill(txn)]; // start transactions worker - const transactionWorker = new TransactionWorker(provider, sender); + const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10); transactionWorker.start(); // push transactions to queue diff --git a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts index 3d25fab7527ac7..ba741d0dcb448d 100644 --- a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts +++ b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts @@ -1,10 +1,29 @@ +/** + * Provides a simple framework for receiving payloads to be processed. + * + * Once one `start()` the process, the worker acquires the current account next sequence number + * (by using the AccountSequenceNumber class), generates a signed transaction and pushes an async + * submission process into a `outstandingTransactions` queue. + * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue + * and submits the next transaction to chain, it + * 1) waits for resolution of the submission process or get pre-execution validation error + * and 2) waits for the resolution of the execution process or get an execution error. + * The worker fires events for any submission and/or execution success and/or failure. + */ + import EventEmitter from "eventemitter3"; import { AptosAccount } from "../account"; -import { PendingTransaction } from "../generated"; +import { PendingTransaction, Transaction } from "../generated"; import { AptosClient, Provider } from "../providers"; import { TxnBuilderTypes } from "../transaction_builder"; import { AccountSequenceNumber } from "./account_sequence_number"; +// Events +const transactionSent = "transactionSent"; +const sentFailed = "sentFailed"; + +const transactionExecuted = "transactionExecuted"; +const executionFailed = "executionFailed"; export class TransactionWorker extends EventEmitter { readonly provider: Provider; @@ -27,15 +46,33 @@ export class TransactionWorker extends EventEmitter { outstandingTransactions: Array<[Promise, bigint]> = []; // transactions that have been submitted to chain - processedTransactions: Array<[string, bigint, any]> = []; + sentTransactions: Array<[string, bigint, any]> = []; + + // transactions that have been committed to chain + executedTransactions: Array<[string, bigint, any]> = []; - constructor(provider: Provider, account: AptosAccount) { + /** + * Provides a simple framework for receiving payloads to be processed. + * + * @param provider - a client provider + * @param sender - a sender as AptosAccount + * @param maxWaitTime - the max wait time to wait before resyncing the sequence number to the current on-chain state + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + */ + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number, + maximumInFlight: number, + sleepTime: number, + ) { super(); this.provider = provider; this.account = account; this.started = false; this.stopped = false; - this.accountSequnceNumber = new AccountSequenceNumber(provider, account); + this.accountSequnceNumber = new AccountSequenceNumber(provider, account, maxWaitTime, maximumInFlight, sleepTime); } /** @@ -44,18 +81,14 @@ export class TransactionWorker extends EventEmitter { * adds the transaction to the outstanding transaction queue * to be processed later. */ - async submitTransactions() { - try { - if (this.transactionsQueue.length === 0) return; - const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); - if (sequenceNumber === null) return; - const transaction = await this.generateNextTransaction(this.account, sequenceNumber); - if (!transaction) return; - const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); - this.outstandingTransactions.push([pendingTransaction, sequenceNumber]); - } catch (error: any) { - throw new Error(error); - } + async submitNextTransaction() { + if (this.transactionsQueue.length === 0) return; + const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); + if (sequenceNumber === null) return; + const transaction = await this.generateNextTransaction(this.account, sequenceNumber); + if (!transaction) return; + const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); + this.outstandingTransactions.push([pendingTransaction, sequenceNumber]); } /** @@ -68,35 +101,58 @@ export class TransactionWorker extends EventEmitter { * transactions queue with the failure reason and fires a transactionsFailed event. */ async processTransactions() { - try { - const awaitingTransactions = []; - const awaitingSequenceNumbers = []; + const awaitingTransactions = []; + const awaitingSequenceNumbers = []; + + while (this.outstandingTransactions.length > 0) { + const [pendingTransaction, sequenceNumber] = this.outstandingTransactions.shift()!; - while (this.outstandingTransactions.length > 0) { - const [pendingTransaction, sequenceNumber] = this.outstandingTransactions.shift()!; + awaitingTransactions.push(pendingTransaction); + awaitingSequenceNumbers.push(sequenceNumber); + } - awaitingTransactions.push(pendingTransaction); - awaitingSequenceNumbers.push(sequenceNumber); + // send awaiting transactions to chain + const sentTransactions = await Promise.allSettled(awaitingTransactions); + + for (let i = 0; i < sentTransactions.length && i < awaitingSequenceNumbers.length; i += 1) { + // check sent transaction status + const sentTransaction = sentTransactions[i]; + const sequenceNumber = awaitingSequenceNumbers[i]; + if (sentTransaction.status === "fulfilled") { + // transaction sent to chain + this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]); + this.emit(transactionSent, [this.sentTransactions.length, sentTransaction.value.hash]); + // check sent transaction execution + this.checkTransaction(sentTransaction, sequenceNumber); + } else { + // send transaction failed + this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); + this.emit(sentFailed, [this.sentTransactions.length, sentTransaction.reason]); } + } + } - try { - const outputs = await Promise.allSettled(awaitingTransactions); - for (let i = 0; i < outputs.length && i < awaitingSequenceNumbers.length; i += 1) { - const output = outputs[i]; - const sequenceNumber = awaitingSequenceNumbers[i]; - - if (output.status === "fulfilled") { - this.processedTransactions.push([output.value.hash, sequenceNumber, null]); - this.emit("transactionsFulfilled", [this.processedTransactions.length, output.value.hash]); - } else { - this.processedTransactions.push([output.status, sequenceNumber, output.reason]); - } - } - } catch (error: any) { - throw new Error(error); + /** + * Once transaction has been sent to chain, we check for its execution status. + * @param sentTransaction transactions that were sent to chain and are now waiting to be executed + * @param sequenceNumber the account's sequence number that was sent with the transaction + */ + async checkTransaction(sentTransaction: PromiseFulfilledResult, sequenceNumber: bigint) { + const waitFor: Array> = []; + waitFor.push(this.provider.waitForTransactionWithResult(sentTransaction.value.hash, { checkSuccess: true })); + const sentTransactions = await Promise.allSettled(waitFor); + + for (let i = 0; i < sentTransactions.length; i += 1) { + const executedTransaction = sentTransactions[i]; + if (executedTransaction.status === "fulfilled") { + // transaction executed to chain + this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]); + this.emit(transactionExecuted, [this.executedTransactions.length, executedTransaction.value.hash]); + } else { + // transaction execution failed + this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]); + this.emit(executionFailed, [this.executedTransactions.length, executedTransaction.reason]); } - } catch (error: any) { - throw new Error(error); } } @@ -127,13 +183,13 @@ export class TransactionWorker extends EventEmitter { /** * Starts transaction submission and transaction processing. */ - async runTransactions() { + async run() { try { while (!this.stopped) { /* eslint-disable no-await-in-loop, no-promise-executor-return */ - await Promise.all([this.submitTransactions(), this.processTransactions()]); + await Promise.all([this.submitNextTransaction(), this.processTransactions()]); /** - * since runTransactions() function runs continuously in a loop, it prevents the execution + * since run() function runs continuously in a loop, it prevents the execution * from reaching a callback function (e.g when client wants to gracefuly stop the worker). * Add a small delay between iterations to allow other code to run /* eslint-disable no-await-in-loop */ @@ -153,7 +209,7 @@ export class TransactionWorker extends EventEmitter { } this.started = true; this.stopped = false; - this.runTransactions(); + this.run(); } /**