From ef8f9e8a119deb71dd26276c88c0be35c14f24e8 Mon Sep 17 00:00:00 2001 From: maayan Date: Tue, 18 Jul 2023 19:22:38 -0400 Subject: [PATCH] implement async queue --- .../typescript/transactions_management.ts | 32 ++-- .../transactions/account_sequence_number.ts | 6 +- .../sdk/src/transactions/async_queue.ts | 94 ++++++++++++ .../src/transactions/transaction_worker.ts | 138 ++++++++++-------- 4 files changed, 189 insertions(+), 81 deletions(-) create mode 100644 ecosystem/typescript/sdk/src/transactions/async_queue.ts diff --git a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts index 2f7d31190f8b87..402fcb23f43914 100644 --- a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts +++ b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts @@ -54,8 +54,8 @@ async function main() { await Promise.all(funds); + console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`); last = Date.now() / 1000; - console.log(`${funds.length} sender accounts funded in ${last - start} seconds`); // read sender accounts const balances: Array> = []; @@ -64,8 +64,8 @@ async function main() { } await Promise.all(balances); + console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`); last = Date.now() / 1000; - console.log(`${balances.length} sender account balances checked in ${last - start} seconds`); // create transactions const payloads: any[] = []; @@ -85,13 +85,12 @@ async function main() { } } - console.log(`sends ${totalTransactions * senders.length} transactions to chain....`); + console.log(`sends ${totalTransactions} transactions to chain....`); // emit batch transactions - for (let i = 0; i < senders.length; i++) { - batchTransactions(payloads, senders[i]); - } + const promises = senders.map((sender) => batchTransactions(payloads, sender)); + await Promise.all(promises); - function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { + async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10); transactionWorker.start(); @@ -100,7 +99,7 @@ async function main() { // push transactions to worker queue for (const payload in payloads) { - transactionWorker.push(payloads[payload]); + await transactionWorker.push(payloads[payload]); } } @@ -115,8 +114,7 @@ async function main() { transactionWorker.on("transactionSent", async (data) => { // all expected transactions have been sent if (data[0] === totalTransactions) { - last = Date.now() / 1000; - console.log(`transactions sent in ${last - start} seconds`); + console.log(`transactions sent in ${Date.now() / 1000 - last} seconds`); } }); @@ -133,10 +131,8 @@ async function main() { transactionWorker.on("transactionExecuted", async (data) => { // all expected transactions have been executed - console.log(data); if (data[0] === totalTransactions) { - last = Date.now() / 1000; - console.log(`transactions executed in ${last - start} seconds`); + console.log(`transactions executed in ${Date.now() / 1000 - last} seconds`); await checkAccounts(); } }); @@ -154,25 +150,21 @@ async function main() { } // check for account's sequence numbers - const checkAccounts = async () => { + async function checkAccounts(): Promise { const waitFor: Array> = []; for (let i = 0; i < senders.length; i++) { waitFor.push(provider.getAccount(senders[i].address())); } - const res = await Promise.all(waitFor); - last = Date.now() / 1000; - console.log(`transactions verified in ${last - start} seconds`); + console.log(`transactions verified in ${Date.now() / 1000 - last} seconds`); for (const account in res) { const currentAccount = res[account] as Types.AccountData; console.log( `sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`, ); } - // exit for testing porpuses - this would stop the process. most cases we have all transactions - // commited, but in some rare cases we might be stopping it before last couple of transactions have commited. exit(0); - }; + } } main(); diff --git a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts index 249626acb67b74..d51fea255c8170 100644 --- a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts +++ b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts @@ -110,7 +110,7 @@ export class AccountSequenceNumber { nextNumber = this.currentNumber!; this.currentNumber! += BigInt(1); } catch (e) { - console.error("error in updating this account sequence number with the one on chain", e); + console.error("error in getting next sequence number for this account", e); } finally { this.lock = false; } @@ -150,6 +150,8 @@ export class AccountSequenceNumber { await sleep(this.sleepTime); } + this.lock = true; + try { await this.update(); const startTime = now(); @@ -164,7 +166,7 @@ export class AccountSequenceNumber { } } } catch (e) { - console.error("error in updating this account sequence number with the one on chain", e); + console.error("error in synchronizing this account sequence number with the one on chain", e); } finally { this.lock = false; } diff --git a/ecosystem/typescript/sdk/src/transactions/async_queue.ts b/ecosystem/typescript/sdk/src/transactions/async_queue.ts new file mode 100644 index 00000000000000..21e3421eff5b2a --- /dev/null +++ b/ecosystem/typescript/sdk/src/transactions/async_queue.ts @@ -0,0 +1,94 @@ +/** + * The AsyncQueue class is an async-aware data structure that provides a queue-like + * behavior for managing asynchronous tasks or operations. + * It allows to enqueue items and dequeue them asynchronously. + */ + +export class AsyncQueue { + private queue: T[] = []; + + // The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued. + private resolveMap: Map void> = new Map(); + + private counter: number = 0; + + private cancelled: boolean = false; + + /** + * The enqueue method adds an item to the queue. If there are pending dequeued promises, + * in the resolveMap, it resolves the oldest promise with the enqueued item immediately. + * Otherwise, it adds the item to the queue. + * + * @param item T + */ + enqueue(item: T): void { + if (this.resolveMap.size > 0) { + const resolve = this.resolveMap.get(0); + if (resolve) { + this.resolveMap.delete(0); + resolve(item); + return; + } + } + this.queue.push(item); + } + + /** + * The dequeue method returns a promise that resolves to the next item in the queue. + * If the queue is not empty, it resolves the promise immediately with the next item. + * Otherwise, it creates a new promise. The promise's resolve function is stored + * in the resolveMap with a unique counter value as the key. + * The newly created promise is then returned, and it will be resolved later when an item is enqueued. + * + * @returns Promise + */ + async dequeue(): Promise { + if (this.queue.length > 0) { + return Promise.resolve(this.queue.shift()!); + } + const promise = new Promise((resolve) => { + this.counter += 1; + this.resolveMap.set(this.counter, resolve); + }); + return promise; + } + + /** + * The isEmpty method returns whether the queue is empty or not. + * + * @returns boolean + */ + isEmpty(): boolean { + return this.queue.length === 0; + } + + /** + * The cancel method cancels all pending promises in the queue. + * It rejects the promises with a AsyncQueueCancelledError error, + * ensuring that any awaiting code can handle the cancellation appropriately. + */ + cancel(): void { + this.cancelled = true; + this.resolveMap.forEach(async (resolve) => { + resolve(await Promise.reject(new AsyncQueueCancelledError("Task cancelled"))); + }); + this.resolveMap.clear(); + this.queue.length = 0; + } + + /** + * The isCancelled method returns whether the queue is cancelled or not. + * + * @returns boolean + */ + isCancelled(): boolean { + return this.cancelled; + } +} + +export class AsyncQueueCancelledError extends Error { + /* eslint-disable @typescript-eslint/no-useless-constructor */ + constructor(message: string) { + super(message); + } +} diff --git a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts index ba741d0dcb448d..5af326889051dc 100644 --- a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts +++ b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts @@ -1,7 +1,9 @@ +/* eslint-disable no-await-in-loop */ + /** - * Provides a simple framework for receiving payloads to be processed. + * TransactionWorker provides a simple framework for receiving payloads to be processed. * - * Once one `start()` the process, the worker acquires the current account next sequence number + * Once one `start()` the process, the worker acquires the current account's next sequence number * (by using the AccountSequenceNumber class), generates a signed transaction and pushes an async * submission process into a `outstandingTransactions` queue. * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue @@ -17,6 +19,9 @@ import { PendingTransaction, Transaction } from "../generated"; import { AptosClient, Provider } from "../providers"; import { TxnBuilderTypes } from "../transaction_builder"; import { AccountSequenceNumber } from "./account_sequence_number"; +import { AsyncQueue, AsyncQueueCancelledError } from "./async_queue"; + +const promiseFulfilledStatus = "fulfilled"; // Events const transactionSent = "transactionSent"; @@ -32,18 +37,17 @@ export class TransactionWorker extends EventEmitter { // current account sequence number readonly accountSequnceNumber: AccountSequenceNumber; + readonly taskQueue: AsyncQueue<() => Promise> = new AsyncQueue<() => Promise>(); + // process has started started: boolean; - // process has stopped - stopped: boolean; - // transactions payloads waiting to be generated and signed // TODO support entry function payload from ABI builder - transactionsQueue: Array = []; + transactionsQueue = new AsyncQueue(); // signed transactions waiting to be submitted - outstandingTransactions: Array<[Promise, bigint]> = []; + outstandingTransactions = new AsyncQueue<[Promise, bigint]>(); // transactions that have been submitted to chain sentTransactions: Array<[string, bigint, any]> = []; @@ -71,7 +75,6 @@ export class TransactionWorker extends EventEmitter { this.provider = provider; this.account = account; this.started = false; - this.stopped = false; this.accountSequnceNumber = new AccountSequenceNumber(provider, account, maxWaitTime, maximumInFlight, sleepTime); } @@ -82,13 +85,23 @@ export class TransactionWorker extends EventEmitter { * to be processed later. */ async submitNextTransaction() { - if (this.transactionsQueue.length === 0) return; - const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); - if (sequenceNumber === null) return; - const transaction = await this.generateNextTransaction(this.account, sequenceNumber); - if (!transaction) return; - const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); - this.outstandingTransactions.push([pendingTransaction, sequenceNumber]); + try { + /* eslint-disable no-constant-condition */ + while (true) { + if (this.transactionsQueue.isEmpty()) return; + const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber(); + if (sequenceNumber === null) return; + const transaction = await this.generateNextTransaction(this.account, sequenceNumber); + if (!transaction) return; + const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction); + await this.outstandingTransactions.enqueue([pendingTransaction, sequenceNumber]); + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; + } + console.log(error); + } } /** @@ -101,34 +114,46 @@ export class TransactionWorker extends EventEmitter { * transactions queue with the failure reason and fires a transactionsFailed event. */ async processTransactions() { - const awaitingTransactions = []; - const awaitingSequenceNumbers = []; - - while (this.outstandingTransactions.length > 0) { - const [pendingTransaction, sequenceNumber] = this.outstandingTransactions.shift()!; - - awaitingTransactions.push(pendingTransaction); - awaitingSequenceNumbers.push(sequenceNumber); - } - - // send awaiting transactions to chain - const sentTransactions = await Promise.allSettled(awaitingTransactions); - - for (let i = 0; i < sentTransactions.length && i < awaitingSequenceNumbers.length; i += 1) { - // check sent transaction status - const sentTransaction = sentTransactions[i]; - const sequenceNumber = awaitingSequenceNumbers[i]; - if (sentTransaction.status === "fulfilled") { - // transaction sent to chain - this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]); - this.emit(transactionSent, [this.sentTransactions.length, sentTransaction.value.hash]); - // check sent transaction execution - this.checkTransaction(sentTransaction, sequenceNumber); - } else { - // send transaction failed - this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); - this.emit(sentFailed, [this.sentTransactions.length, sentTransaction.reason]); + try { + /* eslint-disable no-constant-condition */ + while (true) { + const awaitingTransactions = []; + const sequenceNumbers = []; + let [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + + while (!this.outstandingTransactions.isEmpty()) { + [pendingTransaction, sequenceNumber] = await this.outstandingTransactions.dequeue(); + + awaitingTransactions.push(pendingTransaction); + sequenceNumbers.push(sequenceNumber); + } + // send awaiting transactions to chain + const sentTransactions = await Promise.allSettled(awaitingTransactions); + for (let i = 0; i < sentTransactions.length && i < sequenceNumbers.length; i += 1) { + // check sent transaction status + const sentTransaction = sentTransactions[i]; + sequenceNumber = sequenceNumbers[i]; + if (sentTransaction.status === promiseFulfilledStatus) { + // transaction sent to chain + this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]); + this.emit(transactionSent, [this.sentTransactions.length, sentTransaction.value.hash]); + // check sent transaction execution + await this.checkTransaction(sentTransaction, sequenceNumber); + } else { + // send transaction failed + this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); + this.emit(sentFailed, [this.sentTransactions.length, sentTransaction.reason]); + } + } + } + } catch (error: any) { + if (error instanceof AsyncQueueCancelledError) { + return; } + console.log(error); } } @@ -144,7 +169,7 @@ export class TransactionWorker extends EventEmitter { for (let i = 0; i < sentTransactions.length; i += 1) { const executedTransaction = sentTransactions[i]; - if (executedTransaction.status === "fulfilled") { + if (executedTransaction.status === promiseFulfilledStatus) { // transaction executed to chain this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]); this.emit(transactionExecuted, [this.executedTransactions.length, executedTransaction.value.hash]); @@ -161,7 +186,7 @@ export class TransactionWorker extends EventEmitter { * @param payload Transaction payload */ async push(payload: TxnBuilderTypes.TransactionPayload): Promise { - await this.transactionsQueue.push(payload); + await this.transactionsQueue.enqueue(payload); } /** @@ -171,12 +196,12 @@ export class TransactionWorker extends EventEmitter { * @returns */ async generateNextTransaction(account: AptosAccount, sequenceNumber: bigint): Promise { - if (this.transactionsQueue.length === 0) return undefined; - const payload = await this.transactionsQueue.shift()!; + if (this.transactionsQueue.isEmpty()) return undefined; + const payload = await this.transactionsQueue.dequeue(); const rawTransaction = await this.provider.generateRawTransaction(account.address(), payload, { providedSequenceNumber: sequenceNumber, }); - const signedTransaction = await AptosClient.generateBCSTransaction(account, rawTransaction); + const signedTransaction = AptosClient.generateBCSTransaction(account, rawTransaction); return signedTransaction; } @@ -185,15 +210,9 @@ export class TransactionWorker extends EventEmitter { */ async run() { try { - while (!this.stopped) { - /* eslint-disable no-await-in-loop, no-promise-executor-return */ - await Promise.all([this.submitNextTransaction(), this.processTransactions()]); - /** - * since run() function runs continuously in a loop, it prevents the execution - * from reaching a callback function (e.g when client wants to gracefuly stop the worker). - * Add a small delay between iterations to allow other code to run - /* eslint-disable no-await-in-loop */ - await new Promise((resolve) => setTimeout(resolve, 100)); + while (!this.taskQueue.isCancelled()) { + const task = await this.taskQueue.dequeue(); + await task(); } } catch (error: any) { throw new Error(error); @@ -208,7 +227,8 @@ export class TransactionWorker extends EventEmitter { throw new Error("worker has already started"); } this.started = true; - this.stopped = false; + this.taskQueue.enqueue(() => this.submitNextTransaction()); + this.taskQueue.enqueue(() => this.processTransactions()); this.run(); } @@ -216,10 +236,10 @@ export class TransactionWorker extends EventEmitter { * Stops the the transaction management process. */ stop() { - if (this.stopped) { + if (this.taskQueue.isCancelled()) { throw new Error("worker has already stopped"); } - this.stopped = true; this.started = false; + this.taskQueue.cancel(); } }