From 5a02e5b8bcdf2bc0b71e76aa9e7e4865de7df6f2 Mon Sep 17 00:00:00 2001 From: Maayan Date: Thu, 27 Jul 2023 17:37:00 -0700 Subject: [PATCH] [TS SDK] Add account transaction management (#8854) * add account transaction management transaction management layer add sequnce number tests transaction worker class add transaction worker and queue class add start and stop to transaction worker add txn worker tests write full flow test add account transactions management * add method comments * address comments * implement async queue * events as enum * address feedback --- ecosystem/typescript/sdk/CHANGELOG.md | 2 + .../typescript/transactions_management.ts | 179 ++++++++++++ ecosystem/typescript/sdk/package.json | 1 + ecosystem/typescript/sdk/pnpm-lock.yaml | 7 + ecosystem/typescript/sdk/src/index.ts | 1 + .../sdk/src/providers/aptos_client.ts | 5 +- .../transactions/account_sequence_number.ts | 175 +++++++++++ .../sdk/src/transactions/async_queue.ts | 96 ++++++ .../typescript/sdk/src/transactions/index.ts | 2 + .../tests/account_sequence_number.test.ts | 86 ++++++ .../tests/transaction_worker.test.ts | 98 +++++++ .../src/transactions/transaction_worker.ts | 273 ++++++++++++++++++ 12 files changed, 924 insertions(+), 1 deletion(-) create mode 100644 ecosystem/typescript/sdk/examples/typescript/transactions_management.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/async_queue.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/index.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts create mode 100644 ecosystem/typescript/sdk/src/transactions/transaction_worker.ts diff --git a/ecosystem/typescript/sdk/CHANGELOG.md b/ecosystem/typescript/sdk/CHANGELOG.md index be464da33930a..dcabe9a54d97f 100644 --- a/ecosystem/typescript/sdk/CHANGELOG.md +++ b/ecosystem/typescript/sdk/CHANGELOG.md @@ -4,6 +4,8 @@ All notable changes to the Aptos Node SDK will be captured in this file. This ch ## Unreleased +- Implementing `TransactionWorker` - a layer for managing and submitting as many transactions from a single account at once + ## 1.14.0 (2023-07-20) - Introduce and use `@aptos-labs/aptos-client` package to manage and handle the client used in the SDK diff --git a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts new file mode 100644 index 0000000000000..7793fd76e70d3 --- /dev/null +++ b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts @@ -0,0 +1,179 @@ +/** + * This example demonstrates how a client can utilize the TransactionWorker class. + * + * The TransactionWorker provides a simple framework for receiving payloads to be processed. It + * acquires an account new sequence number, produces a signed transaction and + * then submits the transaction. In other tasks, it waits for resolution of the submission + * process or get pre-execution validation error and waits for the resolution of the execution process + * or get an execution validation error. + * + * The TransactionWorker constructor accepts + * @param provider - a client provider + * @param sender - the sender account: AptosAccount + * @param maxWaitTime - the max wait time to wait before restarting the local sequence number to the current on-chain state + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * + * Read more about it here {@link https://aptos.dev/guides/transaction-management} + */ + +import { + AptosAccount, + BCS, + TxnBuilderTypes, + TransactionWorker, + TransactionWorkerEvents, + FaucetClient, + Provider, + Types, +} from "aptos"; +import { exit } from "process"; +import { NODE_URL, FAUCET_URL } from "./common"; + +const provider = new Provider({ fullnodeUrl: NODE_URL, indexerUrl: NODE_URL }); + +const faucet = new FaucetClient(NODE_URL, FAUCET_URL); + +async function main() { + const accountsCount = 5; + const transactionsCount = 100; + const totalTransactions = accountsCount * transactionsCount; + + const start = Date.now() / 1000; // current time in seconds + + console.log("starting..."); + // create senders and recipients accounts + const senders: AptosAccount[] = []; + const recipients: AptosAccount[] = []; + for (let i = 0; i < accountsCount; i++) { + senders.push(new AptosAccount()); + recipients.push(new AptosAccount()); + } + let last = Date.now() / 1000; + console.log( + `${senders.length} sender accounts and ${recipients.length} recipient accounts created in ${last - start} seconds`, + ); + + // fund sender accounts + const funds: Array> = []; + + for (let i = 0; i < senders.length; i++) { + funds.push(faucet.fundAccount(senders[i].address().noPrefix(), 10000000000)); + } + + await Promise.all(funds); + + console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`); + last = Date.now() / 1000; + + // read sender accounts + const balances: Array> = []; + for (let i = 0; i < senders.length; i++) { + balances.push(provider.getAccount(senders[i].address().hex())); + } + await Promise.all(balances); + + console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`); + last = Date.now() / 1000; + + // create transactions + const payloads: any[] = []; + // 100 transactions + for (let j = 0; j < transactionsCount; j++) { + // 5 recipients + for (let i = 0; i < recipients.length; i++) { + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [BCS.bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipients[i].address())), BCS.bcsSerializeUint64(5)], + ), + ); + payloads.push(txn); + } + } + + console.log(`sends ${totalTransactions} transactions to chain....`); + // emit batch transactions + const promises = senders.map((sender) => batchTransactions(payloads, sender)); + await Promise.all(promises); + + async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { + const transactionWorker = new TransactionWorker(provider, sender); + + transactionWorker.start(); + + registerToWorkerEvents(transactionWorker); + + // push transactions to worker queue + for (const payload in payloads) { + await transactionWorker.push(payloads[payload]); + } + } + + function registerToWorkerEvents(transactionWorker: TransactionWorker) { + /** + * The callback from an event listener, i.e `data`, is an array with 2 elements + * data[0] - the amount of processed transactions + * data[1] - + * on a success event, is the hash value of the processed transaction + * on a failure event, is the reason for the failure + */ + transactionWorker.on(TransactionWorkerEvents.TransactionSent, async (data) => { + // all expected transactions have been sent + if (data[0] === totalTransactions) { + console.log(`transactions sent in ${Date.now() / 1000 - last} seconds`); + } + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionSendFailed, async (data) => { + /** + * transaction sent failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction failure reason + */ + console.log("sentFailed", data); + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionExecuted, async (data) => { + // all expected transactions have been executed + if (data[0] === totalTransactions) { + console.log(`transactions executed in ${Date.now() / 1000 - last} seconds`); + await checkAccounts(); + } + }); + + transactionWorker.on(TransactionWorkerEvents.TransactionExecutionFailed, async (data) => { + /** + * transaction execution failed, up to the user to decide next steps. + * whether to stop the worker by transactionWorker.stop() and handle + * the error, or simply return the error to the end user. + * At this point, we have the failed transaction queue number + * and the transaction object data + */ + console.log("executionFailed", data); + }); + } + + // check for account's sequence numbers + async function checkAccounts(): Promise { + const waitFor: Array> = []; + for (let i = 0; i < senders.length; i++) { + waitFor.push(provider.getAccount(senders[i].address())); + } + const res = await Promise.all(waitFor); + console.log(`transactions verified in ${Date.now() / 1000 - last} seconds`); + for (const account in res) { + const currentAccount = res[account] as Types.AccountData; + console.log( + `sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`, + ); + } + exit(0); + } +} + +main(); diff --git a/ecosystem/typescript/sdk/package.json b/ecosystem/typescript/sdk/package.json index 4104fefd1334d..212510b145934 100644 --- a/ecosystem/typescript/sdk/package.json +++ b/ecosystem/typescript/sdk/package.json @@ -52,6 +52,7 @@ ], "dependencies": { "@aptos-labs/aptos-client": "^0.0.2", + "eventemitter3": "^5.0.1", "@noble/hashes": "1.1.3", "@scure/bip39": "1.1.0", "form-data": "4.0.0", diff --git a/ecosystem/typescript/sdk/pnpm-lock.yaml b/ecosystem/typescript/sdk/pnpm-lock.yaml index c87be4b819e31..496f63f385002 100644 --- a/ecosystem/typescript/sdk/pnpm-lock.yaml +++ b/ecosystem/typescript/sdk/pnpm-lock.yaml @@ -10,6 +10,9 @@ dependencies: '@scure/bip39': specifier: 1.1.0 version: 1.1.0 + eventemitter3: + specifier: ^5.0.1 + version: 5.0.1 form-data: specifier: 4.0.0 version: 4.0.0 @@ -3729,6 +3732,10 @@ packages: engines: {node: '>=0.10.0'} dev: true + /eventemitter3@5.0.1: + resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} + dev: false + /execa@5.1.1: resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==} engines: {node: '>=10'} diff --git a/ecosystem/typescript/sdk/src/index.ts b/ecosystem/typescript/sdk/src/index.ts index c9a7bb67c1597..c584644499d3e 100644 --- a/ecosystem/typescript/sdk/src/index.ts +++ b/ecosystem/typescript/sdk/src/index.ts @@ -7,6 +7,7 @@ export * as BCS from "./bcs"; export * from "./utils/hex_string"; export * from "./plugins"; export * from "./transaction_builder"; +export * from "./transactions"; export * as TokenTypes from "./aptos_types/token_types"; export * as Types from "./generated/index"; export * from "./client"; diff --git a/ecosystem/typescript/sdk/src/providers/aptos_client.ts b/ecosystem/typescript/sdk/src/providers/aptos_client.ts index f824f02be16bb..0d079ba0a3650 100644 --- a/ecosystem/typescript/sdk/src/providers/aptos_client.ts +++ b/ecosystem/typescript/sdk/src/providers/aptos_client.ts @@ -42,6 +42,7 @@ export interface OptionalTransactionArgs { maxGasAmount?: Uint64; gasUnitPrice?: Uint64; expireTimestamp?: Uint64; + providedSequenceNumber?: string | bigint; } interface PaginationArgs { @@ -758,7 +759,9 @@ export class AptosClient { extraArgs?: OptionalTransactionArgs, ): Promise { const [{ sequence_number: sequenceNumber }, chainId, { gas_estimate: gasEstimate }] = await Promise.all([ - this.getAccount(accountFrom), + extraArgs?.providedSequenceNumber + ? Promise.resolve({ sequence_number: extraArgs.providedSequenceNumber }) + : this.getAccount(accountFrom), this.getChainId(), extraArgs?.gasUnitPrice ? Promise.resolve({ gas_estimate: extraArgs.gasUnitPrice }) : this.estimateGasPrice(), ]); diff --git a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts new file mode 100644 index 0000000000000..0b0644c32a70d --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts @@ -0,0 +1,175 @@ +/** + * A wrapper that handles and manages an account sequence number. + * + * Submit up to `maximumInFlight` transactions per account in parallel with a timeout of `sleepTime` + * If local assumes `maximumInFlight` are in flight, determine the actual committed state from the network + * If there are less than `maximumInFlight` due to some being committed, adjust the window + * If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * If ever waiting more than `maxWaitTime` restart the sequence number to the current on-chain state + * + * Assumptions: + * Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise. + * They are initialized to the current on-chain state, so if there are already transactions in + * flight, they may take some time to reset. + * Accounts are automatically initialized if not explicitly + * + * Notes: + * This is co-routine safe, that is many async tasks can be reading from this concurrently. + * The state of an account cannot be used across multiple AccountSequenceNumber services. + * The synchronize method will create a barrier that prevents additional nextSequenceNumber + * calls until it is complete. + * This only manages the distribution of sequence numbers it does not help handle transaction + * failures. + * If a transaction fails, you should call synchronize and wait for timeouts. + */ + +import { AptosAccount } from "../account"; +import { Provider } from "../providers"; +import { sleep } from "../utils"; + +// returns `now` time in seconds +const now = () => Math.floor(Date.now() / 1000); + +export class AccountSequenceNumber { + readonly provider: Provider; + + readonly account: AptosAccount; + + // sequence number on chain + lastUncommintedNumber: bigint | null = null; + + // local sequence number + currentNumber: bigint | null = null; + + /** + * We want to guarantee that we preserve ordering of workers to requests. + * + * `lock` is used to try to prevent multiple coroutines from accessing a shared resource at the same time, + * which can result in race conditions and data inconsistency. + * This code actually doesn't do it though, since we aren't giving out a slot, it is still somewhat a race condition. + * + * The ideal solution is likely that each thread grabs the next number from a incremental integer. + * When they complete, they increment that number and that entity is able to enter the `lock`. + * That would guarantee ordering. + */ + lock = false; + + maxWaitTime: number; + + maximumInFlight: number; + + sleepTime: number; + + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number, + maximumInFlight: number, + sleepTime: number, + ) { + this.provider = provider; + this.account = account; + this.maxWaitTime = maxWaitTime; + this.maximumInFlight = maximumInFlight; + this.sleepTime = sleepTime; + } + + /** + * Returns the next available sequence number for this account + * + * @returns next available sequence number + */ + async nextSequenceNumber(): Promise { + /* eslint-disable no-await-in-loop */ + while (this.lock) { + await sleep(this.sleepTime); + } + + this.lock = true; + let nextNumber = BigInt(0); + try { + if (this.lastUncommintedNumber === null || this.currentNumber === null) { + await this.initialize(); + } + + if (this.currentNumber! - this.lastUncommintedNumber! >= this.maximumInFlight) { + await this.update(); + + const startTime = now(); + while (this.currentNumber! - this.lastUncommintedNumber! >= this.maximumInFlight) { + await sleep(this.sleepTime); + if (now() - startTime > this.maxWaitTime) { + /* eslint-disable no-console */ + console.warn(`Waited over 30 seconds for a transaction to commit, resyncing ${this.account.address()}`); + await this.initialize(); + } else { + await this.update(); + } + } + } + nextNumber = this.currentNumber!; + this.currentNumber! += BigInt(1); + } catch (e) { + console.error("error in getting next sequence number for this account", e); + } finally { + this.lock = false; + } + return nextNumber; + } + + /** + * Initializes this account with the sequence number on chain + */ + async initialize(): Promise { + const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); + this.currentNumber = BigInt(sequenceNumber); + this.lastUncommintedNumber = BigInt(sequenceNumber); + } + + /** + * Updates this account sequence number with the one on-chain + * + * @returns on-chain sequence number for this account + */ + async update(): Promise { + const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address()); + this.lastUncommintedNumber = BigInt(sequenceNumber); + return this.lastUncommintedNumber; + } + + /** + * Synchronizes local sequence number with the seqeunce number on chain for this account. + * + * Poll the network until all submitted transactions have either been committed or until + * the maximum wait time has elapsed + */ + async synchronize(): Promise { + if (this.lastUncommintedNumber === this.currentNumber) return; + + /* eslint-disable no-await-in-loop */ + while (this.lock) { + await sleep(this.sleepTime); + } + + this.lock = true; + + try { + await this.update(); + const startTime = now(); + while (this.lastUncommintedNumber !== this.currentNumber) { + if (now() - startTime > this.maxWaitTime) { + /* eslint-disable no-console */ + console.warn(`Waited over 30 seconds for a transaction to commit, resyncing ${this.account.address()}`); + await this.initialize(); + } else { + await sleep(this.sleepTime); + await this.update(); + } + } + } catch (e) { + console.error("error in synchronizing this account sequence number with the one on chain", e); + } finally { + this.lock = false; + } + } +} diff --git a/ecosystem/typescript/sdk/src/transactions/async_queue.ts b/ecosystem/typescript/sdk/src/transactions/async_queue.ts new file mode 100644 index 0000000000000..1800438277f10 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/async_queue.ts @@ -0,0 +1,96 @@ +/** + * The AsyncQueue class is an async-aware data structure that provides a queue-like + * behavior for managing asynchronous tasks or operations. + * It allows to enqueue items and dequeue them asynchronously. + * This is not thread-safe but it is async concurrency safe and + * it does not guarantee ordering for those that call into and await on enqueue. + */ + +export class AsyncQueue { + readonly queue: T[] = []; + + // The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued. + private resolveMap: Map void> = new Map(); + + private counter: number = 0; + + private cancelled: boolean = false; + + /** + * The enqueue method adds an item to the queue. If there are pending dequeued promises, + * in the resolveMap, it resolves the oldest promise with the enqueued item immediately. + * Otherwise, it adds the item to the queue. + * + * @param item T + */ + enqueue(item: T): void { + if (this.resolveMap.size > 0) { + const resolve = this.resolveMap.get(0); + if (resolve) { + this.resolveMap.delete(0); + resolve(item); + return; + } + } + this.queue.push(item); + } + + /** + * The dequeue method returns a promise that resolves to the next item in the queue. + * If the queue is not empty, it resolves the promise immediately with the next item. + * Otherwise, it creates a new promise. The promise's resolve function is stored + * in the resolveMap with a unique counter value as the key. + * The newly created promise is then returned, and it will be resolved later when an item is enqueued. + * + * @returns Promise + */ + async dequeue(): Promise { + if (this.queue.length > 0) { + return Promise.resolve(this.queue.shift()!); + } + const promise = new Promise((resolve) => { + this.counter += 1; + this.resolveMap.set(this.counter, resolve); + }); + return promise; + } + + /** + * The isEmpty method returns whether the queue is empty or not. + * + * @returns boolean + */ + isEmpty(): boolean { + return this.queue.length === 0; + } + + /** + * The cancel method cancels all pending promises in the queue. + * It rejects the promises with a AsyncQueueCancelledError error, + * ensuring that any awaiting code can handle the cancellation appropriately. + */ + cancel(): void { + this.cancelled = true; + this.resolveMap.forEach(async (resolve) => { + resolve(await Promise.reject(new AsyncQueueCancelledError("Task cancelled"))); + }); + this.resolveMap.clear(); + this.queue.length = 0; + } + + /** + * The isCancelled method returns whether the queue is cancelled or not. + * + * @returns boolean + */ + isCancelled(): boolean { + return this.cancelled; + } +} + +export class AsyncQueueCancelledError extends Error { + /* eslint-disable @typescript-eslint/no-useless-constructor */ + constructor(message: string) { + super(message); + } +} diff --git a/ecosystem/typescript/sdk/src/transactions/index.ts b/ecosystem/typescript/sdk/src/transactions/index.ts new file mode 100644 index 0000000000000..18817e38f1943 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/index.ts @@ -0,0 +1,2 @@ +export * from "./account_sequence_number"; +export * from "./transaction_worker"; diff --git a/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts new file mode 100644 index 0000000000000..cb89711c1fc94 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/tests/account_sequence_number.test.ts @@ -0,0 +1,86 @@ +import { AptosAccount } from "../../account"; +import { Provider } from "../../providers"; +import { AccountSequenceNumber } from "../account_sequence_number"; +import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test"; + +const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); +const account = new AptosAccount(); +const faucet = getFaucetClient(); +const accountSequenceNumber = new AccountSequenceNumber(provider, account, 30, 100, 10); +let getAccountSpy: jest.SpyInstance; + +let lastSeqNumber: bigint | null; + +describe("account sequence number", () => { + beforeAll(async () => { + await faucet.fundAccount(account.address(), 1000000); + }, longTestTimeout); + + beforeEach(() => { + getAccountSpy = jest.spyOn(provider, "getAccount"); + }); + + afterEach(() => { + getAccountSpy.mockRestore(); + }); + + it("initializes with correct sequence number", async () => { + await accountSequenceNumber.initialize(); + expect(accountSequenceNumber.currentNumber).toEqual(BigInt(0)); + expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(0)); + }); + + it("updates with correct sequence number", async () => { + const seqNum = "2"; + getAccountSpy.mockResolvedValue({ + sequence_number: seqNum, + authentication_key: account.authKey().hex(), + }); + await accountSequenceNumber.update(); + expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(parseInt(seqNum))); + }); + + it( + "returns sequential number starting from 0", + async () => { + getAccountSpy.mockResolvedValue({ + sequence_number: "0", + authentication_key: account.authKey().hex(), + }); + for (let seqNum = 0; seqNum < 5; seqNum++) { + lastSeqNumber = await accountSequenceNumber.nextSequenceNumber(); + expect(lastSeqNumber).toEqual(BigInt(seqNum)); + } + }, + longTestTimeout, + ); + + it( + "includes updated on-chain sequnce number in local sequence number", + async () => { + const previousSeqNum = "5"; + getAccountSpy.mockResolvedValue({ + sequence_number: previousSeqNum, + authentication_key: account.authKey().hex(), + }); + for (let seqNum = 0; seqNum < accountSequenceNumber.maximumInFlight; seqNum++) { + lastSeqNumber = await accountSequenceNumber.nextSequenceNumber(); + expect(lastSeqNumber).toEqual(BigInt(seqNum + parseInt(previousSeqNum))); + } + }, + longTestTimeout, + ); + + it("synchronize completes when local and on-chain sequnec number equal", async () => { + const nextSequenceNumber = lastSeqNumber! + BigInt(1); + + getAccountSpy.mockResolvedValue({ + sequence_number: nextSequenceNumber + "", + authentication_key: account.authKey().hex(), + }); + + expect(accountSequenceNumber.currentNumber).not.toEqual(lastSeqNumber); + await accountSequenceNumber.synchronize(); + expect(accountSequenceNumber.currentNumber).toEqual(nextSequenceNumber); + }); +}); diff --git a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts new file mode 100644 index 0000000000000..d333038d03ed7 --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts @@ -0,0 +1,98 @@ +import { AptosAccount } from "../../account"; +import { bcsToBytes, bcsSerializeUint64 } from "../../bcs"; +import { Provider } from "../../providers"; +import { TxnBuilderTypes } from "../../transaction_builder"; +import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test"; +import { TransactionWorker, TransactionWorkerEvents } from "../transaction_worker"; + +const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); + +const sender = new AptosAccount(); +const recipient = new AptosAccount(); + +const faucet = getFaucetClient(); + +describe("transactionWorker", () => { + beforeAll(async () => { + await faucet.fundAccount(sender.address(), 1000000000); + }); + + test("throws when starting an already started worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + expect(async () => { + transactionWorker.start(); + }).rejects.toThrow(`worker has already started`); + }); + + test("throws when stopping an already stopped worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + transactionWorker.stop(); + expect(async () => { + transactionWorker.stop(); + }).rejects.toThrow(`worker has already stopped`); + }); + + test( + "adds transaction into the transactionsQueue", + async () => { + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)], + ), + ); + transactionWorker.push(txn).then(() => { + transactionWorker.stop(); + expect(transactionWorker.transactionsQueue.queue).toHaveLength(1); + }); + }, + longTestTimeout, + ); + + test( + "submits 5 transactions to chain for a single account", + (done) => { + // Specify the number of assertions expected + expect.assertions(1); + + // create 5 transactions + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)], + ), + ); + const payloads = [...Array(5).fill(txn)]; + + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + + // push transactions to queue + for (const payload in payloads) { + transactionWorker.push(payloads[payload]); + } + + // stop transaction worker for testing purposes. + setTimeout(async () => { + transactionWorker.stop(); + const accountData = await provider.getAccount(sender.address()); + // call done() when all asynchronous operations are finished + done(); + // expect sender sequence number to be 5 + expect(accountData.sequence_number).toBe("5"); + }, 1000 * 30); + }, + longTestTimeout, + ); +}); diff --git a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts new file mode 100644 index 0000000000000..52a330c369b0a --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts @@ -0,0 +1,273 @@ +/* eslint-disable no-await-in-loop */ + +/** + * TransactionWorker provides a simple framework for receiving payloads to be processed. + * + * Once one `start()` the process and pushes a new transaction, the worker acquires + * the current account's next sequence number (by using the AccountSequenceNumber class), + * generates a signed transaction and pushes an async submission process into the `outstandingTransactions` queue. + * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue + * and submits the next transaction to chain, it + * 1) waits for resolution of the submission process or get pre-execution validation error + * and 2) waits for the resolution of the execution process or get an execution error. + * The worker fires events for any submission and/or execution success and/or failure. + */ + +import EventEmitter from "eventemitter3"; +import { AptosAccount } from "../account"; +import { PendingTransaction, Transaction } from "../generated"; +import { AptosClient, Provider } from "../providers"; +import { TxnBuilderTypes } from "../transaction_builder"; +import { AccountSequenceNumber } from "./account_sequence_number"; +import { AsyncQueue, AsyncQueueCancelledError } from "./async_queue"; + +const promiseFulfilledStatus = "fulfilled"; + +export enum TransactionWorkerEvents { + TransactionSent = "transactionSent", + TransactionSendFailed = "transactionsendFailed", + TransactionExecuted = "transactionExecuted", + TransactionExecutionFailed = "transactionexecutionFailed", +} + +export class TransactionWorker extends EventEmitter { + readonly provider: Provider; + + readonly account: AptosAccount; + + // current account sequence number + readonly accountSequnceNumber: AccountSequenceNumber; + + readonly taskQueue: AsyncQueue<() => Promise> = new AsyncQueue<() => Promise>(); + + // process has started + started: boolean; + + /** + * transactions payloads waiting to be generated and signed + * + * TODO support entry function payload from ABI builder + */ + transactionsQueue = new AsyncQueue(); + + /** + * signed transactions waiting to be submitted + */ + outstandingTransactions = new AsyncQueue<[Promise, bigint]>(); + + /** + * transactions that have been submitted to chain + */ + sentTransactions: Array<[string, bigint, any]> = []; + + /** + * transactions that have been committed to chain + */ + executedTransactions: Array<[string, bigint, any]> = []; + + /** + * Provides a simple framework for receiving payloads to be processed. + * + * @param provider - a client provider + * @param sender - a sender as AptosAccount + * @param maxWaitTime - the max wait time to wait before resyncing the sequence number + * to the current on-chain state, default to 30 + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account. + * Mempool limits the number of transactions per account to 100, hence why we default to 100. + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating, default to 10 + */ + constructor( + provider: Provider, + account: AptosAccount, + maxWaitTime: number = 30, + maximumInFlight: number = 100, + sleepTime: number = 10, + ) { + super(); + this.provider = provider; + this.account = account; + this.started = false; + this.accountSequnceNumber = new AccountSequenceNumber(provider, account, maxWaitTime, maximumInFlight, sleepTime); + } + + /** + * Gets the current account sequence number, + * generates the transaction with the account sequence number, + * adds the transaction to the outstanding transaction queue + * to be processed later. + */ + async submitNextTransaction() { + try { + /* eslint-disable no-constant-condition */ + while (true) { + if (this.transactionsQueue.isEmpty()) return; + const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); + if (sequenceNumber === null) return; + const transaction = await this.generateNextTransaction(this.account, sequenceNumber); + if (!transaction) return; + const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); + await this.outstandingTransactions.enqueue([pendingTransaction, sequenceNumber]); + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; + } + // TODO use future log service + /* eslint-disable no-console */ + console.log(error); + } + } + + /** + * Reads the outstanding transaction queue and submits the transaction to chain. + * + * If the transaction has fulfilled, it pushes the transaction to the processed + * transactions queue and fires a transactionsFulfilled event. + * + * If the transaction has failed, it pushes the transaction to the processed + * transactions queue with the failure reason and fires a transactionsFailed event. + */ + async processTransactions() { + try { + /* eslint-disable no-constant-condition */ + while (true) { + const awaitingTransactions = []; + const sequenceNumbers = []; + let [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + + while (!this.outstandingTransactions.isEmpty()) { + [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + } + // send awaiting transactions to chain + const sentTransactions = await Promise.allSettled(awaitingTransactions); + for (let i = 0; i < sentTransactions.length && i < sequenceNumbers.length; i += 1) { + // check sent transaction status + const sentTransaction = sentTransactions[i]; + sequenceNumber = sequenceNumbers[i]; + if (sentTransaction.status === promiseFulfilledStatus) { + // transaction sent to chain + this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]); + this.emit(TransactionWorkerEvents.TransactionSent, [ + this.sentTransactions.length, + sentTransaction.value.hash, + ]); + // check sent transaction execution + await this.checkTransaction(sentTransaction, sequenceNumber); + } else { + // send transaction failed + this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); + this.emit(TransactionWorkerEvents.TransactionSendFailed, [ + this.sentTransactions.length, + sentTransaction.reason, + ]); + } + } + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; + } + // TODO use future log service + /* eslint-disable no-console */ + console.log(error); + } + } + + /** + * Once transaction has been sent to chain, we check for its execution status. + * @param sentTransaction transactions that were sent to chain and are now waiting to be executed + * @param sequenceNumber the account's sequence number that was sent with the transaction + */ + async checkTransaction(sentTransaction: PromiseFulfilledResult, sequenceNumber: bigint) { + const waitFor: Array> = []; + waitFor.push(this.provider.waitForTransactionWithResult(sentTransaction.value.hash, { checkSuccess: true })); + const sentTransactions = await Promise.allSettled(waitFor); + + for (let i = 0; i < sentTransactions.length; i += 1) { + const executedTransaction = sentTransactions[i]; + if (executedTransaction.status === promiseFulfilledStatus) { + // transaction executed to chain + this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]); + this.emit(TransactionWorkerEvents.TransactionExecuted, [ + this.executedTransactions.length, + executedTransaction.value.hash, + ]); + } else { + // transaction execution failed + this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]); + this.emit(TransactionWorkerEvents.TransactionExecutionFailed, [ + this.executedTransactions.length, + executedTransaction.reason, + ]); + } + } + } + + /** + * Push transaction to the transactions queue + * @param payload Transaction payload + */ + async push(payload: TxnBuilderTypes.TransactionPayload): Promise { + await this.transactionsQueue.enqueue(payload); + } + + /** + * Generates a signed transaction that can be submitted to chain + * @param account an Aptos account + * @param sequenceNumber a sequence number the transaction will be generated with + * @returns + */ + async generateNextTransaction(account: AptosAccount, sequenceNumber: bigint): Promise { + if (this.transactionsQueue.isEmpty()) return undefined; + const payload = await this.transactionsQueue.dequeue(); + const rawTransaction = await this.provider.generateRawTransaction(account.address(), payload, { + providedSequenceNumber: sequenceNumber, + }); + const signedTransaction = AptosClient.generateBCSTransaction(account, rawTransaction); + return signedTransaction; + } + + /** + * Starts transaction submission and transaction processing. + */ + async run() { + try { + while (!this.taskQueue.isCancelled()) { + const task = await this.taskQueue.dequeue(); + await task(); + } + } catch (error: any) { + throw new Error(error); + } + } + + /** + * Starts the transaction management process. + */ + start() { + if (this.started) { + throw new Error("worker has already started"); + } + this.started = true; + this.taskQueue.enqueue(() => this.submitNextTransaction()); + this.taskQueue.enqueue(() => this.processTransactions()); + this.run(); + } + + /** + * Stops the the transaction management process. + */ + stop() { + if (this.taskQueue.isCancelled()) { + throw new Error("worker has already stopped"); + } + this.started = false; + this.taskQueue.cancel(); + } +}