Skip to content

Commit

Permalink
implement async queue
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmaayan committed Jul 21, 2023
1 parent e9afff4 commit d131d17
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ async function main() {

await Promise.all(funds);

console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`);
last = Date.now() / 1000;
console.log(`${funds.length} sender accounts funded in ${last - start} seconds`);

// read sender accounts
const balances: Array<Promise<Types.AccountData>> = [];
Expand All @@ -64,8 +64,8 @@ async function main() {
}
await Promise.all(balances);

console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`);
last = Date.now() / 1000;
console.log(`${balances.length} sender account balances checked in ${last - start} seconds`);

// create transactions
const payloads: any[] = [];
Expand All @@ -85,13 +85,12 @@ async function main() {
}
}

console.log(`sends ${totalTransactions * senders.length} transactions to chain....`);
console.log(`sends ${totalTransactions} transactions to chain....`);
// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
}
const promises = senders.map((sender) => batchTransactions(payloads, sender));
await Promise.all(promises);

function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) {
async function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) {
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);

transactionWorker.start();
Expand All @@ -100,7 +99,7 @@ async function main() {

// push transactions to worker queue
for (const payload in payloads) {
transactionWorker.push(payloads[payload]);
await transactionWorker.push(payloads[payload]);
}
}

Expand All @@ -115,8 +114,7 @@ async function main() {
transactionWorker.on("transactionSent", async (data) => {
// all expected transactions have been sent
if (data[0] === totalTransactions) {
last = Date.now() / 1000;
console.log(`transactions sent in ${last - start} seconds`);
console.log(`transactions sent in ${Date.now() / 1000 - last} seconds`);
}
});

Expand All @@ -133,10 +131,8 @@ async function main() {

transactionWorker.on("transactionExecuted", async (data) => {
// all expected transactions have been executed
console.log(data);
if (data[0] === totalTransactions) {
last = Date.now() / 1000;
console.log(`transactions executed in ${last - start} seconds`);
console.log(`transactions executed in ${Date.now() / 1000 - last} seconds`);
await checkAccounts();
}
});
Expand All @@ -154,25 +150,21 @@ async function main() {
}

// check for account's sequence numbers
const checkAccounts = async () => {
async function checkAccounts(): Promise<void> {
const waitFor: Array<Promise<Types.AccountData>> = [];
for (let i = 0; i < senders.length; i++) {
waitFor.push(provider.getAccount(senders[i].address()));
}

const res = await Promise.all(waitFor);
last = Date.now() / 1000;
console.log(`transactions verified in ${last - start} seconds`);
console.log(`transactions verified in ${Date.now() / 1000 - last} seconds`);
for (const account in res) {
const currentAccount = res[account] as Types.AccountData;
console.log(
`sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`,
);
}
// exit for testing porpuses - this would stop the process. most cases we have all transactions
// commited, but in some rare cases we might be stopping it before last couple of transactions have commited.
exit(0);
};
}
}

main();
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class AccountSequenceNumber {
nextNumber = this.currentNumber!;
this.currentNumber! += BigInt(1);
} catch (e) {
console.error("error in updating this account sequence number with the one on chain", e);
console.error("error in getting next sequence number for this account", e);
} finally {
this.lock = false;
}
Expand Down Expand Up @@ -150,6 +150,8 @@ export class AccountSequenceNumber {
await sleep(this.sleepTime);
}

this.lock = true;

try {
await this.update();
const startTime = now();
Expand All @@ -164,7 +166,7 @@ export class AccountSequenceNumber {
}
}
} catch (e) {
console.error("error in updating this account sequence number with the one on chain", e);
console.error("error in synchronizing this account sequence number with the one on chain", e);
} finally {
this.lock = false;
}
Expand Down
94 changes: 94 additions & 0 deletions ecosystem/typescript/sdk/src/transactions/async_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* The AsyncQueue class is an async-aware data structure that provides a queue-like
* behavior for managing asynchronous tasks or operations.
* It allows to enqueue items and dequeue them asynchronously.
*/

export class AsyncQueue<T> {
private queue: T[] = [];

// The resolveMap is used to handle the resolution of promises when items are enqueued and dequeued.
private resolveMap: Map<number, (value: T) => void> = new Map();

private counter: number = 0;

private cancelled: boolean = false;

/**
* The enqueue method adds an item to the queue. If there are pending dequeued promises,
* in the resolveMap, it resolves the oldest promise with the enqueued item immediately.
* Otherwise, it adds the item to the queue.
*
* @param item T
*/
enqueue(item: T): void {
if (this.resolveMap.size > 0) {
const resolve = this.resolveMap.get(0);
if (resolve) {
this.resolveMap.delete(0);
resolve(item);
return;
}
}
this.queue.push(item);
}

/**
* The dequeue method returns a promise that resolves to the next item in the queue.
* If the queue is not empty, it resolves the promise immediately with the next item.
* Otherwise, it creates a new promise. The promise's resolve function is stored
* in the resolveMap with a unique counter value as the key.
* The newly created promise is then returned, and it will be resolved later when an item is enqueued.
*
* @returns Promise<T>
*/
async dequeue(): Promise<T> {
if (this.queue.length > 0) {
return Promise.resolve(this.queue.shift()!);
}
const promise = new Promise<T>((resolve) => {
this.counter += 1;
this.resolveMap.set(this.counter, resolve);
});
return promise;
}

/**
* The isEmpty method returns whether the queue is empty or not.
*
* @returns boolean
*/
isEmpty(): boolean {
return this.queue.length === 0;
}

/**
* The cancel method cancels all pending promises in the queue.
* It rejects the promises with a AsyncQueueCancelledError error,
* ensuring that any awaiting code can handle the cancellation appropriately.
*/
cancel(): void {
this.cancelled = true;
this.resolveMap.forEach(async (resolve) => {
resolve(await Promise.reject(new AsyncQueueCancelledError("Task cancelled")));
});
this.resolveMap.clear();
this.queue.length = 0;
}

/**
* The isCancelled method returns whether the queue is cancelled or not.
*
* @returns boolean
*/
isCancelled(): boolean {
return this.cancelled;
}
}

export class AsyncQueueCancelledError extends Error {
/* eslint-disable @typescript-eslint/no-useless-constructor */
constructor(message: string) {
super(message);
}
}
Loading

0 comments on commit d131d17

Please sign in to comment.