From 2583fc8357c98d5bc448f0d63f5903046a8c992d Mon Sep 17 00:00:00 2001 From: maayan Date: Fri, 21 Jul 2023 11:06:06 -0400 Subject: [PATCH] address feedback --- .../typescript/transactions_management.ts | 6 +-- .../transactions/account_sequence_number.ts | 1 + .../sdk/src/transactions/async_queue.ts | 4 +- .../tests/transaction_worker.test.ts | 44 ++++++++++++++- .../src/transactions/transaction_worker.ts | 54 ++++++++++++------- 5 files changed, 85 insertions(+), 24 deletions(-) diff --git a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts index fa974949602e3c..7793fd76e70d39 100644 --- a/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts +++ b/ecosystem/typescript/sdk/examples/typescript/transactions_management.ts @@ -100,7 +100,7 @@ async function main() { await Promise.all(promises); async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) { - const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10); + const transactionWorker = new TransactionWorker(provider, sender); transactionWorker.start(); @@ -127,7 +127,7 @@ async function main() { } }); - transactionWorker.on(TransactionWorkerEvents.SentFailed, async (data) => { + transactionWorker.on(TransactionWorkerEvents.TransactionSendFailed, async (data) => { /** * transaction sent failed, up to the user to decide next steps. * whether to stop the worker by transactionWorker.stop() and handle @@ -146,7 +146,7 @@ async function main() { } }); - transactionWorker.on(TransactionWorkerEvents.ExecutionFailed, async (data) => { + transactionWorker.on(TransactionWorkerEvents.TransactionExecutionFailed, async (data) => { /** * transaction execution failed, up to the user to decide next steps. * whether to stop the worker by transactionWorker.stop() and handle diff --git a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts index d51fea255c8170..0b0644c32a70d3 100644 --- a/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts +++ b/ecosystem/typescript/sdk/src/transactions/account_sequence_number.ts @@ -146,6 +146,7 @@ export class AccountSequenceNumber { async synchronize(): Promise { if (this.lastUncommintedNumber === this.currentNumber) return; + /* eslint-disable no-await-in-loop */ while (this.lock) { await sleep(this.sleepTime); } diff --git a/ecosystem/typescript/sdk/src/transactions/async_queue.ts b/ecosystem/typescript/sdk/src/transactions/async_queue.ts index 21e3421eff5b2a..1800438277f10a 100644 --- a/ecosystem/typescript/sdk/src/transactions/async_queue.ts +++ b/ecosystem/typescript/sdk/src/transactions/async_queue.ts @@ -2,10 +2,12 @@ * The AsyncQueue class is an async-aware data structure that provides a queue-like * behavior for managing asynchronous tasks or operations. * It allows to enqueue items and dequeue them asynchronously. + * This is not thread-safe but it is async concurrency safe and + * it does not guarantee ordering for those that call into and await on enqueue. */ export class AsyncQueue { - private queue: T[] = []; + readonly queue: T[] = []; // The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued. private resolveMap: Map void> = new Map(); diff --git a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts index fb2046678a5710..d333038d03ed7a 100644 --- a/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts +++ b/ecosystem/typescript/sdk/src/transactions/tests/transaction_worker.test.ts @@ -3,7 +3,7 @@ import { bcsToBytes, bcsSerializeUint64 } from "../../bcs"; import { Provider } from "../../providers"; import { TxnBuilderTypes } from "../../transaction_builder"; import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test"; -import { TransactionWorker } from "../transaction_worker"; +import { TransactionWorker, TransactionWorkerEvents } from "../transaction_worker"; const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG); @@ -17,6 +17,46 @@ describe("transactionWorker", () => { await faucet.fundAccount(sender.address(), 1000000000); }); + test("throws when starting an already started worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + expect(async () => { + transactionWorker.start(); + }).rejects.toThrow(`worker has already started`); + }); + + test("throws when stopping an already stopped worker", async () => { + // start transactions worker + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + transactionWorker.stop(); + expect(async () => { + transactionWorker.stop(); + }).rejects.toThrow(`worker has already stopped`); + }); + + test( + "adds transaction into the transactionsQueue", + async () => { + const transactionWorker = new TransactionWorker(provider, sender); + transactionWorker.start(); + const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction( + TxnBuilderTypes.EntryFunction.natural( + "0x1::aptos_account", + "transfer", + [], + [bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)], + ), + ); + transactionWorker.push(txn).then(() => { + transactionWorker.stop(); + expect(transactionWorker.transactionsQueue.queue).toHaveLength(1); + }); + }, + longTestTimeout, + ); + test( "submits 5 transactions to chain for a single account", (done) => { @@ -35,7 +75,7 @@ describe("transactionWorker", () => { const payloads = [...Array(5).fill(txn)]; // start transactions worker - const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10); + const transactionWorker = new TransactionWorker(provider, sender); transactionWorker.start(); // push transactions to queue diff --git a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts index 6acd3e996914e7..52a330c369b0ac 100644 --- a/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts +++ b/ecosystem/typescript/sdk/src/transactions/transaction_worker.ts @@ -3,9 +3,9 @@ /** * TransactionWorker provides a simple framework for receiving payloads to be processed. * - * Once one `start()` the process, the worker acquires the current account's next sequence number - * (by using the AccountSequenceNumber class), generates a signed transaction and pushes an async - * submission process into a `outstandingTransactions` queue. + * Once one `start()` the process and pushes a new transaction, the worker acquires + * the current account's next sequence number (by using the AccountSequenceNumber class), + * generates a signed transaction and pushes an async submission process into the `outstandingTransactions` queue. * At the same time, the worker processes transactions by reading the `outstandingTransactions` queue * and submits the next transaction to chain, it * 1) waits for resolution of the submission process or get pre-execution validation error @@ -25,9 +25,9 @@ const promiseFulfilledStatus = "fulfilled"; export enum TransactionWorkerEvents { TransactionSent = "transactionSent", - SentFailed = "sentFailed", + TransactionSendFailed = "transactionsendFailed", TransactionExecuted = "transactionExecuted", - ExecutionFailed = "executionFailed", + TransactionExecutionFailed = "transactionexecutionFailed", } export class TransactionWorker extends EventEmitter { @@ -43,17 +43,26 @@ export class TransactionWorker extends EventEmitter { // process has started started: boolean; - // transactions payloads waiting to be generated and signed - // TODO support entry function payload from ABI builder + /** + * transactions payloads waiting to be generated and signed + * + * TODO support entry function payload from ABI builder + */ transactionsQueue = new AsyncQueue(); - // signed transactions waiting to be submitted + /** + * signed transactions waiting to be submitted + */ outstandingTransactions = new AsyncQueue<[Promise, bigint]>(); - // transactions that have been submitted to chain + /** + * transactions that have been submitted to chain + */ sentTransactions: Array<[string, bigint, any]> = []; - // transactions that have been committed to chain + /** + * transactions that have been committed to chain + */ executedTransactions: Array<[string, bigint, any]> = []; /** @@ -61,16 +70,18 @@ export class TransactionWorker extends EventEmitter { * * @param provider - a client provider * @param sender - a sender as AptosAccount - * @param maxWaitTime - the max wait time to wait before resyncing the sequence number to the current on-chain state - * @param maximumInFlight - submit up to `maximumInFlight` transactions per account - * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating + * @param maxWaitTime - the max wait time to wait before resyncing the sequence number + * to the current on-chain state, default to 30 + * @param maximumInFlight - submit up to `maximumInFlight` transactions per account. + * Mempool limits the number of transactions per account to 100, hence why we default to 100. + * @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating, default to 10 */ constructor( provider: Provider, account: AptosAccount, - maxWaitTime: number, - maximumInFlight: number, - sleepTime: number, + maxWaitTime: number = 30, + maximumInFlight: number = 100, + sleepTime: number = 10, ) { super(); this.provider = provider; @@ -101,6 +112,8 @@ export class TransactionWorker extends EventEmitter { if (error instanceof AsyncQueueCancelledError) { return; } + // TODO use future log service + /* eslint-disable no-console */ console.log(error); } } @@ -149,7 +162,10 @@ export class TransactionWorker extends EventEmitter { } else { // send transaction failed this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]); - this.emit(TransactionWorkerEvents.SentFailed, [this.sentTransactions.length, sentTransaction.reason]); + this.emit(TransactionWorkerEvents.TransactionSendFailed, [ + this.sentTransactions.length, + sentTransaction.reason, + ]); } } } @@ -157,6 +173,8 @@ export class TransactionWorker extends EventEmitter { if (error instanceof AsyncQueueCancelledError) { return; } + // TODO use future log service + /* eslint-disable no-console */ console.log(error); } } @@ -183,7 +201,7 @@ export class TransactionWorker extends EventEmitter { } else { // transaction execution failed this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]); - this.emit(TransactionWorkerEvents.ExecutionFailed, [ + this.emit(TransactionWorkerEvents.TransactionExecutionFailed, [ this.executedTransactions.length, executedTransaction.reason, ]);