Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmaayan committed Jul 17, 2023
1 parent adbb462 commit acc32e8
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
/**
* This example demonstrates how a client can utilize the TransactionWorker class.
*
* The TransactionWorker provides a simple framework for receiving payloads to be processed. It
* acquires an account new sequence number, produces a signed transaction and
* then submits the transaction. In other tasks, it waits for resolution of the submission
* process or get pre-execution validation error and waits for the resolution of the execution process
* or get an execution validation error.
*
* The TransactionWorker constructor accepts
* @param provider - a client provider
* @param sender - the sender account: AptosAccount
* @param maxWaitTime - the max wait time to wait before restarting the local sequence number to the current on-chain state
* @param maximumInFlight - submit up to `maximumInFlight` transactions per account
* @param sleepTime - If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
*
* Read more about it here {@link https://aptos.dev/guides/transaction-management}
*/

import { AptosAccount, BCS, TxnBuilderTypes, TransactionWorker, FaucetClient, Provider, Types } from "aptos";
Expand All @@ -15,41 +30,42 @@ async function main() {
const transactionsCount = 100;
const totalTransactions = accountsCount * transactionsCount;

const start = Date.now() / 1000; // current time in seconds

console.log("starting...");
console.log(new Date().toTimeString());
// create senders and recipients accounts
const senders: AptosAccount[] = [];
const recipients: AptosAccount[] = [];
for (let i = 0; i < accountsCount; i++) {
senders.push(new AptosAccount());
recipients.push(new AptosAccount());
}
console.log(`${senders.length + recipients.length} sender and recipient accounts created`);
let last = Date.now() / 1000;
console.log(
`${senders.length} sender accounts and ${recipients.length} recipient accounts created in ${last - start} seconds`,
);

// funds sender accounts
// fund sender accounts
const funds: Array<Promise<string[]>> = [];

for (let i = 0; i < senders.length; i++) {
funds.push(faucet.fundAccount(senders[i].address().noPrefix(), 10000000000));
}

// send requests
await Promise.all(funds);
console.log(`${funds.length} sender accounts funded`);
for (const acc in senders) {
const curr = senders[acc] as AptosAccount;
console.log(curr.address().hex());
}

last = Date.now() / 1000;
console.log(`${funds.length} sender accounts funded in ${last - start} seconds`);

// read sender accounts
const balances: Array<Promise<Types.AccountData>> = [];
for (let i = 0; i < senders.length; i++) {
balances.push(provider.getAccount(senders[i].address().hex()));
}
// send requests
await Promise.all(balances);

console.log(`${balances.length} sender account balances checked`);
last = Date.now() / 1000;
console.log(`${balances.length} sender account balances checked in ${last - start} seconds`);

// create transactions
const payloads: any[] = [];
Expand All @@ -69,37 +85,72 @@ async function main() {
}
}

const batchTransactions = (payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) => {
const transactionWorker = new TransactionWorker(provider, sender);
const waitFor: Array<Promise<void>> = [];
console.log(`sends ${totalTransactions * senders.length} transactions to chain....`);
// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
}

function batchTransactions(payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) {
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);

transactionWorker.start();

