Skip to content

Commit

Permalink
add method comments
Browse files Browse the repository at this point in the history
  • Loading branch information
0xmaayan committed Jul 20, 2023
1 parent 34a17bc commit 7341272
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async function main() {
senders.push(new AptosAccount());
recipients.push(new AptosAccount());
}
console.log(`${senders.length * 2} sender and recipient accounts created`);
console.log(`${senders.length + recipients.length} sender and recipient accounts created`);

// funds sender accounts
const funds: Array<Promise<string[]>> = [];
Expand All @@ -49,7 +49,6 @@ async function main() {
// send requests
await Promise.all(balances);

//await Promise.all(balances);
console.log(`${balances.length} sender account balances checked`);

// create transactions
Expand All @@ -70,7 +69,7 @@ async function main() {
}
}

const batchTransactions = (payloads: any[], sender: AptosAccount) => {
const batchTransactions = (payloads: TxnBuilderTypes.Transaction[], sender: AptosAccount) => {
const transactionWorker = new TransactionWorker(provider, sender);
const waitFor: Array<Promise<void>> = [];

Expand Down Expand Up @@ -98,6 +97,7 @@ async function main() {
}
};

// emit batch transactions
for (let i = 0; i < senders.length; i++) {
batchTransactions(payloads, senders[i]);
}
Expand All @@ -118,12 +118,10 @@ async function main() {
`sender account ${currentAccount.authentication_key} final sequence number is ${currentAccount.sequence_number}`,
);
}
// exit for testing porpuses - this would stop the process. most cases we have all transactions
// commited, but in some rare cases we might be stopping it before last couple of transactions commited.
exit(0);
};
}

