Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
address comments
Browse files Browse the repository at this point in the history
0xmaayan committed Jul 20, 2023
1 parent 7341272 commit 46ae683
Showing 5 changed files with 248 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
/**
* This example demonstrates how a client can utilize the TransactionWorker class.
*
* The TransactionWorker provides a simple framework for receiving payloads to be processed. It
* acquires an account new sequence number, produces a signed transaction and
* then submits the transaction. In other tasks, it waits for resolution of the submission
* process or get pre-execution validation error and waits for the resolution of the execution process
* or get an execution validation error.
*
* The TransactionWorker constructor accepts
* @param provider - a client provider
* @param sender - the sender account: AptosAccount
* @param maxWaitTime - the max wait time to wait before restarting the local sequence number to the current on-chain state
* @param maximumInFlight - submit up to `maximumInFlight` transactions per account
* @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
*
* Read more about it here {@link https://aptos.dev/guides/transaction-management}
*/

import { AptosAccount, BCS, TxnBuilderTypes, TransactionWorker, FaucetClient, Provider, Types } from "aptos";
@@ -15,41 +30,42 @@ async function main() {
const transactionsCount = 100;
const totalTransactions = accountsCount * transactionsCount;

const start = Date.now() / 1000; // current time in seconds

console.log("starting...");
console.log(new Date().toTimeString());
// create senders and recipients accounts
const senders: AptosAccount[] = [];
const recipients: AptosAccount[] = [];
for (let i = 0; i < accountsCount; i++) {
senders.push(new AptosAccount());
recipients.push(new AptosAccount());
}
console.log(`${senders.length + recipients.length} sender and recipient accounts created`);
let last = Date.now() / 1000;
console.log(
`${senders.length} sender accounts and ${recipients.length} recipient accounts created in ${last - start} seconds`,
);

// funds sender accounts
// fund sender accounts
const funds: Array<Promise<string[]>> = [];

for (let i = 0; i < senders.length; i++) {
funds.push(faucet.fundAccount(senders[i].address().noPrefix(), 10000000000));
}

// send requests
await Promise.all(funds);
console.log(`${funds.length} sender accounts funded`);
for (const acc in senders) {
const curr = senders[acc] as AptosAccount;
console.log(curr.address().hex());
}

last = Date.now() / 1000;
console.log(`${funds.length} sender accounts funded in ${last - start} seconds`);

// read sender accounts
const balances: Array<Promise<Types.AccountData>> = [];
for (let i = 0; i < senders.length; i++) {
balances.push(provider.getAccount(senders[i].address().hex()));
}
// send requests
await Promise.all(balances);

console.log(`${balances.length} sender account balances checked`);
last = Date.now() / 1000;
console.log(`${balances.length} sender account balances checked in ${last - start} seconds`);

// create transactions
const payloads: any[] = [];
@@ -69,37 +85,72 @@ async function main() {
}
}

const batchTransactions = (payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) => {
const transactionWorker = new TransactionWorker(provider, sender);
const waitFor: Array<Promise<void>> = [];
console.log(`sends ${totalTransactions * senders.length} transactions to chain....`);
// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
}

function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) {
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);

transactionWorker.start();

transactionWorker.on("transactionsFulfilled", async (data) => {
registerToWorkerEvents(transactionWorker);

// push transactions to worker queue
for (const payload in payloads) {
transactionWorker.push(payloads[payload]);
}
}

function registerToWorkerEvents(transactionWorker: TransactionWorker) {
/**
* The callback from an event listener, i.e `data`, is an array with 2 elements
* data[0] - the amount of processed transactions
* data[1] -
* on a success event, is the hash value of the processed transaction
* on a failure event, is the reason for the failure
*/
transactionWorker.on("transactionSent", async (data) => {
// all expected transactions have been sent
if (data[0] === totalTransactions) {
last = Date.now() / 1000;
console.log(`transactions sent in ${last - start} seconds`);
}
});

transactionWorker.on("sentFailed", async (data) => {
/**
* data is an array with 2 elements
* data[0] = the amount of processed transactions
* data[1] = the hash value of the processed transaction
* transaction sent failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
* the error, or simply return the error to the end user.
* At this point, we have the failed transaction queue number
* and the transaction failure reason
*/
waitFor.push(provider.waitForTransaction(data[1], { checkSuccess: true }));
// all expected transactions have been fulfilled
console.log("sentFailed", data);
});

transactionWorker.on("transactionExecuted", async (data) => {
// all expected transactions have been executed
console.log(data);
if (data[0] === totalTransactions) {
await Promise.all(waitFor);
console.log("transactions submitted");
console.log(new Date().toTimeString());
last = Date.now() / 1000;
console.log(`transactions executed in ${last - start} seconds`);
await checkAccounts();
}
});

// push transactions to queue
for (const payload in payloads) {
transactionWorker.push(payloads[payload]);
}
};

// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
transactionWorker.on("executionFailed", async (data) => {
/**
* transaction execution failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
* the error, or simply return the error to the end user.
* At this point, we have the failed transaction queue number
* and the transaction object data
*/
console.log("executionFailed", data);
});
}