transactionWorker.on("transactionsFulfilled", async (data) => {
registerToWorkerEvents(transactionWorker);

// push transactions to worker queue
for (const payload in payloads) {
transactionWorker.push(payloads[payload]);
}
}

function registerToWorkerEvents(transactionWorker: TransactionWorker) {
/**
* The callback from an event listener, i.e `data`, is an array with 2 elements
* data[0] - the amount of processed transactions
* data[1] -
* on a success event, is the hash value of the processed transaction
* on a failure event, is the reason for the failure
*/
transactionWorker.on("transactionSent", async (data) => {
// all expected transactions have been sent
if (data[0] === totalTransactions) {
last = Date.now() / 1000;
console.log(`transactions sent in ${last - start} seconds`);
}
});

transactionWorker.on("sentFailed", async (data) => {
/**
* data is an array with 2 elements
* data[0] = the amount of processed transactions
* data[1] = the hash value of the processed transaction
* transaction sent failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
* the error, or simply return the error to the end user.
* At this point, we have the failed transaction queue number
* and the transaction failure reason
*/
waitFor.push(provider.waitForTransaction(data[1], { checkSuccess: true }));
// all expected transactions have been fulfilled
console.log("sentFailed", data);
});

transactionWorker.on("transactionExecuted", async (data) => {
// all expected transactions have been executed
console.log(data);
if (data[0] === totalTransactions) {
await Promise.all(waitFor);
console.log("transactions submitted");
console.log(new Date().toTimeString());
last = Date.now() / 1000;
console.log(`transactions executed in ${last - start} seconds`);
await checkAccounts();
}
});

// push transactions to queue
for (const payload in payloads) {
transactionWorker.push(payloads[payload]);
}
};

// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
transactionWorker.on("executionFailed", async (data) => {
/**
* transaction execution failed, up to the user to decide next steps.
* whether to stop the worker by transactionWorker.stop() and handle
* the error, or simply return the error to the end user.
* At this point, we have the failed transaction queue number
* and the transaction object data
*/
console.log("executionFailed", data);
});
}

// check for account's sequence numbers
Expand All @@ -110,16 +161,16 @@ async function main() {
}

const res = await Promise.all(waitFor);
console.log(`transactions verified`);
console.log(new Date().toTimeString());
last = Date.now() / 1000;
console.log(`transactions verified in ${last - start} seconds`);
for (const account in res) {
const currentAccount = res[account] as Types.AccountData;
console.log(
`sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`,
);
}
// exit for testing porpuses - this would stop the process. most cases we have all transactions
// commited, but in some rare cases we might be stopping it before last couple of transactions commited.
// commited, but in some rare cases we might be stopping it before last couple of transactions have commited.
exit(0);
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
/**
* A wrapper that handles and manages an account sequence number.
*
* Submit up to `maximumInFlight` transactions per account in parallel with a timeout of `sleepTime`
* If local assumes `maximumInFlight` are in flight, determine the actual committed state from the network
* If there are less than `maximumInFlight` due to some being committed, adjust the window
* If `maximumInFlight` are in flight, wait `sleepTime` seconds before re-evaluating
* If ever waiting more than `maxWaitTime` restart the sequence number to the current on-chain state
*
* Assumptions:
* Accounts are expected to be managed by a single AccountSequenceNumber and not used otherwise.
* They are initialized to the current on-chain state, so if there are already transactions in
* flight, they may take some time to reset.
* Accounts are automatically initialized if not explicitly
*
* Notes:
* This is co-routine safe, that is many async tasks can be reading from this concurrently.
* The state of an account cannot be used across multiple AccountSequenceNumber services.
* The synchronize method will create a barrier that prevents additional nextSequenceNumber
* calls until it is complete.
* This only manages the distribution of sequence numbers it does not help handle transaction
* failures.
* If a transaction fails, you should call synchronize and wait for timeouts.
*/

import { AptosAccount } from "../account";
import { Uint64 } from "../bcs";
import { Provider } from "../providers";
import { sleep } from "../utils";

// returns `now` time in seconds
const now = () => Math.floor(Date.now() / 1000);

