Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmaayan committed Jul 27, 2023
1 parent f64b9b8 commit 2583fc8
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ async function main() {
await Promise.all(promises);

async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) {
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);
const transactionWorker = new TransactionWorker(provider, sender);

transactionWorker.start();

Expand All @@ -127,7 +127,7 @@ async function main() {
}
});

transactionWorker.on(TransactionWorkerEvents.SentFailed, async (data) => {
transactionWorker.on(TransactionWorkerEvents.TransactionSendFailed, async (data) => {
/**
* transaction sent failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
Expand All @@ -146,7 +146,7 @@ async function main() {
}
});

transactionWorker.on(TransactionWorkerEvents.ExecutionFailed, async (data) => {
transactionWorker.on(TransactionWorkerEvents.TransactionExecutionFailed, async (data) => {
/**
* transaction execution failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export class AccountSequenceNumber {
async synchronize(): Promise<void> {
if (this.lastUncommintedNumber === this.currentNumber) return;

/* eslint-disable no-await-in-loop */
while (this.lock) {
await sleep(this.sleepTime);
}
Expand Down
4 changes: 3 additions & 1 deletion ecosystem/typescript/sdk/src/transactions/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
* The AsyncQueue class is an async-aware data structure that provides a queue-like
* behavior for managing asynchronous tasks or operations.
* It allows to enqueue items and dequeue them asynchronously.
* This is not thread-safe but it is async concurrency safe and
* it does not guarantee ordering for those that call into and await on enqueue.
*/

export class AsyncQueue<T> {
private queue: T[] = [];
readonly queue: T[] = [];

// The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued.
private resolveMap: Map<number, (value: T) => void> = new Map();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { bcsToBytes, bcsSerializeUint64 } from "../../bcs";
import { Provider } from "../../providers";
import { TxnBuilderTypes } from "../../transaction_builder";
import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from "../../tests/unit/test_helper.test";
import { TransactionWorker } from "../transaction_worker";
import { TransactionWorker, TransactionWorkerEvents } from "../transaction_worker";

const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG);

Expand All @@ -17,6 +17,46 @@ describe("transactionWorker", () => {
await faucet.fundAccount(sender.address(), 1000000000);
});

test("throws when starting an already started worker", async () => {
// start transactions worker
const transactionWorker = new TransactionWorker(provider, sender);
transactionWorker.start();
expect(async () => {
transactionWorker.start();
}).rejects.toThrow(`worker has already started`);
});

test("throws when stopping an already stopped worker", async () => {
// start transactions worker
const transactionWorker = new TransactionWorker(provider, sender);
transactionWorker.start();
transactionWorker.stop();
expect(async () => {
transactionWorker.stop();
}).rejects.toThrow(`worker has already stopped`);
});

test(
"adds transaction into the transactionsQueue",
async () => {
const transactionWorker = new TransactionWorker(provider, sender);
transactionWorker.start();
const txn = new TxnBuilderTypes.TransactionPayloadEntryFunction(
TxnBuilderTypes.EntryFunction.natural(
"0x1::aptos_account",
"transfer",
[],
[bcsToBytes(TxnBuilderTypes.AccountAddress.fromHex(recipient.address())), bcsSerializeUint64(5)],
),
);
transactionWorker.push(txn).then(() => {
transactionWorker.stop();
expect(transactionWorker.transactionsQueue.queue).toHaveLength(1);
});
},
longTestTimeout,
);

test(
"submits 5 transactions to chain for a single account",
(done) => {
Expand All @@ -35,7 +75,7 @@ describe("transactionWorker", () => {
const payloads = [...Array(5).fill(txn)];

// start transactions worker
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);
const transactionWorker = new TransactionWorker(provider, sender);
transactionWorker.start();

// push transactions to queue
Expand Down
54 changes: 36 additions & 18 deletions ecosystem/typescript/sdk/src/transactions/transaction_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
/**
* TransactionWorker provides a simple framework for receiving payloads to be processed.
*
* Once one `start()` the process, the worker acquires the current account's next sequence number
* (by using the AccountSequenceNumber class), generates a signed transaction and pushes an async
* submission process into a `outstandingTransactions` queue.
* Once one `start()` the process and pushes a new transaction, the worker acquires
* the current account's next sequence number (by using the AccountSequenceNumber class),
* generates a signed transaction and pushes an async submission process into the `outstandingTransactions` queue.
* At the same time, the worker processes transactions by reading the `outstandingTransactions` queue
* and submits the next transaction to chain, it
* 1) waits for resolution of the submission process or get pre-execution validation error
Expand All @@ -25,9 +25,9 @@ const promiseFulfilledStatus = "fulfilled";

export enum TransactionWorkerEvents {
TransactionSent = "transactionSent",
SentFailed = "sentFailed",
TransactionSendFailed = "transactionsendFailed",
TransactionExecuted = "transactionExecuted",
ExecutionFailed = "executionFailed",
TransactionExecutionFailed = "transactionexecutionFailed",
}

export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
Expand All @@ -43,34 +43,45 @@ export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
// process has started
started: boolean;

// transactions payloads waiting to be generated and signed
// TODO support entry function payload from ABI builder
/**
* transactions payloads waiting to be generated and signed
*
* TODO support entry function payload from ABI builder
*/
transactionsQueue = new AsyncQueue<TxnBuilderTypes.TransactionPayload>();

// signed transactions waiting to be submitted
/**
* signed transactions waiting to be submitted
*/
outstandingTransactions = new AsyncQueue<[Promise<PendingTransaction>, bigint]>();

// transactions that have been submitted to chain
/**
* transactions that have been submitted to chain
*/
sentTransactions: Array<[string, bigint, any]> = [];

// transactions that have been committed to chain
/**
* transactions that have been committed to chain
*/
executedTransactions: Array<[string, bigint, any]> = [];

/**
* Provides a simple framework for receiving payloads to be processed.
*
* @param provider - a client provider
* @param sender - a sender as AptosAccount
* @param maxWaitTime - the max wait time to wait before resyncing the sequence number to the current on-chain state
* @param maximumInFlight - submit up to `maximumInFlight` transactions per account
* @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
* @param maxWaitTime - the max wait time to wait before resyncing the sequence number
* to the current on-chain state, default to 30
* @param maximumInFlight - submit up to `maximumInFlight` transactions per account.
* Mempool limits the number of transactions per account to 100, hence why we default to 100.
* @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating, default to 10
*/
constructor(
provider: Provider,
account: AptosAccount,
maxWaitTime: number,
maximumInFlight: number,
sleepTime: number,
maxWaitTime: number = 30,
maximumInFlight: number = 100,
sleepTime: number = 10,
) {
super();
this.provider = provider;
Expand Down Expand Up @@ -101,6 +112,8 @@ export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
if (error instanceof AsyncQueueCancelledError) {
return;
}
// TODO use future log service
/* eslint-disable no-console */
console.log(error);
}
}
Expand Down Expand Up @@ -149,14 +162,19 @@ export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
} else {
// send transaction failed
this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]);
this.emit(TransactionWorkerEvents.SentFailed, [this.sentTransactions.length, sentTransaction.reason]);
this.emit(TransactionWorkerEvents.TransactionSendFailed, [
this.sentTransactions.length,
sentTransaction.reason,
]);
}
}
}
} catch (error: any) {
if (error instanceof AsyncQueueCancelledError) {
return;
}
// TODO use future log service
/* eslint-disable no-console */
console.log(error);
}
}
Expand All @@ -183,7 +201,7 @@ export class TransactionWorker extends EventEmitter<TransactionWorkerEvents> {
} else {
// transaction execution failed
this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]);
this.emit(TransactionWorkerEvents.ExecutionFailed, [
this.emit(TransactionWorkerEvents.TransactionExecutionFailed, [
this.executedTransactions.length,
executedTransaction.reason,
]);
Expand Down

0 comments on commit 2583fc8

Please sign in to comment.