// check for account's sequence numbers
@@ -110,16 +161,16 @@ async function main() {
}

const res = await Promise.all(waitFor);
console.log(`transactions verified`);
console.log(new Date().toTimeString());
last = Date.now() / 1000;
console.log(`transactions verified in ${last - start} seconds`);
for (const account in res) {
const currentAccount = res[account] as Types.AccountData;
console.log(
`sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`,
);
}
// exit for testing porpuses - this would stop the process. most cases we have all transactions
// commited, but in some rare cases we might be stopping it before last couple of transactions commited.
// commited, but in some rare cases we might be stopping it before last couple of transactions have commited.
exit(0);
};
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
/**
* A wrapper that handles and manages an account sequence number.
*
* Submit up to `maximumInFlight` transactions per account in parallel with a timeout of `sleepTime`
* If local assumes `maximumInFlight` are in flight, determine the actual committed state from the network
* If there are less than `maximumInFlight` due to some being committed, adjust the window
* If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
* If ever waiting more than `maxWaitTime` restart the sequence number to the current on-chain state
*
* Assumptions:
* Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise.
* They are initialized to the current on-chain state, so if there are already transactions in
* flight, they may take some time to reset.
* Accounts are automatically initialized if not explicitly
*
* Notes:
* This is co-routine safe, that is many async tasks can be reading from this concurrently.
* The state of an account cannot be used across multiple AccountSequenceNumber services.
* The synchronize method will create a barrier that prevents additional nextSequenceNumber
* calls until it is complete.
* This only manages the distribution of sequence numbers it does not help handle transaction
* failures.
* If a transaction fails, you should call synchronize and wait for timeouts.
*/

import { AptosAccount } from "../account";
import { Uint64 } from "../bcs";
import { Provider } from "../providers";
import { sleep } from "../utils";

// returns `now` time in seconds
const now = () => Math.floor(Date.now() / 1000);