async function sleep(ms: number): Promise<void> {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

main();
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class AccountSequenceNumber {

lock = false;

// up to 100 outstanding sequence numbers
maximumInFlight = 100;

sleepTime = 10;
Expand All @@ -32,10 +33,9 @@ export class AccountSequenceNumber {
/**
* Returns the next available sequnce number on this account
*
* @param block
* @returns next available sequnce number
*/
async nextSequenceNumber(block: boolean = true): Promise<bigint | null> {
async nextSequenceNumber(): Promise<bigint | null> {
/*
`lock` is used to prevent multiple coroutines from accessing a shared resource at the same time,
which can result in race conditions and data inconsistency.
Expand All @@ -60,9 +60,6 @@ export class AccountSequenceNumber {

const startTime = now();
while (this.currentNumber! - this.lastUncommintedNumber! >= this.maximumInFlight) {
if (!block) {
return null;
}
await sleep(this.sleepTime);
if (now() - startTime > this.maxWaitTime) {
/* eslint-disable no-console */
Expand Down Expand Up @@ -104,7 +101,7 @@ export class AccountSequenceNumber {
}

/**
* Synchronizes local sequnce number with the sequnce number on chain for this account.
* Synchronizes local sequence number with the seqeunce number on chain for this account.
*
* Poll the network until all submitted transactions have either been committed or until
* the maximum wait time has elapsed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ describe("account sequence number", () => {
});

it("updates with correct sequence number", async () => {
const seqNum = "2";
getAccountSpy.mockResolvedValue({
sequence_number: "2",
sequence_number: seqNum,
authentication_key: account.authKey().hex(),
});
await accountSequenceNumber.update();
expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(2));
expect(accountSequenceNumber.lastUncommintedNumber).toEqual(BigInt(parseInt(seqNum)));
});

it(
Expand All @@ -57,23 +58,19 @@ describe("account sequence number", () => {
it(
"includes updated on-chain sequnce number in local sequence number",
async () => {
const previousSeqNum = "5";
getAccountSpy.mockResolvedValue({
sequence_number: "5",
sequence_number: previousSeqNum,
authentication_key: account.authKey().hex(),
});
for (let seqNum = 0; seqNum < accountSequenceNumber.maximumInFlight; seqNum++) {
lastSeqNumber = await accountSequenceNumber.nextSequenceNumber();
expect(lastSeqNumber).toEqual(BigInt(seqNum + 5));
expect(lastSeqNumber).toEqual(BigInt(seqNum + parseInt(previousSeqNum)));
}
},
longTestTimeout,
);

it("returns null if nextSequenceNumber blocks", async () => {
const res = await accountSequenceNumber.nextSequenceNumber(false);
expect(res).toBeNull();
});

it("synchronize completes when local and on-chain sequnec number equal", async () => {
const nextSequenceNumber = lastSeqNumber! + BigInt(1);

Expand Down
46 changes: 43 additions & 3 deletions ecosystem/typescript/sdk/src/transactions/transaction_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ export class TransactionWorker extends EventEmitter {

readonly account: AptosAccount;

// current account sequence number
readonly accountSequnceNumber: AccountSequenceNumber;

// process has started
started: boolean;

// process has stopped
stopped: boolean;

// transactions payloads waiting to be generated and signed
// TODO support entry function payload from ABI builder
transactionsQueue: Array<TxnBuilderTypes.TransactionPayload> = [];

// signed transactions waiting to be submitted
outstandingTransactions: Array<[Promise<PendingTransaction>, bigint]> = [];

// transactions that have been submitted to chain
processedTransactions: Array<[string, bigint, any]> = [];

// TODO support entry function payload from ABI builder
transactionsQueue: Array<TxnBuilderTypes.TransactionPayload> = [];

constructor(provider: Provider, account: AptosAccount) {
super();
this.provider = provider;
Expand All @@ -32,6 +38,12 @@ export class TransactionWorker extends EventEmitter {
this.accountSequnceNumber = new AccountSequenceNumber(provider, account);
}

/**
* Gets the current account sequence number,
* generates the transaction with the account sequence number,
* adds the transaction to the outstanding transaction queue
* to be processed later.
*/
async submitTransactions() {
try {
if (this.transactionsQueue.length === 0) return;
Expand All @@ -46,6 +58,15 @@ export class TransactionWorker extends EventEmitter {
}
}

/**
* Reads the outstanding transaction queue and submits the transaction to chain.
*
* If the transaction has fulfilled, it pushes the transaction to the processed
* transactions queue and fires a transactionsFulfilled event.
*
* If the transaction has failed, it pushes the transaction to the processed
* transactions queue with the failure reason and fires a transactionsFailed event.
*/
async processTransactions() {
try {
const awaitingTransactions = [];
Expand Down Expand Up @@ -79,10 +100,20 @@ export class TransactionWorker extends EventEmitter {
}
}

/**
* Push transaction to the transactions queue
* @param payload Transaction payload
*/
async push(payload: TxnBuilderTypes.TransactionPayload): Promise<void> {
await this.transactionsQueue.push(payload);
}

/**
* Generates a signed transaction that can be submitted to chain
* @param account an Aptos account
* @param sequenceNumber a sequence number the transaction will be generated with
* @returns
*/
async generateNextTransaction(account: AptosAccount, sequenceNumber: bigint): Promise<Uint8Array | undefined> {
if (this.transactionsQueue.length === 0) return undefined;
const payload = await this.transactionsQueue.shift()!;
Expand All @@ -93,6 +124,9 @@ export class TransactionWorker extends EventEmitter {
return signedTransaction;
}

/**
* Starts transaction submission and transaction processing.
*/
async runTransactions() {
try {
while (!this.stopped) {
Expand All @@ -110,6 +144,9 @@ export class TransactionWorker extends EventEmitter {
}
}

/**
* Starts the transaction management process.
*/
start() {
if (this.started) {
throw new Error("worker has already started");
Expand All @@ -119,6 +156,9 @@ export class TransactionWorker extends EventEmitter {
this.runTransactions();
}

/**
* Stops the the transaction management process.
*/
stop() {
if (this.stopped) {
throw new Error("worker has already stopped");
Expand Down

0 comments on commit 7341272

Please sign in to comment.