Skip to content

Commit

Permalink
feat(mongodb-runner): add programmatic api for mongodb-runner (#186)
Browse files Browse the repository at this point in the history
* add prune and extract logic to a separate file

* move logic to a separate package

* fix lint

* use for-await

* add comment and return type to `parallelForEach`
  • Loading branch information
baileympearson authored Dec 13, 2023
1 parent e55cd86 commit 53faa4d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 58 deletions.
67 changes: 11 additions & 56 deletions packages/mongodb-runner/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,10 @@ import yargs from 'yargs';
import { MongoCluster } from './mongocluster';
import os from 'os';
import path from 'path';
import { promises as fs } from 'fs';
import { BSON } from 'mongodb';
import { spawn } from 'child_process';
import createDebug from 'debug';
import { once } from 'events';

interface StoredInstance {
id: string;
filepath: string;
serialized: string;
connectionString: string;
}
import * as utilities from './index';

(async function () {
const defaultRunnerDir = path.join(os.homedir(), '.mongodb', 'runner2');
Expand Down Expand Up @@ -77,6 +69,7 @@ interface StoredInstance {
.option('debug', { type: 'boolean', describe: 'Enable debug output' })
.command('start', 'Start a MongoDB instance')
.command('stop', 'Stop a MongoDB instance')
.command('prune', 'Clean up metadata for any dead MongoDB instances')
.command('ls', 'List currently running MongoDB instances')
.command(
'exec',
Expand All @@ -90,24 +83,8 @@ interface StoredInstance {
}

async function start() {
const id = argv.id || new BSON.UUID().toHexString();
if (!/^[a-zA-Z0-9_-]+$/.test(id)) {
throw new Error(`ID '${id}' contains non-alphanumeric characters`);
}
await fs.mkdir(argv.runnerDir, { recursive: true });

const cluster = await MongoCluster.start({
...argv,
args,
});
const serialized = await cluster.serialize();
const { connectionString } = cluster;

await fs.writeFile(
path.join(argv.runnerDir, `m-${id}.json`),
JSON.stringify({ id, serialized, connectionString })
);
console.log(`Server started and running at ${connectionString}`);
const { cluster, id } = await utilities.start(argv);
console.log(`Server started and running at ${cluster.connectionString}`);
console.log('Run the following command to stop the instance:');
console.log(
`${argv.$0} stop --id=${id}` +
Expand All @@ -118,45 +95,23 @@ interface StoredInstance {
cluster.unref();
}

async function* instances(): AsyncIterable<StoredInstance> {
for await (const { name } of await fs.opendir(argv.runnerDir)) {
if (name.startsWith('m-') && name.endsWith('.json')) {
try {
const filepath = path.join(argv.runnerDir, name);
const stored = JSON.parse(await fs.readFile(filepath, 'utf8'));
yield { ...stored, filepath };
} catch {
/* ignore */
}
}
}
}

async function stop() {
if (!argv.id && !argv.all) {
throw new Error('Need --id or --all to know which server to stop');
}
const toStop: Array<StoredInstance> = [];
for await (const instance of instances()) {
if (instance.id === argv.id || argv.all) toStop.push(instance);
}
await Promise.all(
toStop.map(async ({ id, filepath, serialized, connectionString }) => {
await (await MongoCluster.deserialize(serialized)).close();
await fs.rm(filepath);
console.log(
`Stopped cluster '${id}' (was running at '${connectionString}')`
);
})
);
await utilities.stop(argv);
}

async function ls() {
for await (const { id, connectionString } of instances()) {
for await (const { id, connectionString } of utilities.instances(argv)) {
console.log(`${id}: ${connectionString}`);
}
}

async function prune() {
await utilities.prune(argv);
}

async function exec() {
let mongodArgs: string[];
let execArgs: string[];
Expand Down Expand Up @@ -198,7 +153,7 @@ interface StoredInstance {
);
}

await ({ start, stop, exec, ls }[command] ?? unknown)();
await ({ start, stop, exec, ls, prune }[command] ?? unknown)();
})().catch((err) => {
process.nextTick(() => {
throw err;
Expand Down
1 change: 1 addition & 0 deletions packages/mongodb-runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export { MongoServer, MongoServerOptions } from './mongoserver';

export { MongoCluster, MongoClusterOptions } from './mongocluster';
export type { ConnectionString } from 'mongodb-connection-string-url';
export { prune, start, stop, instances } from './runner-helpers';
9 changes: 7 additions & 2 deletions packages/mongodb-runner/src/mongocluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MongoServer } from './mongoserver';
import { ConnectionString } from 'mongodb-connection-string-url';
import type { DownloadOptions } from '@mongodb-js/mongodb-downloader';
import { downloadMongoDb } from '@mongodb-js/mongodb-downloader';
import type { MongoClientOptions } from 'mongodb';
import { MongoClient } from 'mongodb';
import { sleep, range, uuid, debug } from './util';

Expand Down Expand Up @@ -225,9 +226,13 @@ export class MongoCluster {
}

async withClient<Fn extends (client: MongoClient) => any>(
fn: Fn
fn: Fn,
clientOptions: MongoClientOptions = {}
): Promise<ReturnType<Fn>> {
const client = await MongoClient.connect(this.connectionString);
const client = await MongoClient.connect(
this.connectionString,
clientOptions
);
try {
return await fn(client);
} finally {
Expand Down
97 changes: 97 additions & 0 deletions packages/mongodb-runner/src/runner-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { BSON } from 'mongodb';
import path from 'path';
import type { MongoClusterOptions } from './mongocluster';
import { MongoCluster } from './mongocluster';
import { parallelForEach } from './util';
import * as fs from 'fs/promises';

interface StoredInstance {
id: string;
filepath: string;
serialized: string;
connectionString: string;
}

export async function start(
argv: {
id?: string;
runnerDir: string;
} & MongoClusterOptions,
args?: string[]
) {
const id = argv.id || new BSON.UUID().toHexString();
if (!/^[a-zA-Z0-9_-]+$/.test(id)) {
throw new Error(`ID '${id}' contains non-alphanumeric characters`);
}
await fs.mkdir(argv.runnerDir, { recursive: true });

const cluster = await MongoCluster.start({
...argv,
args,
});
const serialized = await cluster.serialize();
const { connectionString } = cluster;

await fs.writeFile(
path.join(argv.runnerDir, `m-${id}.json`),
JSON.stringify({ id, serialized, connectionString })
);

cluster.unref();

return { cluster, id };
}

export async function* instances(argv: {
runnerDir: string;
}): AsyncIterable<StoredInstance> {
for await (const { name } of await fs.opendir(argv.runnerDir)) {
if (name.startsWith('m-') && name.endsWith('.json')) {
try {
const filepath = path.join(argv.runnerDir, name);
const stored = JSON.parse(await fs.readFile(filepath, 'utf8'));
yield { ...stored, filepath };
} catch {
/* ignore */
}
}
}
}

/**
* Attempts to connect to every mongodb instance defined in `runnerDir`.
* If it cannot connect to an instance, it cleans up the entry from `runnerDir`.
*/
export async function prune(argv: { runnerDir: string }): Promise<void> {
async function handler(instance: StoredInstance) {
try {
const cluster = await MongoCluster.deserialize(instance.serialized);
await cluster.withClient(
() => {
// connect and close
},
{ serverSelectionTimeoutMS: 2000 }
);
} catch (e) {
await fs.rm(instance.filepath);
}
}
await parallelForEach(instances(argv), handler);
}

export async function stop(argv: {
runnerDir: string;
id?: string;
all?: boolean;
}) {
const toStop: Array<StoredInstance> = [];
for await (const instance of instances(argv)) {
if (instance.id === argv.id || argv.all) toStop.push(instance);
}
await Promise.all(
toStop.map(async ({ filepath, serialized }) => {
await (await MongoCluster.deserialize(serialized)).close();
await fs.rm(filepath);
})
);
}
22 changes: 22 additions & 0 deletions packages/mongodb-runner/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,25 @@ export const uuid = () => new BSON.UUID().toHexString(true);
export const sleep = (ms: number): Promise<void> =>
new Promise((r) => setTimeout(r, ms));
export const range = (n: number): number[] => [...Array(n).keys()];

/**
* This function iterates `iterable` and applies `fn` to each item that
* `iterable` produces. `fn` is not awaited on each iteration of `iterable`.
* After `iterable` has been consumed, all `fn`s are awaited together with `Promise.allSettled`
* and the result of `Promise.allSettled` is returned.
*
* **Note** This means that any errors from `fn` are caught and returned, not thrown as a rejection.
*
* This function speeds up the application of `fn` for scenarios where `fn` might take time.
*/
export async function parallelForEach<T>(
iterable: AsyncIterable<T>,
fn: (arg0: T) => Promise<void> | void
): Promise<PromiseSettledResult<void>[]> {
const result = [];
for await (const item of iterable) {
result.push(fn(item));
}

return await Promise.allSettled(result);
}

0 comments on commit 53faa4d

Please sign in to comment.