export class AccountSequenceNumber {
@@ -11,38 +36,50 @@ export class AccountSequenceNumber {
readonly account: AptosAccount;

// sequence number on chain
lastUncommintedNumber: Uint64 | null = null;
lastUncommintedNumber: bigint | null = null;

// local sequence number
currentNumber: Uint64 | null = null;
currentNumber: bigint | null = null;

/**
* We want to guarantee that we preserve ordering of workers to requests.
*
* `lock` is used to try to prevent multiple coroutines from accessing a shared resource at the same time,
* which can result in race conditions and data inconsistency.
* This code actually doesn't do it though, since we aren't giving out a slot, it is still somewhat a race condition.
*
* The ideal solution is likely that each thread grabs the next number from a incremental integer.
* When they complete, they increment that number and that entity is able to enter the `lock`.
* That would guarantee ordering.
*/
lock = false;

// up to 100 outstanding sequence numbers
maximumInFlight = 100;
maxWaitTime: number;

sleepTime = 10;
maximumInFlight: number;

maxWaitTime = 30; // in seconds
sleepTime: number;

constructor(provider: Provider, account: AptosAccount) {
constructor(
provider: Provider,
account: AptosAccount,
maxWaitTime: number,
maximumInFlight: number,
sleepTime: number,
) {
this.provider = provider;
this.account = account;
this.maxWaitTime = maxWaitTime;
this.maximumInFlight = maximumInFlight;
this.sleepTime = sleepTime;
}

/**
* Returns the next available sequnce number on this account
* Returns the next available sequence number for this account
*
* @returns next available sequnce number
* @returns next available sequence number
*/
async nextSequenceNumber(): Promise<bigint | null> {
/*
`lock` is used to prevent multiple coroutines from accessing a shared resource at the same time,
which can result in race conditions and data inconsistency.
This implementation is not as robust as using a proper lock implementation
like `async-mutex` because it relies on busy waiting to acquire the lock,
which can be less efficient and may not work well in all scenarios
*/
/* eslint-disable no-await-in-loop */
while (this.lock) {
await sleep(this.sleepTime);
@@ -73,15 +110,15 @@ export class AccountSequenceNumber {
nextNumber = this.currentNumber!;
this.currentNumber! += BigInt(1);
} catch (e) {
console.error("error", e);
console.error("error in updating this account sequence number with the one on chain", e);
} finally {
this.lock = false;
}
return nextNumber;
}

/**
* Initializes this account with the sequnce number on chain
* Initializes this account with the sequence number on chain
*/
async initialize(): Promise<void> {
const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address());
@@ -90,9 +127,9 @@ export class AccountSequenceNumber {
}

/**
* Updates this account sequnce number with the one on-chain
* Updates this account sequence number with the one on-chain
*
* @returns on-chain sequnce number for this account
* @returns on-chain sequence number for this account
*/
async update(): Promise<bigint> {
const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address());
@@ -127,7 +164,7 @@ export class AccountSequenceNumber {
}
}
} catch (e) {
console.error("error", e);
console.error("error in updating this account sequence number with the one on chain", e);
} finally {
this.lock = false;
}
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from
const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG);
const account = new AptosAccount();
const faucet = getFaucetClient();
const accountSequenceNumber = new AccountSequenceNumber(provider, account);
const accountSequenceNumber = new AccountSequenceNumber(provider, account, 30, 100, 10);
let getAccountSpy: jest.SpyInstance;

let lastSeqNumber: bigint | null;
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ describe("transactionWorker", () => {
});

test(
"index",
"submits 5 transactions to chain for a single account",
(done) => {
// Specify the number of assertions expected
expect.assertions(1);
@@ -35,7 +35,7 @@ describe("transactionWorker", () => {
const payloads = [...Array(5).fill(txn)];

// start transactions worker
const transactionWorker = new TransactionWorker(provider, sender);
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);
transactionWorker.start();

// push transactions to queue
144 changes: 100 additions & 44 deletions ecosystem/typescript/sdk/src/transactions/transaction_worker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
/**
* Provides a simple framework for receiving payloads to be processed.
*
* Once one `start()` the process, the worker acquires the current account next sequence number
* (by using the AccountSequenceNumber class), generates a signed transaction and pushes an async
* submission process into a `outstandingTransactions` queue.
* At the same time, the worker processes transactions by reading the `outstandingTransactions` queue
* and submits the next transaction to chain, it
* 1) waits for resolution of the submission process or get pre-execution validation error
* and 2) waits for the resolution of the execution process or get an execution error.
* The worker fires events for any submission and/or execution success and/or failure.
*/

import EventEmitter from "eventemitter3";
import { AptosAccount } from "../account";
import { PendingTransaction } from "../generated";
import { PendingTransaction, Transaction } from "../generated";
import { AptosClient, Provider } from "../providers";
import { TxnBuilderTypes } from "../transaction_builder";
import { AccountSequenceNumber } from "./account_sequence_number";

// Events
const transactionSent = "transactionSent";
const sentFailed = "sentFailed";

const transactionExecuted = "transactionExecuted";
const executionFailed = "executionFailed";
export class TransactionWorker extends EventEmitter {
readonly provider: Provider;

@@ -27,15 +46,33 @@ export class TransactionWorker extends EventEmitter {
outstandingTransactions: Array<[Promise<PendingTransaction>, bigint]> = [];

// transactions that have been submitted to chain
processedTransactions: Array<[string, bigint, any]> = [];
sentTransactions: Array<[string, bigint, any]> = [];

// transactions that have been committed to chain
executedTransactions: Array<[string, bigint, any]> = [];

constructor(provider: Provider, account: AptosAccount) {
/**
* Provides a simple framework for receiving payloads to be processed.
*
* @param provider - a client provider
* @param sender - a sender as AptosAccount
* @param maxWaitTime - the max wait time to wait before resyncing the sequence number to the current on-chain state
* @param maximumInFlight - submit up to `maximumInFlight` transactions per account
* @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
*/
constructor(
provider: Provider,
account: AptosAccount,
maxWaitTime: number,
maximumInFlight: number,
sleepTime: number,
) {
super();
this.provider = provider;
this.account = account;
this.started = false;
this.stopped = false;
this.accountSequnceNumber = new AccountSequenceNumber(provider, account);
this.accountSequnceNumber = new AccountSequenceNumber(provider, account, maxWaitTime, maximumInFlight, sleepTime);
}

/**
@@ -44,18 +81,14 @@ export class TransactionWorker extends EventEmitter {
* adds the transaction to the outstanding transaction queue
* to be processed later.
*/
async submitTransactions() {
try {
if (this.transactionsQueue.length === 0) return;
const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber();
if (sequenceNumber === null) return;
const transaction = await this.generateNextTransaction(this.account, sequenceNumber);
if (!transaction) return;
const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction);
this.outstandingTransactions.push([pendingTransaction, sequenceNumber]);
} catch (error: any) {
throw new Error(error);
}
async submitNextTransaction() {
if (this.transactionsQueue.length === 0) return;
const sequenceNumber = await this.accountSequnceNumber.nextSequenceNumber();
if (sequenceNumber === null) return;
const transaction = await this.generateNextTransaction(this.account, sequenceNumber);
if (!transaction) return;
const pendingTransaction = this.provider.submitSignedBCSTransaction(transaction);
this.outstandingTransactions.push([pendingTransaction, sequenceNumber]);
}

/**
@@ -68,35 +101,58 @@ export class TransactionWorker extends EventEmitter {
* transactions queue with the failure reason and fires a transactionsFailed event.
*/
async processTransactions() {
try {
const awaitingTransactions = [];
const awaitingSequenceNumbers = [];
const awaitingTransactions = [];
const awaitingSequenceNumbers = [];

while (this.outstandingTransactions.length > 0) {
const [pendingTransaction, sequenceNumber] = this.outstandingTransactions.shift()!;

while (this.outstandingTransactions.length > 0) {
const [pendingTransaction, sequenceNumber] = this.outstandingTransactions.shift()!;
awaitingTransactions.push(pendingTransaction);
awaitingSequenceNumbers.push(sequenceNumber);
}

awaitingTransactions.push(pendingTransaction);
awaitingSequenceNumbers.push(sequenceNumber);
// send awaiting transactions to chain
const sentTransactions = await Promise.allSettled(awaitingTransactions);

for (let i = 0; i < sentTransactions.length && i < awaitingSequenceNumbers.length; i += 1) {
// check sent transaction status
const sentTransaction = sentTransactions[i];
const sequenceNumber = awaitingSequenceNumbers[i];
if (sentTransaction.status === "fulfilled") {
// transaction sent to chain
this.sentTransactions.push([sentTransaction.value.hash, sequenceNumber, null]);
this.emit(transactionSent, [this.sentTransactions.length, sentTransaction.value.hash]);
// check sent transaction execution
this.checkTransaction(sentTransaction, sequenceNumber);
} else {
// send transaction failed
this.sentTransactions.push([sentTransaction.status, sequenceNumber, sentTransaction.reason]);
this.emit(sentFailed, [this.sentTransactions.length, sentTransaction.reason]);
}
}
}

try {
const outputs = await Promise.allSettled(awaitingTransactions);
for (let i = 0; i < outputs.length && i < awaitingSequenceNumbers.length; i += 1) {
const output = outputs[i];
const sequenceNumber = awaitingSequenceNumbers[i];

if (output.status === "fulfilled") {
this.processedTransactions.push([output.value.hash, sequenceNumber, null]);
this.emit("transactionsFulfilled", [this.processedTransactions.length, output.value.hash]);
} else {
this.processedTransactions.push([output.status, sequenceNumber, output.reason]);
}
}
} catch (error: any) {
throw new Error(error);
/**
* Once transaction has been sent to chain, we check for its execution status.
* @param sentTransaction transactions that were sent to chain and are now waiting to be executed
* @param sequenceNumber the account's sequence number that was sent with the transaction
*/
async checkTransaction(sentTransaction: PromiseFulfilledResult<PendingTransaction>, sequenceNumber: bigint) {
const waitFor: Array<Promise<Transaction>> = [];
waitFor.push(this.provider.waitForTransactionWithResult(sentTransaction.value.hash, { checkSuccess: true }));
const sentTransactions = await Promise.allSettled(waitFor);

for (let i = 0; i < sentTransactions.length; i += 1) {
const executedTransaction = sentTransactions[i];
if (executedTransaction.status === "fulfilled") {
// transaction executed to chain
this.executedTransactions.push([executedTransaction.value.hash, sequenceNumber, null]);
this.emit(transactionExecuted, [this.executedTransactions.length, executedTransaction.value.hash]);
} else {
// transaction execution failed
this.executedTransactions.push([executedTransaction.status, sequenceNumber, executedTransaction.reason]);
this.emit(executionFailed, [this.executedTransactions.length, executedTransaction.reason]);
}
} catch (error: any) {
throw new Error(error);
}
}

@@ -127,13 +183,13 @@ export class TransactionWorker extends EventEmitter {
/**
* Starts transaction submission and transaction processing.
*/
async runTransactions() {
async run() {
try {
while (!this.stopped) {
/* eslint-disable no-await-in-loop, no-promise-executor-return */
await Promise.all([this.submitTransactions(), this.processTransactions()]);
await Promise.all([this.submitNextTransaction(), this.processTransactions()]);
/**
* since runTransactions() function runs continuously in a loop, it prevents the execution
* since run() function runs continuously in a loop, it prevents the execution
* from reaching a callback function (e.g when client wants to gracefuly stop the worker).
* Add a small delay between iterations to allow other code to run
/* eslint-disable no-await-in-loop */
@@ -153,7 +209,7 @@ export class TransactionWorker extends EventEmitter {
}
this.started = true;
this.stopped = false;
this.runTransactions();
this.run();
}

/**

0 comments on commit 46ae683

Please sign in to comment.