Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): add events system #155

Merged
merged 10 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 155 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,10 @@ the APIs for running jobs.
### `run(options: RunnerOptions): Promise<Runner>`

Runs until either stopped by a signal event like `SIGINT` or by calling the
`stop()` function on the Runner object `run()` resolves to.
`stop()` method on the resolved object.

The Runner object also contains a `addJob` method (see [`addJob`](#addjob)) that
can be used to enqueue jobs:

```js
await runner.addJob("testTask", {
thisIsThePayload: true,
});
```
The the resolved 'Runner' object has a number of helpers on it, see
[Runner object](#runner-object) for more information.

### `runOnce(options: RunnerOptions): Promise<void>`

Expand Down Expand Up @@ -307,6 +301,9 @@ The following options for these methods are available.
- `schema` can be used to change the default `graphile_worker` schema to
something else (equivalent to `--schema` on the CLI)
- `forbiddenFlags` see [Forbidden flags](#forbidden-flags) below
- `events`: pass your own `new EventEmitter()` if you want to customize the
options, get earlier events (before the runner object resolves), or want to
get events from alternative Graphile Worker entrypoints.

Exactly one of either `taskDirectory` or `taskList` must be provided (except for
`runMigrations` which doesn't require a task list).
Expand All @@ -319,6 +316,155 @@ One of these must be provided (in order of priority):
- [PostgreSQL environmental variables](https://www.postgresql.org/docs/current/libpq-envars.html),
including at least `PGDATABASE` (NOTE: not all envvars are supported)

### `Runner` object

The `run` method above resolves to a 'Runner' object that has the following
methods and properties:

- `stop(): Promise<void>` - stops the runner from accepting new jobs, and
returns a promise that resolves when all the in progress tasks (if any) are
complete.
- `addJob: AddJobFunction` - see [`addJob`](#addjob).
- `promise: Promise<void>` - a promise that resolves once the runner has
completed.
- `events: WorkerEvents` - a Node.js `EventEmitter` that exposes certain events
within the runner (see [`WorkerEvents`](#workerevents)).

#### Example: adding a job with `runner.addJob`

See [`addJob`](#addjob) for more details.

```js
await runner.addJob("testTask", {
thisIsThePayload: true,
});
```

#### Example: listening to an event with `runner.events`

See [`WorkerEvents`](#workerevents) for more details.

```js
runner.events.on("job:success", ({ worker, job }) => {
console.log(`Hooray! Worker ${worker.workerId} completed job ${job.id}`);
});
```

### `WorkerEvents`

We support a large number of events via an EventEmitter. You can either retrieve
the event emitter via the `events` property on the `Runner` object, or you can
create your own event emitter and pass it to Graphile Worker via the
`WorkerOptions.events` option (this is primarily useful for getting events from
the other Graphile Worker entrypoints).

Details of what events we support and what data is available on the event
payload is detailed below in TypeScript syntax:

```
export type WorkerEvents = TypedEventEmitter<{
/**
* When a worker pool is created
*/
"pool:create": { workerPool: WorkerPool };

/**
* When a worker pool attempts to connect to PG ready to issue a LISTEN
* statement
*/
"pool:listen:connecting": { workerPool: WorkerPool };

/**
* When a worker pool starts listening for jobs via PG LISTEN
*/
"pool:listen:success": { workerPool: WorkerPool; client: PoolClient };

/**
* When a worker pool faces an error on their PG LISTEN client
*/
"pool:listen:error": {
workerPool: WorkerPool;
error: any;
client: PoolClient;
};

/**
* When a worker pool is released
*/
"pool:release": { pool: WorkerPool };

/**
* When a worker pool starts a graceful shutdown
*/
"pool:gracefulShutdown": { pool: WorkerPool; message: string };

/**
* When a worker pool graceful shutdown throws an error
*/
"pool:gracefulShutdown:error": { pool: WorkerPool; error: any };

/**
* When a worker is created
*/
"worker:create": { worker: Worker; tasks: TaskList };

/**
* When a worker release is requested
*/
"worker:release": { worker: Worker };

/**
* When a worker stops (normally after a release)
*/
"worker:stop": { worker: Worker; error?: any };

/**
* When a worker calls get_job but there are no available jobs
*/
"worker:getJob:error": { worker: Worker; error: any };

/**
* When a worker calls get_job but there are no available jobs
*/
"worker:getJob:empty": { worker: Worker };

/**
* When a worker is created
*/
"worker:fatalError": { worker: Worker; error: any; jobError: any | null };

/**
* When a job is retrieved by get_job
*/
"job:start": { worker: Worker; job: Job };

/**
* When a job completes successfully
*/
"job:success": { worker: Worker; job: Job };

/**
* When a job throws an error
*/
"job:error": { worker: Worker; job: Job; error: any };

/**
* When a job fails permanently (emitted after job:error when appropriate)
*/
"job:failed": { worker: Worker; job: Job; error: any };

/**
* When the runner is terminated by a signal
*/
gracefulShutdown: { signal: Signal };

/**
* When the runner is stopped
*/
stop: {};
}>;
```

## Library usage: queueing jobs

You can also use the `graphile-worker` library to queue jobs using one of the
Expand Down
131 changes: 131 additions & 0 deletions __tests__/events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { EventEmitter } from "events";
import { Pool } from "pg";

import { run } from "../src";
import deferred, { Deferred } from "../src/deferred";
import { Task, TaskList, WorkerSharedOptions } from "../src/interfaces";
import {
ESCAPED_GRAPHILE_WORKER_SCHEMA,
jobCount,
reset,
sleep,
sleepUntil,
withPgPool,
} from "./helpers";

const EVENTS = [
"pool:create",
"pool:listen:connecting",
"pool:listen:success",
"pool:listen:error",
"pool:release",
"pool:gracefulShutdown",
"pool:gracefulShutdown:error",
"worker:create",
"worker:release",
"worker:stop",
"worker:getJob:error",
"worker:getJob:empty",
"worker:fatalError",
"job:start",
"job:success",
"job:error",
"job:failed",
"gracefulShutdown",
"stop",
];

const addJob = (pgPool: Pool, id?: string | number) =>
pgPool.query(
`select ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.add_job('job1', json_build_object('id', $1::text), 'serial')`,
[String(id != null ? id : Math.random())],
);

const options: WorkerSharedOptions = {};

test("emits the expected events", () =>
withPgPool(async (pgPool) => {
await reset(pgPool, options);

// Build the tasks
const jobPromises: {
[id: string]: Deferred;
} = {};
const job1: Task = jest.fn(({ id }: { id: string }) => {
const jobPromise = deferred();
if (jobPromises[id]) {
throw new Error("Job with this id already registered");
}
jobPromises[id] = jobPromise;
return jobPromise;
});
const tasks: TaskList = {
job1,
};

// Run the worker
const events = new EventEmitter();

const emittedEvents: Array<{ event: string; payload: any }> = [];
function createListener(event: string) {
return (payload: any) => {
emittedEvents.push({ event, payload });
};
}

EVENTS.forEach((event) => {
events.on(event, createListener(event));
});

const CONCURRENCY = 3;
const runner = await run({
concurrency: CONCURRENCY,
pgPool,
taskList: tasks,
events,
});

expect(runner.events).toEqual(events);

const eventCount = (name: string) =>
emittedEvents.map((obj) => obj.event).filter((n) => n === name).length;

// NOTE: these are the events that get emitted _before_ `run` resolves; so
// you can only receive these if you pass an EventEmitter to run manually.
expect(eventCount("pool:create")).toEqual(1);
expect(eventCount("pool:listen:connecting")).toEqual(1);
expect(eventCount("worker:create")).toEqual(CONCURRENCY);

let finished = false;
runner.promise.then(() => {
finished = true;
});

for (let i = 0; i < 5; i++) {
await addJob(pgPool, i);
}

for (let i = 0; i < 5; i++) {
await sleepUntil(() => !!jobPromises[i]);
expect(eventCount("job:start")).toEqual(i + 1);
expect(eventCount("job:success")).toEqual(i);
jobPromises[i].resolve();
await sleepUntil(() => eventCount("job:success") === i + 1);
}

await sleep(1);
expect(finished).toBeFalsy();
expect(eventCount("stop")).toEqual(0);
expect(eventCount("worker:release")).toEqual(0);
expect(eventCount("pool:release")).toEqual(0);
await runner.stop();
expect(eventCount("stop")).toEqual(1);
expect(job1).toHaveBeenCalledTimes(5);
await sleep(1);
expect(finished).toBeTruthy();
await runner.promise;
expect(eventCount("worker:release")).toEqual(CONCURRENCY);
expect(eventCount("worker:stop")).toEqual(CONCURRENCY);
expect(eventCount("pool:release")).toEqual(1);
expect(await jobCount(pgPool)).toEqual(0);
}));
10 changes: 10 additions & 0 deletions examples/readme/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Examples from the README, mostly for testing.

- `events.js` is a combination of the
[Quickstart: library](https://github.com/graphile/worker/blob/main/README.md#quickstart-library)
example with the
[Example: listening to an event with `runner.events`](https://github.com/graphile/worker/blob/main/README.md#example-listening-to-an-event-with-runnerevents)
example; it's designed to be run standalone
- `tasks/task_2.js` to be used with `await addJob("task_2", { foo: "bar" });`;
to run this you can run `graphile-worker -c your_database_here` in this folder
and it should pick up the task automatically.
42 changes: 42 additions & 0 deletions examples/readme/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const { run, quickAddJob } = require(/* "graphile-worker" */ "../..");

async function main() {
// Run a worker to execute jobs:
const runner = await run({
connectionString: "postgres:///my_db",
concurrency: 5,
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
noHandleSignals: false,
pollInterval: 1000,
// you can set the taskList or taskDirectory but not both
taskList: {
hello: async (payload, helpers) => {
const { name } = payload;
helpers.logger.info(`Hello, ${name}`);
},
},
// or:
// taskDirectory: `${__dirname}/tasks`,
});

runner.events.on("job:success", ({ worker, job }) => {
console.log(`Hooray! Worker ${worker.workerId} completed job ${job.id}`);
});

// Or add a job to be executed:
await quickAddJob(
// makeWorkerUtils options
{ connectionString: "postgres:///my_db" },

// Task identifier
"hello",

// Payload
{ name: "Bobby Tables" },
);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
4 changes: 4 additions & 0 deletions examples/readme/tasks/task_2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = async (payload, helpers) => {
// async is optional, but best practice
helpers.logger.debug(`Received ${JSON.stringify(payload)}`);
};
Loading