Skip to content

Commit

Permalink
feat: method waiting for long running operation (WebOfTrust#236)
Browse files Browse the repository at this point in the history
  • Loading branch information
lenkan authored Apr 12, 2024
1 parent 42d3b14 commit 4c0072f
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SignifyClient } from 'signify-ts';
import { getOrCreateClients, getOrCreateIdentifier } from './utils/test-setup';
import { resolveEnvironment } from './utils/resolve-env';
import { assertOperations, waitOperation } from './utils/test-util';
import { assertOperations } from './utils/test-util';

let client: SignifyClient;
let name1_id: string, name1_oobi: string;
Expand Down
91 changes: 27 additions & 64 deletions examples/integration-scripts/utils/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ export function sleep(ms: number): Promise<void> {
export async function assertOperations(
...clients: SignifyClient[]
): Promise<void> {
for (let client of clients) {
let operations = await client.operations().list();
for (const client of clients) {
const operations = await client.operations().list();
expect(operations).toHaveLength(0);
}
}
Expand All @@ -30,9 +30,9 @@ export async function assertOperations(
export async function assertNotifications(
...clients: SignifyClient[]
): Promise<void> {
for (let client of clients) {
let res = await client.notifications().list();
let notes = res.notes.filter((i: any) => i.r === false);
for (const client of clients) {
const res = await client.notifications().list();
const notes = res.notes.filter((i: { r: boolean }) => i.r === false);
expect(notes).toHaveLength(0);
}
}
Expand All @@ -46,9 +46,9 @@ export async function warnNotifications(
...clients: SignifyClient[]
): Promise<void> {
let count = 0;
for (let client of clients) {
let res = await client.notifications().list();
let notes = res.notes.filter((i: any) => i.r === false);
for (const client of clients) {
const res = await client.notifications().list();
const notes = res.notes.filter((i: { r: boolean }) => i.r === false);
if (notes.length > 0) {
count += notes.length;
console.warn('notifications', notes);
Expand All @@ -57,73 +57,36 @@ export async function warnNotifications(
expect(count).toBeGreaterThan(0); // replace warnNotifications with assertNotifications
}

/**
* Get status of operation.
* If parameter recurse is set then also checks status of dependent operations.
*/
async function getOperation<T>(
client: SignifyClient,
name: string,
recurse?: boolean
): Promise<Operation<T>> {
const result = await client.operations().get<T>(name);
if (recurse === true) {
let i: Operation | undefined = result;
while (result.done && i?.metadata?.depends !== undefined) {
let depends: Operation = await client
.operations()
.get(i.metadata.depends.name);
result.done = result.done && depends.done;
i.metadata.depends = depends;
i = depends.metadata?.depends;
}
}
return result;
}

/**
* Poll for operation to become completed.
* Removes completed operation
*/
export async function waitOperation<T = any>(
client: SignifyClient,
op: Operation<T> | string,
options: RetryOptions = {}
signal?: AbortSignal
): Promise<Operation<T>> {
const ctrl = new AbortController();
options.signal?.addEventListener('abort', (e: Event) => {
const s = e.target as AbortSignal;
ctrl.abort(s.reason);
});
let name: string;
if (typeof op === 'string') {
name = op;
} else if (typeof op === 'object' && 'name' in op) {
name = op.name;
} else {
throw new Error();
op = await client.operations().get(op);
}
const result: Operation<T> = await retry(async () => {
let t: Operation<T>;
try {
t = await getOperation<T>(client, name, true);
} catch (e) {
ctrl.abort(e);
throw e;
}
if (t.done !== true) {
throw new Error(`Operation ${name} not done`);
}
console.log('DONE', name);
return t;
}, options);
let i: Operation | undefined = result;
while (i !== undefined) {
// console.log('DELETE', i.name);
await client.operations().delete(i.name);
i = i.metadata?.depends;

op = await client
.operations()
.wait(op, { signal: signal ?? AbortSignal.timeout(30000) });
await deleteOperations(client, op);

return op;
}

async function deleteOperations<T = any>(
client: SignifyClient,
op: Operation<T>
) {
if (op.metadata?.depends) {
await deleteOperations(client, op.metadata.depends);
}
return result;

await client.operations().delete(op.name);
}

export async function resolveOobi(
Expand Down
58 changes: 56 additions & 2 deletions src/keri/app/coring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,27 @@ export interface Operation<T = unknown> {
response?: T;
}

export interface OperationsDeps {
fetch(
pathname: string,
method: string,
body: unknown,
headers?: Headers
): Promise<Response>;
}

/**
* Operations
* @remarks
* Operations represent the status and result of long running tasks performed by KERIA agent
*/
export class Operations {
public client: SignifyClient;
public client: OperationsDeps;
/**
* Operations
* @param {SignifyClient} client
*/
constructor(client: SignifyClient) {
constructor(client: OperationsDeps) {
this.client = client;
}

Expand Down Expand Up @@ -128,6 +137,51 @@ export class Operations {
const method = 'DELETE';
await this.client.fetch(path, method, data);
}

/**
* Poll for operation to become completed.
*/
async wait<T>(
op: Operation<T>,
options: {
signal?: AbortSignal;
minSleep?: number;
maxSleep?: number;
increaseFactor?: number;
} = {}
): Promise<Operation<T>> {
const minSleep = options.minSleep ?? 10;
const maxSleep = options.maxSleep ?? 10000;
const increaseFactor = options.increaseFactor ?? 50;

if (op.metadata?.depends?.done === false) {
await this.wait(op.metadata.depends, options);
}

if (op.done === true) {
return op;
}

let retries = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
op = await this.get(op.name);

const delay = Math.max(
minSleep,
Math.min(maxSleep, 2 ** retries * increaseFactor)
);
retries++;

if (op.done === true) {
return op;
}

await new Promise((resolve) => setTimeout(resolve, delay));
options.signal?.throwIfAborted();
}
}
}

/**
Expand Down
Loading

0 comments on commit 4c0072f

Please sign in to comment.