From 53faa4d27117e1ab3fb6fd015c64df5b9aa9ca70 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 13 Dec 2023 10:11:05 -0700 Subject: [PATCH] feat(mongodb-runner): add programmatic api for mongodb-runner (#186) * add prune and extract logic to a separate file * move logic to a separate package * fix lint * use for-await * add comment and return type to `parallelForEach` --- packages/mongodb-runner/src/cli.ts | 67 +++---------- packages/mongodb-runner/src/index.ts | 1 + packages/mongodb-runner/src/mongocluster.ts | 9 +- packages/mongodb-runner/src/runner-helpers.ts | 97 +++++++++++++++++++ packages/mongodb-runner/src/util.ts | 22 +++++ 5 files changed, 138 insertions(+), 58 deletions(-) create mode 100644 packages/mongodb-runner/src/runner-helpers.ts diff --git a/packages/mongodb-runner/src/cli.ts b/packages/mongodb-runner/src/cli.ts index 02dc40be..1998e21e 100644 --- a/packages/mongodb-runner/src/cli.ts +++ b/packages/mongodb-runner/src/cli.ts @@ -3,18 +3,10 @@ import yargs from 'yargs'; import { MongoCluster } from './mongocluster'; import os from 'os'; import path from 'path'; -import { promises as fs } from 'fs'; -import { BSON } from 'mongodb'; import { spawn } from 'child_process'; import createDebug from 'debug'; import { once } from 'events'; - -interface StoredInstance { - id: string; - filepath: string; - serialized: string; - connectionString: string; -} +import * as utilities from './index'; (async function () { const defaultRunnerDir = path.join(os.homedir(), '.mongodb', 'runner2'); @@ -77,6 +69,7 @@ interface StoredInstance { .option('debug', { type: 'boolean', describe: 'Enable debug output' }) .command('start', 'Start a MongoDB instance') .command('stop', 'Stop a MongoDB instance') + .command('prune', 'Clean up metadata for any dead MongoDB instances') .command('ls', 'List currently running MongoDB instances') .command( 'exec', @@ -90,24 +83,8 @@ interface StoredInstance { } async function start() { - const id = argv.id || new BSON.UUID().toHexString(); - if (!/^[a-zA-Z0-9_-]+$/.test(id)) { - throw new Error(`ID '${id}' contains non-alphanumeric characters`); - } - await fs.mkdir(argv.runnerDir, { recursive: true }); - - const cluster = await MongoCluster.start({ - ...argv, - args, - }); - const serialized = await cluster.serialize(); - const { connectionString } = cluster; - - await fs.writeFile( - path.join(argv.runnerDir, `m-${id}.json`), - JSON.stringify({ id, serialized, connectionString }) - ); - console.log(`Server started and running at ${connectionString}`); + const { cluster, id } = await utilities.start(argv); + console.log(`Server started and running at ${cluster.connectionString}`); console.log('Run the following command to stop the instance:'); console.log( `${argv.$0} stop --id=${id}` + @@ -118,45 +95,23 @@ interface StoredInstance { cluster.unref(); } - async function* instances(): AsyncIterable { - for await (const { name } of await fs.opendir(argv.runnerDir)) { - if (name.startsWith('m-') && name.endsWith('.json')) { - try { - const filepath = path.join(argv.runnerDir, name); - const stored = JSON.parse(await fs.readFile(filepath, 'utf8')); - yield { ...stored, filepath }; - } catch { - /* ignore */ - } - } - } - } - async function stop() { if (!argv.id && !argv.all) { throw new Error('Need --id or --all to know which server to stop'); } - const toStop: Array = []; - for await (const instance of instances()) { - if (instance.id === argv.id || argv.all) toStop.push(instance); - } - await Promise.all( - toStop.map(async ({ id, filepath, serialized, connectionString }) => { - await (await MongoCluster.deserialize(serialized)).close(); - await fs.rm(filepath); - console.log( - `Stopped cluster '${id}' (was running at '${connectionString}')` - ); - }) - ); + await utilities.stop(argv); } async function ls() { - for await (const { id, connectionString } of instances()) { + for await (const { id, connectionString } of utilities.instances(argv)) { console.log(`${id}: ${connectionString}`); } } + async function prune() { + await utilities.prune(argv); + } + async function exec() { let mongodArgs: string[]; let execArgs: string[]; @@ -198,7 +153,7 @@ interface StoredInstance { ); } - await ({ start, stop, exec, ls }[command] ?? unknown)(); + await ({ start, stop, exec, ls, prune }[command] ?? unknown)(); })().catch((err) => { process.nextTick(() => { throw err; diff --git a/packages/mongodb-runner/src/index.ts b/packages/mongodb-runner/src/index.ts index f621a511..338de0c8 100644 --- a/packages/mongodb-runner/src/index.ts +++ b/packages/mongodb-runner/src/index.ts @@ -2,3 +2,4 @@ export { MongoServer, MongoServerOptions } from './mongoserver'; export { MongoCluster, MongoClusterOptions } from './mongocluster'; export type { ConnectionString } from 'mongodb-connection-string-url'; +export { prune, start, stop, instances } from './runner-helpers'; diff --git a/packages/mongodb-runner/src/mongocluster.ts b/packages/mongodb-runner/src/mongocluster.ts index 906cc389..d6d5c705 100644 --- a/packages/mongodb-runner/src/mongocluster.ts +++ b/packages/mongodb-runner/src/mongocluster.ts @@ -3,6 +3,7 @@ import { MongoServer } from './mongoserver'; import { ConnectionString } from 'mongodb-connection-string-url'; import type { DownloadOptions } from '@mongodb-js/mongodb-downloader'; import { downloadMongoDb } from '@mongodb-js/mongodb-downloader'; +import type { MongoClientOptions } from 'mongodb'; import { MongoClient } from 'mongodb'; import { sleep, range, uuid, debug } from './util'; @@ -225,9 +226,13 @@ export class MongoCluster { } async withClient any>( - fn: Fn + fn: Fn, + clientOptions: MongoClientOptions = {} ): Promise> { - const client = await MongoClient.connect(this.connectionString); + const client = await MongoClient.connect( + this.connectionString, + clientOptions + ); try { return await fn(client); } finally { diff --git a/packages/mongodb-runner/src/runner-helpers.ts b/packages/mongodb-runner/src/runner-helpers.ts new file mode 100644 index 00000000..952d4945 --- /dev/null +++ b/packages/mongodb-runner/src/runner-helpers.ts @@ -0,0 +1,97 @@ +import { BSON } from 'mongodb'; +import path from 'path'; +import type { MongoClusterOptions } from './mongocluster'; +import { MongoCluster } from './mongocluster'; +import { parallelForEach } from './util'; +import * as fs from 'fs/promises'; + +interface StoredInstance { + id: string; + filepath: string; + serialized: string; + connectionString: string; +} + +export async function start( + argv: { + id?: string; + runnerDir: string; + } & MongoClusterOptions, + args?: string[] +) { + const id = argv.id || new BSON.UUID().toHexString(); + if (!/^[a-zA-Z0-9_-]+$/.test(id)) { + throw new Error(`ID '${id}' contains non-alphanumeric characters`); + } + await fs.mkdir(argv.runnerDir, { recursive: true }); + + const cluster = await MongoCluster.start({ + ...argv, + args, + }); + const serialized = await cluster.serialize(); + const { connectionString } = cluster; + + await fs.writeFile( + path.join(argv.runnerDir, `m-${id}.json`), + JSON.stringify({ id, serialized, connectionString }) + ); + + cluster.unref(); + + return { cluster, id }; +} + +export async function* instances(argv: { + runnerDir: string; +}): AsyncIterable { + for await (const { name } of await fs.opendir(argv.runnerDir)) { + if (name.startsWith('m-') && name.endsWith('.json')) { + try { + const filepath = path.join(argv.runnerDir, name); + const stored = JSON.parse(await fs.readFile(filepath, 'utf8')); + yield { ...stored, filepath }; + } catch { + /* ignore */ + } + } + } +} + +/** + * Attempts to connect to every mongodb instance defined in `runnerDir`. + * If it cannot connect to an instance, it cleans up the entry from `runnerDir`. + */ +export async function prune(argv: { runnerDir: string }): Promise { + async function handler(instance: StoredInstance) { + try { + const cluster = await MongoCluster.deserialize(instance.serialized); + await cluster.withClient( + () => { + // connect and close + }, + { serverSelectionTimeoutMS: 2000 } + ); + } catch (e) { + await fs.rm(instance.filepath); + } + } + await parallelForEach(instances(argv), handler); +} + +export async function stop(argv: { + runnerDir: string; + id?: string; + all?: boolean; +}) { + const toStop: Array = []; + for await (const instance of instances(argv)) { + if (instance.id === argv.id || argv.all) toStop.push(instance); + } + await Promise.all( + toStop.map(async ({ filepath, serialized }) => { + await (await MongoCluster.deserialize(serialized)).close(); + await fs.rm(filepath); + }) + ); +} diff --git a/packages/mongodb-runner/src/util.ts b/packages/mongodb-runner/src/util.ts index f3544405..cdf3309e 100644 --- a/packages/mongodb-runner/src/util.ts +++ b/packages/mongodb-runner/src/util.ts @@ -6,3 +6,25 @@ export const uuid = () => new BSON.UUID().toHexString(true); export const sleep = (ms: number): Promise => new Promise((r) => setTimeout(r, ms)); export const range = (n: number): number[] => [...Array(n).keys()]; + +/** + * This function iterates `iterable` and applies `fn` to each item that + * `iterable` produces. `fn` is not awaited on each iteration of `iterable`. + * After `iterable` has been consumed, all `fn`s are awaited together with `Promise.allSettled` + * and the result of `Promise.allSettled` is returned. + * + * **Note** This means that any errors from `fn` are caught and returned, not thrown as a rejection. + * + * This function speeds up the application of `fn` for scenarios where `fn` might take time. + */ +export async function parallelForEach( + iterable: AsyncIterable, + fn: (arg0: T) => Promise | void +): Promise[]> { + const result = []; + for await (const item of iterable) { + result.push(fn(item)); + } + + return await Promise.allSettled(result); +}