Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: method waiting for long running operation #236

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SignifyClient } from 'signify-ts';
import { getOrCreateClients, getOrCreateIdentifier } from './utils/test-setup';
import { resolveEnvironment } from './utils/resolve-env';
import { assertOperations, waitOperation } from './utils/test-util';
import { assertOperations } from './utils/test-util';

let client: SignifyClient;
let name1_id: string, name1_oobi: string;
Expand Down
91 changes: 27 additions & 64 deletions examples/integration-scripts/utils/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ export function sleep(ms: number): Promise<void> {
export async function assertOperations(
...clients: SignifyClient[]
): Promise<void> {
for (let client of clients) {
let operations = await client.operations().list();
for (const client of clients) {
const operations = await client.operations().list();
expect(operations).toHaveLength(0);
}
}
Expand All @@ -30,9 +30,9 @@ export async function assertOperations(
export async function assertNotifications(
...clients: SignifyClient[]
): Promise<void> {
for (let client of clients) {
let res = await client.notifications().list();
let notes = res.notes.filter((i: any) => i.r === false);
for (const client of clients) {
const res = await client.notifications().list();
const notes = res.notes.filter((i: { r: boolean }) => i.r === false);
expect(notes).toHaveLength(0);
}
}
Expand All @@ -46,9 +46,9 @@ export async function warnNotifications(
...clients: SignifyClient[]
): Promise<void> {
let count = 0;
for (let client of clients) {
let res = await client.notifications().list();
let notes = res.notes.filter((i: any) => i.r === false);
for (const client of clients) {
const res = await client.notifications().list();
const notes = res.notes.filter((i: { r: boolean }) => i.r === false);
if (notes.length > 0) {
count += notes.length;
console.warn('notifications', notes);
Expand All @@ -57,73 +57,36 @@ export async function warnNotifications(
expect(count).toBeGreaterThan(0); // replace warnNotifications with assertNotifications
}

/**
* Get status of operation.
* If parameter recurse is set then also checks status of dependent operations.
*/
async function getOperation<T>(
client: SignifyClient,
name: string,
recurse?: boolean
): Promise<Operation<T>> {
const result = await client.operations().get<T>(name);
if (recurse === true) {
let i: Operation | undefined = result;
while (result.done && i?.metadata?.depends !== undefined) {
let depends: Operation = await client
.operations()
.get(i.metadata.depends.name);
result.done = result.done && depends.done;
i.metadata.depends = depends;
i = depends.metadata?.depends;
}
}
return result;
}

/**
* Poll for operation to become completed.
* Removes completed operation
*/
export async function waitOperation<T = any>(
client: SignifyClient,
op: Operation<T> | string,
options: RetryOptions = {}
signal?: AbortSignal
): Promise<Operation<T>> {
const ctrl = new AbortController();
options.signal?.addEventListener('abort', (e: Event) => {
const s = e.target as AbortSignal;
ctrl.abort(s.reason);
});
let name: string;
if (typeof op === 'string') {
name = op;
} else if (typeof op === 'object' && 'name' in op) {
name = op.name;
} else {
throw new Error();
op = await client.operations().get(op);
}
const result: Operation<T> = await retry(async () => {
let t: Operation<T>;
try {
t = await getOperation<T>(client, name, true);
} catch (e) {
ctrl.abort(e);
throw e;
}
if (t.done !== true) {
throw new Error(`Operation ${name} not done`);
}
console.log('DONE', name);
return t;
}, options);
let i: Operation | undefined = result;
while (i !== undefined) {
// console.log('DELETE', i.name);
await client.operations().delete(i.name);
i = i.metadata?.depends;

op = await client
.operations()
.wait(op, { signal: signal ?? AbortSignal.timeout(30000) });
await deleteOperations(client, op);

return op;
}

async function deleteOperations<T = any>(
client: SignifyClient,
op: Operation<T>
) {
if (op.metadata?.depends) {
await deleteOperations(client, op.metadata.depends);
}
return result;

await client.operations().delete(op.name);
}

export async function resolveOobi(
Expand Down
58 changes: 56 additions & 2 deletions src/keri/app/coring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,27 @@ export interface Operation<T = unknown> {
response?: T;
}

export interface OperationsDeps {
fetch(
pathname: string,
method: string,
body: unknown,
headers?: Headers
): Promise<Response>;
}

/**
* Operations
* @remarks
* Operations represent the status and result of long running tasks performed by KERIA agent
*/
export class Operations {
public client: SignifyClient;
public client: OperationsDeps;
/**
* Operations
* @param {SignifyClient} client
*/
constructor(client: SignifyClient) {
constructor(client: OperationsDeps) {
this.client = client;
}

Expand Down Expand Up @@ -127,6 +136,51 @@ export class Operations {
const method = 'DELETE';
await this.client.fetch(path, method, data);
}

/**
* Poll for operation to become completed.
*/
async wait<T>(
op: Operation<T>,
options: {
signal?: AbortSignal;
minSleep?: number;
maxSleep?: number;
increaseFactor?: number;
} = {}
): Promise<Operation<T>> {
const minSleep = options.minSleep ?? 10;
const maxSleep = options.maxSleep ?? 10000;
const increaseFactor = options.increaseFactor ?? 50;

if (op.metadata?.depends?.done === false) {
await this.wait(op.metadata.depends, options);
}

if (op.done === true) {
return op;
}

let retries = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
op = await this.get(op.name);

const delay = Math.max(
minSleep,
Math.min(maxSleep, 2 ** retries * increaseFactor)
);
retries++;

if (op.done === true) {
return op;
}

await new Promise((resolve) => setTimeout(resolve, delay));
options.signal?.throwIfAborted();
}
}
}

/**
Expand Down
Loading
Loading