export class AccountSequenceNumber {
Expand All @@ -11,38 +36,50 @@ export class AccountSequenceNumber {
readonly account: AptosAccount;

// sequence number on chain
lastUncommintedNumber: Uint64 | null = null;
lastUncommintedNumber: bigint | null = null;

// local sequence number
currentNumber: Uint64 | null = null;
currentNumber: bigint | null = null;

/**
* We want to guarantee that we preserve ordering of workers to requests.
*
* `lock` is used to try to prevent multiple coroutines from accessing a shared resource at the same time,
* which can result in race conditions and data inconsistency.
* This code actually doesn't do it though, since we aren't giving out a slot, it is still somewhat a race condition.
*
* The ideal solution is likely that each thread grabs the next number from a incremental integer.
* When they complete, they increment that number and that entity is able to enter the `lock`.
* That would guarantee ordering.
*/
lock = false;

// up to 100 outstanding sequence numbers
maximumInFlight = 100;
maxWaitTime: number;

sleepTime = 10;
maximumInFlight: number;

maxWaitTime = 30; // in seconds
sleepTime: number;

constructor(provider: Provider, account: AptosAccount) {
constructor(
provider: Provider,
account: AptosAccount,
maxWaitTime: number,
maximumInFlight: number,
sleepTime: number,
) {
this.provider = provider;
this.account = account;
this.maxWaitTime = maxWaitTime;
this.maximumInFlight = maximumInFlight;
this.sleepTime = sleepTime;
}

/**
* Returns the next available sequnce number on this account
* Returns the next available sequence number for this account
*
* @returns next available sequnce number
* @returns next available sequence number
*/
async nextSequenceNumber(): Promise<bigint | null> {
/*
`lock` is used to prevent multiple coroutines from accessing a shared resource at the same time,
which can result in race conditions and data inconsistency.
This implementation is not as robust as using a proper lock implementation
like `async-mutex` because it relies on busy waiting to acquire the lock,
which can be less efficient and may not work well in all scenarios
*/
/* eslint-disable no-await-in-loop */
while (this.lock) {
await sleep(this.sleepTime);
Expand Down Expand Up @@ -73,15 +110,15 @@ export class AccountSequenceNumber {
nextNumber = this.currentNumber!;
this.currentNumber! += BigInt(1);
} catch (e) {
console.error("error", e);
console.error("error in updating this account sequence number with the one on chain", e);
} finally {
this.lock = false;
}
return nextNumber;
}

/**
* Initializes this account with the sequnce number on chain
* Initializes this account with the sequence number on chain
*/
async initialize(): Promise<void> {
const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address());
Expand All @@ -90,9 +127,9 @@ export class AccountSequenceNumber {
}

/**
* Updates this account sequnce number with the one on-chain
* Updates this account sequence number with the one on-chain
*
* @returns on-chain sequnce number for this account
* @returns on-chain sequence number for this account
*/
async update(): Promise<bigint> {
const { sequence_number: sequenceNumber } = await this.provider.getAccount(this.account.address());
Expand Down Expand Up @@ -127,7 +164,7 @@ export class AccountSequenceNumber {
}
}
} catch (e) {
console.error("error", e);
console.error("error in updating this account sequence number with the one on chain", e);
} finally {
this.lock = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { getFaucetClient, longTestTimeout, PROVIDER_LOCAL_NETWORK_CONFIG } from
const provider = new Provider(PROVIDER_LOCAL_NETWORK_CONFIG);
const account = new AptosAccount();
const faucet = getFaucetClient();
const accountSequenceNumber = new AccountSequenceNumber(provider, account);
const accountSequenceNumber = new AccountSequenceNumber(provider, account, 30, 100, 10);
let getAccountSpy: jest.SpyInstance;

let lastSeqNumber: bigint | null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe("transactionWorker", () => {
});

test(
"index",
"submits 5 transactions to chain for a single account",
(done) => {
// Specify the number of assertions expected
expect.assertions(1);
Expand All @@ -35,7 +35,7 @@ describe("transactionWorker", () => {
const payloads = [...Array(5).fill(txn)];

// start transactions worker
const transactionWorker = new TransactionWorker(provider, sender);
const transactionWorker = new TransactionWorker(provider, sender, 30, 100, 10);
transactionWorker.start();

// push transactions to queue
Expand Down
Loading

0 comments on commit acc32e8

Please sign in to comment.