diff --git a/ecosystem/typescript/sdk/CHANGELOG.md b/ecosystem/typescript/sdk/CHANGELOG.md index be464da33930a..dcabe9a54d97f 100644 --- a/ecosystem/typescript/sdk/CHANGELOG.md +++ b/ecosystem/typescript/sdk/CHANGELOG.md @@ -4,6 +4,8 @@ All notable changes to the Aptos Node SDK will be captured in this file. This ch ## Unreleased +- Implementing `TransactionWorker` - a layer for managing and submitting as many transactions from a single account at once + ## 1.14.0 (2023-07-20) - Introduce and use `@aptos-labs/aptos-client` package to manage and handle the client used in the SDK diff --git a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts new file mode 100644 index 0000000000000..7793fd76e70d3 --- /dev/null +++ b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts @@ -0,0 +1,179 @@ +/** + * This example demonstrates how a client can utilize the TransactionWorker class. + * + * The TransactionWorker provides a simple framework for receiving payloads to be processed. It + * acquires an account new sequence number, produces a signed transaction and + * then submits the transaction. In other tasks, it waits for resolution of the submission + * process or get pre-execution validation error and waits for the resolution of the execution process + * or get an execution validation error. + * + * The TransactionWorker constructor accepts + * @param provider - a client provider + * @param sender - the sender account: AptosAccount + * @param maxWaitTime - the max wait time to wait before restarting the local sequence number to the current on-chain state + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * + * Read more about it here {@link https://aptos.dev/guides/transaction-management} + */ + +import { + AptosAccount, + BCS, + TxnBuilderTypes, + TransactionWorker, + TransactionWorkerEvents, + FaucetClient, + Provider, + Types, +} from "aptos"; +import { exit } from "process"; +import { NODE_URL, FAUCET_URL } from "./common"; + +const provider = new Provider({ fullnodeUrl: NODE_URL, indexerUrl: NODE_URL }); + +const faucet = new FaucetClient(NODE_URL, FAUCET_URL); + +async function main() { + const accountsCount = 5; + const transactionsCount = 100; + const totalTransactions = accountsCount * transactionsCount; + + const start = Date.now() / 1000; // current time in seconds + + console.log("starting..."); + // create senders and recipients accounts + const senders: AptosAccount[] = []; + const recipients: AptosAccount[] = []; + for (let i = 0; i < accountsCount; i++) { + senders.push(new AptosAccount()); + recipients.push(new AptosAccount()); + } + let last = Date.now() / 1000; + console.log( + `${senders.length} sender accounts and ${recipients.length} recipient accounts created in ${last - start} seconds`, + ); + + // fund sender accounts + const funds: Array> = []; + + for (let i = 0; i < senders.length; i++) { + funds.push(faucet.fundAccount(senders[i].address().noPrefix(), 10000000000)); + } + + await Promise.all(funds); + + console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`); + last = Date.now() / 1000; + + // read sender accounts + const balances: Array> = []; + for (let i = 0; i < senders.length; i++) { + balances.push(provider.getAccount(senders[i].address().hex())); + } + await Promise.all(balances); + + console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`); + last = Date.now() / 1000; + + // create transactions + const payloads: any[] = []; + // 100 transactions + for (let j = 0; j < transactionsCount; j++) { + // 5 recipients + for (let i = 0; i < recipients.length; i++) { + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [BCS.bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipients[i].address())), BCS.bcsSerializeUint64(5)], + ), + ); + payloads.push(txn); + } + } + + console.log(`sends ${totalTransactions} transactions to chain....`); + // emit batch transactions + const promises = senders.map((sender) => batchTransactions(payloads, sender)); + await Promise.all(promises); + + async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { + const transactionWorker = new TransactionWorker(provider, sender); + + transactionWorker.start(); + + registerToWorkerEvents(transactionWorker); + + // push transactions to worker queue + for (const payload in payloads) { + await transactionWorker.push(payloads[payload]); + } + } + + function registerToWorkerEvents(transactionWorker: TransactionWorker) { + /** + * The callback from an event listener, i.e `data`, is an array with 2 elements + * data[0] - the amount of processed transactions + * data[1] - + * on a success event, is the hash value of the processed transaction + * on a failure event, is the reason for the failure + */ + transactionWorker.on(TransactionWorkerEvents.TransactionSent, async (data) => { + // all expected transactions have been sent + if (data[0] === totalTransactions) { + console.log(`transactions sent in ${Date.now() / 1000 - last} seconds`); + } + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionSendFailed, async (data) => { + /** + * transaction sent failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction failure reason + */ + console.log("sentFailed", data); + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionExecuted, async (data) => { + // all expected transactions have been executed + if (data[0] === totalTransactions) { + console.log(`transactions executed in ${Date.now() / 1000 - last} seconds`); + await checkAccounts(); + } + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionExecutionFailed, async (data) => { + /** + * transaction execution failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction object data + */ + console.log("executionFailed", data); + }); + } + + // check for account's sequence numbers + async function checkAccounts(): Promise { + const waitFor: Array> = []; + for (let i = 0; i < senders.length; i++) { + waitFor.push(provider.getAccount(senders[i].address())); + } + const res = await Promise.all(waitFor); + console.log(`transactions verified in ${Date.now() / 1000 - last} seconds`); + for (const account in res) { + const currentAccount = res[account] as Types.AccountData; + console.log( + `sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`, + ); + } + exit(0); + } +} + +main(); diff --git a/ecosystem/typescript/sdk/package.json b/ecosystem/typescript/sdk/package.json index 4104fefd1334d..212510b145934 100644 --- a/ecosystem/typescript/sdk/package.json +++ b/ecosystem/typescript/sdk/package.json @@ -52,6 +52,7 @@ ], "dependencies": { "@aptos-labs/aptos-client": "^0.0.2", + "eventemitter3": "^5.0.1", "@noble/hashes": "1.1.3", "@scure/bip39": "1.1.0", "form-data": "4.0.0", diff --git a/ecosystem/typescript/sdk/pnpm-lock.yaml b/ecosystem/typescript/sdk/pnpm-lock.yaml index c87be4b819e31..496f63f385002 100644 --- a/ecosystem/typescript/sdk/pnpm-lock.yaml +++ b/ecosystem/typescript/sdk/pnpm-lock.yaml @@ -10,6 +10,9 @@ dependencies: '@scure/bip39': specifier: 1.1.0 version: 1.1.0 + eventemitter3: + specifier: ^5.0.1 + version: 5.0.1 form-data: specifier: 4.0.0 version: 4.0.0 @@ -3729,6 +3732,10 @@ packages: engines: {node: '>=0.10.0'} dev: true + /eventemitter3@5.0.1: + resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} + dev: false + /execa@5.1.1: resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==} engines: {node: '>=10'} diff --git a/ecosystem/typescript/sdk/src/index.ts b/ecosystem/typescript/sdk/src/index.ts index c9a7bb67c1597..c584644499d3e 100644 --- a/ecosystem/typescript/sdk/src/index.ts +++ b/ecosystem/typescript/sdk/src/index.ts @@ -7,6 +7,7 @@ export * as BCS from "./bcs"; export * from "./utils/hex_string"; export * from "./plugins"; export * from "./transaction_builder"; +export * from "./transactions"; export * as TokenTypes from "./aptos_types/token_types"; export * as Types from "./generated/index"; export * from "./client"; diff --git a/ecosystem/typescript/sdk/src/providers/aptos_client.ts b/ecosystem/typescript/sdk/src/providers/aptos_client.ts index f824f02be16bb..0d079ba0a3650 100644 --- a/ecosystem/typescript/sdk/src/providers/aptos_client.ts +++ b/ecosystem/typescript/sdk/src/providers/aptos_client.ts @@ -42,6 +42,7 @@ export interface OptionalTransactionArgs { maxGasAmount?: Uint64; gasUnitPrice?: Uint64; expireTimestamp?: Uint64; + providedSequenceNumber?: string | bigint; } interface PaginationArgs { @@ -758,7 +759,9 @@ export class AptosClient { extraArgs?: OptionalTransactionArgs, ): Promise { const [{ sequence_number: sequenceNumber }, chainId, { gas_estimate: gasEstimate }] = await Promise.all([ - this.getAccount(accountFrom), + extraArgs?.providedSequenceNumber + ? Promise.resolve({ sequence_number: extraArgs.providedSequenceNumber }) + : this.getAccount(accountFrom), this.getChainId(), extraArgs?.gasUnitPrice ? Promise.resolve({ gas_estimate: extraArgs.gasUnitPrice }) : this.estimateGasPrice(), ]); diff --git a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts new file mode 100644 index 0000000000000..0b0644c32a70d --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts @@ -0,0 +1,175 @@ +/** + * A wrapper that handles and manages an account sequence number. + * + * Submit up to `maximumInFlight` transactions per account in parallel with a timeout of `sleepTime` + * If local assumes `maximumInFlight` are in flight, determine the actual committed state from the network + * If there are less than `maximumInFlight` due to some being committed, adjust the window + * If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * If ever waiting more than `maxWaitTime` restart the sequence number to the current on-chain state + * + * Assumptions: + * Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise. + * They are initialized to the current on-chain state, so if there are already transactions in + * flight, they may take some time to reset. + * Accounts are automatically initialized if not explicitly + * + * Notes: + * This is co-routine safe, that is many async tasks can be reading from this concurrently. + * The state of an account cannot be used across multiple AccountSequenceNumber services. + * The synchronize method will create a barrier that prevents additional nextSequenceNumber + * calls until it is complete. + * This only manages the distribution of sequence numbers it does not help handle transaction + * failures. + * If a transaction fails, you should call synchronize and wait for timeouts. + */ + +import { AptosAccount } from "../account"; +import { Provider } from "../providers"; +import { sleep } from "../utils"; + +// returns `now` time in seconds +const now = () => Math.floor(Date.now() / 1000); + +export class AccountSequenceNumber { + readonly provider: Provider; + + readonly account: AptosAccount; + + // sequence number on chain + lastUncommintedNumber: bigint | null = null; + + // local sequence number + currentNumber: bigint | null = null; + + /** + * We want to guarantee that we preserve ordering of workers to requests. + * + * `lock` is used to try to prevent multiple coroutines from accessing a shared resource at the same time, + * which can result in race conditions and data inconsistency. + * This code actually doesn't do it though, since we aren't giving out a slot, it is still somewhat a race condition. + * + * The ideal solution is likely that each thread grabs the next number from a incremental integer. + * When they complete, they increment that number and that entity is able to enter the `lock`. + * That would guarantee ordering. + */ + lock = false; + + maxWaitTime: number; + + maximumInFlight: number; + + sleepTime: number; + + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number, + maximumInFlight: number, + sleepTime: number, + ) { + this.provider = provider; + this.account = account; + this.maxWaitTime = maxWaitTime; + this.maximumInFlight = maximumInFlight; + this.sleepTime = sleepTime; + } + + /** + * Returns the next available sequence number for this account + * + * @returns next available sequence number + */ + async nextSequenceNumber(): Promise { + /* eslint-disable no-await-in-loop */ + while (this.lock) { + await sleep(this.sleepTime); + } + + this.lock = true; + let nextNumber = BigInt(0); + try { + if (this.lastUncommintedNumber === null || this.currentNumber === null) { + await this.initialize(); + } + + if (this.currentNumber! - this.lastUncommintedNumber! >= this.maximumInFlight) { + await this.update(); + + const startTime = now(); + while (this.currentNumber! - this.lastUncommintedNumber! >= this.maximumInFlight) { + await sleep(this.sleepTime); + if (now() - startTime > this.maxWaitTime) { + /* eslint-disable no-console */ + console.warn(`Waited over 30 seconds for a transaction to commit, resyncing ${this.account.address()}`); + await this.initialize(); + } else { + await this.update(); + } + } + } + nextNumber = this.currentNumber!; + this.currentNumber! += BigInt(1); + } catch (e) { + console.error("error in getting next sequence number for this account", e); + } finally { + this.lock = false; + } + return nextNumber; + } + + /** + * Initializes this account with the sequence number on chain + */ + async initialize(): Promise { + const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); + this.currentNumber = BigInt(sequenceNumber); + this.lastUncommintedNumber = BigInt(sequenceNumber); + } + + /** + * Updates this account sequence number with the one on-chain + * + * @returns on-chain sequence number for this account + */ + async update(): Promise { + const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); + this.lastUncommintedNumber = BigInt(sequenceNumber); + return this.lastUncommintedNumber; + } + + /** + * Synchronizes local sequence number with the seqeunce number on chain for this account. + * + * Poll the network until all submitted transactions have either been committed or until + * the maximum wait time has elapsed + */ + async synchronize(): Promise { + if (this.lastUncommintedNumber === this.currentNumber) return; + + /* eslint-disable no-await-in-loop */ + while (this.lock) { + await sleep(this.sleepTime); + } + + this.lock = true; + + try { + await this.update(); + const startTime = now(); + while (this.lastUncommintedNumber !== this.currentNumber) { + if (now() - startTime > this.maxWaitTime) { + /* eslint-disable no-console */ + console.warn(`Waited over 30 seconds for a transaction to commit, resyncing ${this.account.address()}`); + await this.initialize(); + } else { + await sleep(this.sleepTime); + await this.update(); + } + } + } catch (e) { + console.error("error in synchronizing this account sequence number with the one on chain", e); + } finally { + this.lock = false; + } + } +} diff --git a/ecosystem/typescript/sdk/src/transactions/async_queue.ts b/ecosystem/typescript/sdk/src/transactions/async_queue.ts new file mode 100644 index 0000000000000..1800438277f10 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/async_queue.ts @@ -0,0 +1,96 @@ +/** + * The AsyncQueue class is an async-aware data structure that provides a queue-like + * behavior for managing asynchronous tasks or operations. + * It allows to enqueue items and dequeue them asynchronously. + * This is not thread-safe but it is async concurrency safe and + * it does not guarantee ordering for those that call into and await on enqueue. + */ + +export class AsyncQueue { + readonly queue: T[] = []; + + // The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued. + private resolveMap: Map void> = new Map(); + + private counter: number = 0; + + private cancelled: boolean = false; + + /** + * The enqueue method adds an item to the queue. If there are pending dequeued promises, + * in the resolveMap, it resolves the oldest promise with the enqueued item immediately. + * Otherwise, it adds the item to the queue. + * + * @param item T + */ + enqueue(item: T): void { + if (this.resolveMap.size > 0) { + const resolve = this.resolveMap.get(0); + if (resolve) { + this.resolveMap.delete(0); + resolve(item); + return; + } + } + this.queue.push(item); + } + + /** + * The dequeue method returns a promise that resolves to the next item in the queue. + * If the queue is not empty, it resolves the promise immediately with the next item. + * Otherwise, it creates a new promise. The promise's resolve function is stored + * in the resolveMap with a unique counter value as the key. + * The newly created promise is then returned, and it will be resolved later when an item is enqueued. + * + * @returns Promise + */ + async dequeue(): Promise { + if (this.queue.length > 0) { + return Promise.resolve(this.queue.shift()!); + } + const promise = new Promise((resolve) => { + this.counter += 1; + this.resolveMap.set(this.counter, resolve); + }); + return promise; + } + + /** + * The isEmpty method returns whether the queue is empty or not. + * + * @returns boolean + */ + isEmpty(): boolean { + return this.queue.length === 0; + } + + /** + * The cancel method cancels all pending promises in the queue. + * It rejects the promises with a AsyncQueueCancelledError error, + * ensuring that any awaiting code can handle the cancellation appropriately. + */ + cancel(): void { + this.cancelled = true; + this.resolveMap.forEach(async (resolve) => { + resolve(await Promise.reject(new AsyncQueueCancelledError("Task cancelled"))); + }); + this.resolveMap.clear(); + this.queue.length = 0; + } + + /** + * The isCancelled method returns whether the queue is cancelled or not. + * + * @returns boolean + */ + isCancelled(): boolean { + return this.cancelled; + } +} + +export class AsyncQueueCancelledError extends Error { + /* eslint-disable @typescript-eslint/no-useless-constructor */ + constructor(message: string) { + super(message); + } +} diff --git a/ecosystem/typescript/sdk/src/transactions/index.ts b/ecosystem/typescript/sdk/src/transactions/index.ts new file mode 100644 index 0000000000000..18817e38f1943 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/index.ts @@ -0,0 +1,2 @@ +export * from "./account_sequence_number"; +export * from "./transaction_worker"; diff --git a/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts new file mode 100644 index 0000000000000..cb89711c1fc94 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts @@ -0,0 +1,86 @@ +import { AptosAccount } from "../../account"; +import { Provider } from "../../providers"; +import { AccountSequenceNumber } from "../account_sequence_number"; +import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test"; + +const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); +const account = new AptosAccount(); +const faucet = getFaucetClient(); +const accountSequenceNumber = new AccountSequenceNumber(provider, account, 30, 100, 10); +let getAccountSpy: jest.SpyInstance; + +let lastSeqNumber: bigint | null; + +describe("account sequence number", () => { + beforeAll(async () => { + await faucet.fundAccount(account.address(), 1000000); + }, longTestTimeout); + + beforeEach(() => { + getAccountSpy = jest.spyOn(provider, "getAccount"); + }); + + afterEach(() => { + getAccountSpy.mockRestore(); + }); + + it("initializes with correct sequence number", async () => { + await accountSequenceNumber.initialize(); + expect(accountSequenceNumber.currentNumber).toEqual(BigInt(0)); + expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(0)); + }); + + it("updates with correct sequence number", async () => { + const seqNum = "2"; + getAccountSpy.mockResolvedValue({ + sequence_number: seqNum, + authentication_key: account.authKey().hex(), + }); + await accountSequenceNumber.update(); + expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(parseInt(seqNum))); + }); + + it( + "returns sequential number starting from 0", + async () => { + getAccountSpy.mockResolvedValue({ + sequence_number: "0", + authentication_key: account.authKey().hex(), + }); + for (let seqNum = 0; seqNum < 5; seqNum++) { + lastSeqNumber = await accountSequenceNumber.nextSequenceNumber(); + expect(lastSeqNumber).toEqual(BigInt(seqNum)); + } + }, + longTestTimeout, + ); + + it( + "includes updated on-chain sequnce number in local sequence number", + async () => { + const previousSeqNum = "5"; + getAccountSpy.mockResolvedValue({ + sequence_number: previousSeqNum, + authentication_key: account.authKey().hex(), + }); + for (let seqNum = 0; seqNum < accountSequenceNumber.maximumInFlight; seqNum++) { + lastSeqNumber = await accountSequenceNumber.nextSequenceNumber(); + expect(lastSeqNumber).toEqual(BigInt(seqNum + parseInt(previousSeqNum))); + } + }, + longTestTimeout, + ); + + it("synchronize completes when local and on-chain sequnec number equal", async () => { + const nextSequenceNumber = lastSeqNumber! + BigInt(1); + + getAccountSpy.mockResolvedValue({ + sequence_number: nextSequenceNumber + "", + authentication_key: account.authKey().hex(), + }); + + expect(accountSequenceNumber.currentNumber).not.toEqual(lastSeqNumber); + await accountSequenceNumber.synchronize(); + expect(accountSequenceNumber.currentNumber).toEqual(nextSequenceNumber); + }); +}); diff --git a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts new file mode 100644 index 0000000000000..d333038d03ed7 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts @@ -0,0 +1,98 @@ +import { AptosAccount } from "../../account"; +import { bcsToBytes, bcsSerializeUint64 } from "../../bcs"; +import { Provider } from "../../providers"; +import { TxnBuilderTypes } from "../../transaction_builder"; +import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test"; +import { TransactionWorker, TransactionWorkerEvents } from "../transaction_worker"; + +const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); + +const sender = new AptosAccount(); +const recipient = new AptosAccount(); + +const faucet = getFaucetClient(); + +describe("transactionWorker", () => { + beforeAll(async () => { + await faucet.fundAccount(sender.address(), 1000000000); + }); + + test("throws when starting an already started worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + expect(async () => { + transactionWorker.start(); + }).rejects.toThrow(`worker has already started`); + }); + + test("throws when stopping an already stopped worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + transactionWorker.stop(); + expect(async () => { + transactionWorker.stop(); + }).rejects.toThrow(`worker has already stopped`); + }); + + test( + "adds transaction into the transactionsQueue", + async () => { + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)], + ), + ); + transactionWorker.push(txn).then(() => { + transactionWorker.stop(); + expect(transactionWorker.transactionsQueue.queue).toHaveLength(1); + }); + }, + longTestTimeout, + ); + + test( + "submits 5 transactions to chain for a single account", + (done) => { + // Specify the number of assertions expected + expect.assertions(1); + + // create 5 transactions + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)], + ), + ); + const payloads = [...Array(5).fill(txn)]; + + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + + // push transactions to queue + for (const payload in payloads) { + transactionWorker.push(payloads[payload]); + } + + // stop transaction worker for testing purposes. + setTimeout(async () => { + transactionWorker.stop(); + const accountData = await provider.getAccount(sender.address()); + // call done() when all asynchronous operations are finished + done(); + // expect sender sequence number to be 5 + expect(accountData.sequence_number).toBe("5"); + }, 1000 * 30); + }, + longTestTimeout, + ); +}); diff --git a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts new file mode 100644 index 0000000000000..52a330c369b0a --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts @@ -0,0 +1,273 @@ +/* eslint-disable no-await-in-loop */ + +/** + * TransactionWorker provides a simple framework for receiving payloads to be processed. + * + * Once one `start()` the process and pushes a new transaction, the worker acquires + * the current account's next sequence number (by using the AccountSequenceNumber class), + * generates a signed transaction and pushes an async submission process into the `outstandingTransactions` queue. + * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue + * and submits the next transaction to chain, it + * 1) waits for resolution of the submission process or get pre-execution validation error + * and 2) waits for the resolution of the execution process or get an execution error. + * The worker fires events for any submission and/or execution success and/or failure. + */ + +import EventEmitter from "eventemitter3"; +import { AptosAccount } from "../account"; +import { PendingTransaction, Transaction } from "../generated"; +import { AptosClient, Provider } from "../providers"; +import { TxnBuilderTypes } from "../transaction_builder"; +import { AccountSequenceNumber } from "./account_sequence_number"; +import { AsyncQueue, AsyncQueueCancelledError } from "./async_queue"; + +const promiseFulfilledStatus = "fulfilled"; + +export enum TransactionWorkerEvents { + TransactionSent = "transactionSent", + TransactionSendFailed = "transactionsendFailed", + TransactionExecuted = "transactionExecuted", + TransactionExecutionFailed = "transactionexecutionFailed", +} + +export class TransactionWorker extends EventEmitter { + readonly provider: Provider; + + readonly account: AptosAccount; + + // current account sequence number + readonly accountSequnceNumber: AccountSequenceNumber; + + readonly taskQueue: AsyncQueue<() => Promise> = new AsyncQueue<() => Promise>(); + + // process has started + started: boolean; + + /** + * transactions payloads waiting to be generated and signed + * + * TODO support entry function payload from ABI builder + */ + transactionsQueue = new AsyncQueue(); + + /** + * signed transactions waiting to be submitted + */ + outstandingTransactions = new AsyncQueue<[Promise, bigint]>(); + + /** + * transactions that have been submitted to chain + */ + sentTransactions: Array<[string, bigint, any]> = []; + + /** + * transactions that have been committed to chain + */ + executedTransactions: Array<[string, bigint, any]> = []; + + /** + * Provides a simple framework for receiving payloads to be processed. + * + * @param provider - a client provider + * @param sender - a sender as AptosAccount + * @param maxWaitTime - the max wait time to wait before resyncing the sequence number + * to the current on-chain state, default to 30 + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account. + * Mempool limits the number of transactions per account to 100, hence why we default to 100. + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating, default to 10 + */ + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number = 30, + maximumInFlight: number = 100, + sleepTime: number = 10, + ) { + super(); + this.provider = provider; + this.account = account; + this.started = false; + this.accountSequnceNumber = new AccountSequenceNumber(provider, account, maxWaitTime, maximumInFlight, sleepTime); + } + + /** + * Gets the current account sequence number, + * generates the transaction with the account sequence number, + * adds the transaction to the outstanding transaction queue + * to be processed later. + */ + async submitNextTransaction() { + try { + /* eslint-disable no-constant-condition */ + while (true) { + if (this.transactionsQueue.isEmpty()) return; + const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); + if (sequenceNumber === null) return; + const transaction = await this.generateNextTransaction(this.account, sequenceNumber); + if (!transaction) return; + const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); + await this.outstandingTransactions.enqueue([pendingTransaction, sequenceNumber]); + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; + } + // TODO use future log service + /* eslint-disable no-console */ + console.log(error); + } + } + + /** + * Reads the outstanding transaction queue and submits the transaction to chain. + * + * If the transaction has fulfilled, it pushes the transaction to the processed + * transactions queue and fires a transactionsFulfilled event. + * + * If the transaction has failed, it pushes the transaction to the processed + * transactions queue with the failure reason and fires a transactionsFailed event. + */ + async processTransactions() { + try { + /* eslint-disable no-constant-condition */ + while (true) { + const awaitingTransactions = []; + const sequenceNumbers = []; + let [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + + while (!this.outstandingTransactions.isEmpty()) { + [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + } + // send awaiting transactions to chain + const sentTransactions = await Promise.allSettled(awaitingTransactions); + for (let i = 0; i < sentTransactions.length && i < sequenceNumbers.length; i += 1) { + // check sent transaction status + const sentTransaction = sentTransactions[i]; + sequenceNumber = sequenceNumbers[i]; + if (sentTransaction.status === promiseFulfilledStatus) { + // transaction sent to chain + this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]); + this.emit(TransactionWorkerEvents.TransactionSent, [ + this.sentTransactions.length, + sentTransaction.value.hash, + ]); + // check sent transaction execution + await this.checkTransaction(sentTransaction, sequenceNumber); + } else { + // send transaction failed + this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); + this.emit(TransactionWorkerEvents.TransactionSendFailed, [ + this.sentTransactions.length, + sentTransaction.reason, + ]); + } + } + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; + } + // TODO use future log service + /* eslint-disable no-console */ + console.log(error); + } + } + + /** + * Once transaction has been sent to chain, we check for its execution status. + * @param sentTransaction transactions that were sent to chain and are now waiting to be executed + * @param sequenceNumber the account's sequence number that was sent with the transaction + */ + async checkTransaction(sentTransaction: PromiseFulfilledResult, sequenceNumber: bigint) { + const waitFor: Array> = []; + waitFor.push(this.provider.waitForTransactionWithResult(sentTransaction.value.hash, { checkSuccess: true })); + const sentTransactions = await Promise.allSettled(waitFor); + + for (let i = 0; i < sentTransactions.length; i += 1) { + const executedTransaction = sentTransactions[i]; + if (executedTransaction.status === promiseFulfilledStatus) { + // transaction executed to chain + this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]); + this.emit(TransactionWorkerEvents.TransactionExecuted, [ + this.executedTransactions.length, + executedTransaction.value.hash, + ]); + } else { + // transaction execution failed + this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]); + this.emit(TransactionWorkerEvents.TransactionExecutionFailed, [ + this.executedTransactions.length, + executedTransaction.reason, + ]); + } + } + } + + /** + * Push transaction to the transactions queue + * @param payload Transaction payload + */ + async push(payload: TxnBuilderTypes.TransactionPayload): Promise { + await this.transactionsQueue.enqueue(payload); + } + + /** + * Generates a signed transaction that can be submitted to chain + * @param account an Aptos account + * @param sequenceNumber a sequence number the transaction will be generated with + * @returns + */ + async generateNextTransaction(account: AptosAccount, sequenceNumber: bigint): Promise { + if (this.transactionsQueue.isEmpty()) return undefined; + const payload = await this.transactionsQueue.dequeue(); + const rawTransaction = await this.provider.generateRawTransaction(account.address(), payload, { + providedSequenceNumber: sequenceNumber, + }); + const signedTransaction = AptosClient.generateBCSTransaction(account, rawTransaction); + return signedTransaction; + } + + /** + * Starts transaction submission and transaction processing. + */ + async run() { + try { + while (!this.taskQueue.isCancelled()) { + const task = await this.taskQueue.dequeue(); + await task(); + } + } catch (error: any) { + throw new Error(error); + } + } + + /** + * Starts the transaction management process. + */ + start() { + if (this.started) { + throw new Error("worker has already started"); + } + this.started = true; + this.taskQueue.enqueue(() => this.submitNextTransaction()); + this.taskQueue.enqueue(() => this.processTransactions()); + this.run(); + } + + /** + * Stops the the transaction management process. + */ + stop() { + if (this.taskQueue.isCancelled()) { + throw new Error("worker has already stopped"); + } + this.started = false; + this.taskQueue.cancel(); + } +}