Skip to content

Commit

Permalink
feat: consume github app event queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemain committed Nov 25, 2023
1 parent 6816076 commit 62e96cf
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 94 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"mdast-util-to-markdown": "^1.5.0",
"micromark-extension-gfm": "^2.0.1",
"micromark-extension-gfm-autolink-literal": "^1.0.3",
"mnemonist": "^0.39.5",
"mri": "^1.2.0",
"openai-fetch": "^1.1.0",
"tsx": "^3.12.2",
Expand Down
13 changes: 10 additions & 3 deletions src/api/controllers/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
setupWebhooksTemplate,
validateGithub,
} from '@/github';
import { sendToDing } from '@/github/utils';
import { GitHubKVManager } from '@/kv/github';

export function route(hono: THono) {
Expand All @@ -31,9 +32,15 @@ export function route(hono: THono) {

const payload = await validateGithub(c.req, webhooks);

setupWebhooksTemplate(webhooks, {
setting: setting,
});
setupWebhooksTemplate(
webhooks,
{
setting: setting,
},
async ({ markdown, eventName }) => {
await sendToDing(markdown, eventName, setting);
},
);

return webhookHandler(
id,
Expand Down
8 changes: 7 additions & 1 deletion src/github/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ export class App {
this.ctx = {
setting,
};
setupWebhooksTemplate(this.octoApp.webhooks, this.ctx);
setupWebhooksTemplate(
this.octoApp.webhooks,
this.ctx,
async ({ markdown, eventName }) => {
await sendToDing(markdown, eventName, this.ctx.setting);
},
);
this.octoService = new GitHubService();
this.opensumiOctoService = new OpenSumiOctoService();
this.octoApp.webhooks.on('star.created', async ({ payload }) => {
Expand Down
20 changes: 9 additions & 11 deletions src/github/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class ValidationError extends Error {

export interface IGitHubEvent {
id: string;
event: string;
event: EmitterWebhookEventName;
text: string;
payload: any;
}
Expand Down Expand Up @@ -79,17 +79,14 @@ const blockedUser = new Set(['renovate[bot]']);
export const setupWebhooksTemplate = (
webhooks: Webhooks<{ octokit?: Octokit }>,
context: Context,
done: (data: {
markdown: MarkdownContent;
eventName: EmitterWebhookEventName;
}) => Promise<void>,
) => {
const templates = getTemplates(context);
const supportTemplates = Object.keys(templates) as EmitterWebhookEventName[];

// webhooks.onAny(async ({ id, name, payload }) => {
// console.log('Receive Github Webhook, id: ', id, ', name: ', name);
// if ((payload as THasAction)?.action) {
// console.log('payload.action: ', (payload as THasAction).action);
// }
// });

for (const eventName of supportTemplates) {
webhooks.on(eventName, async ({ id, payload, octokit }) => {
if ((payload as { sender: User })?.sender) {
Expand All @@ -112,13 +109,14 @@ export const setupWebhooksTemplate = (
try {
console.log('run handler:', handler?.name);

const data = await handler(payload, {
const markdown = await handler(payload, {
...context,
octokit,
});
console.log('get data from handler: ', data);

await sendToDing(data, eventName, context.setting);
console.log('get data from handler: ', markdown);

await done({ markdown, eventName });
} catch (err) {
console.log('stop handler because: ', err);
if (!(err instanceof StopHandleError)) {
Expand Down
12 changes: 0 additions & 12 deletions src/lib/README.md

This file was deleted.

53 changes: 30 additions & 23 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
import { EventEmitter } from 'eventemitter3';
import MultiMap from 'mnemonist/multi-map';

import { TQueueMessage } from './types';
import { githubWorker, githubWebhookWorker } from './worker/github';

export type IConsumeWorker<T> = (
message: Message<T>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => void;
import { BaseWorker } from './worker';
import { GitHubAppWorker } from './worker/github';

export class QueueConsumer {
private emitter = new EventEmitter();
addWorker<T, K extends TQueueMessage['type']>(
constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {}

private workerMap = new MultiMap<
TQueueMessage['type'],
BaseWorker<TQueueMessage>
>();

addWorker<T extends TQueueMessage, K extends TQueueMessage['type']>(
type: K,
handler: IConsumeWorker<T>,
worker: BaseWorker<T>,
) {
this.emitter.on(type, handler);
this.workerMap.set(type, worker);
}

consume(
message: Message<TQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) {
const { body } = message;
const { type } = body;
consume(message: Message<TQueueMessage>) {
for (const w of this.workerMap.values()) {
w.consume(message);
}
}

this.emitter.emit(type, message, env, ctx);
async runAndWait() {
const promises = [] as Promise<void>[];
for (const w of this.workerMap.values()) {
promises.push(w.run());
}
return await Promise.allSettled(promises);
}
}

export const consumer = new QueueConsumer();
consumer.addWorker('github-app', githubWorker);
consumer.addWorker('github-webhook', githubWebhookWorker);
export const createConsumer = (env: IRuntimeEnv, ctx: ExecutionContext) => {
const consumer = new QueueConsumer(env, ctx);
consumer.addWorker('github-app', new GitHubAppWorker(env, ctx));
// consumer.addWorker('github-webhook', githubWebhookWorker);
return consumer;
};
106 changes: 67 additions & 39 deletions src/queue/worker/github.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,77 @@
import { Webhooks } from '@octokit/webhooks';
import { EmitterWebhookEventName } from '@octokit/webhooks';
import chain from 'lodash/chain';
import MultiMap from 'mnemonist/multi-map';

import { initApp } from '@/github/app';
import { webhookHandler } from '@/github/handler';
import { setupWebhooksTemplate } from '@/github/handler';
import { MarkdownContent } from '@/github/types';
import { sendToDing } from '@/github/utils';
import { GitHubKVManager } from '@/kv/github';

import { IGitHubEventQueueMessage } from '../types';

export const githubWorker = async (
message: Message<IGitHubEventQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => {
const { body } = message;
const { botId, type, data } = body;
import { BaseWorker } from '.';

const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(botId);
export class GitHubAppWorker extends BaseWorker<IGitHubEventQueueMessage> {
async run() {
await Promise.allSettled(
chain(this.queue)
.groupBy((v) => v.body.botId)
.map(async (messages, botId) => {
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(botId);

if (setting && setting.githubSecret) {
const app = await initApp(setting);
if (setting && setting.githubSecret) {
const app = await initApp(setting);

await webhookHandler(botId, type, app.webhooks, ctx, data, true);
} else {
// todo logger
}
};

export const githubWebhookWorker = async (
message: Message<IGitHubEventQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => {
const { body } = message;
const { botId, type, data } = body;

const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getSettingById(botId);

if (setting && setting.githubSecret) {
const webhooks = new Webhooks<{ octokit: undefined }>({
secret: setting.githubSecret,
});

await webhookHandler(botId, type, webhooks, ctx, data, true);
} else {
// todo logger
const results = new MultiMap<string, MarkdownContent>();

setupWebhooksTemplate(
app.webhooks,
{ setting },
async ({ markdown, eventName }) => {
results.set(eventName, markdown);
},
);

await Promise.allSettled(
chain(messages)
.groupBy((v) => v.body.data.event)
.map(async (messages, eventName: EmitterWebhookEventName) => {
await Promise.all(
messages.map(async (message) => {
try {
const { data } = message.body;
await app.webhooks.receive({
id: data.id,
name: data.event as any,
payload: data.payload,
});
message.ack();
} catch (error) {
console.error('github app worker error', error);
message.retry();
}
}),
);

const markdowns = results.get(eventName);
if (markdowns && markdowns.length > 0) {
// 只有特定内容的 content 要被合并起来
await Promise.allSettled(
markdowns.map((markdown) =>
sendToDing(markdown, eventName, setting),
),
);
}
})
.value(),
);
} else {
console.error('github app worker error: setting not found', botId);
}
})
.value(),
);
}
};
}
15 changes: 15 additions & 0 deletions src/queue/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { EventEmitter } from 'eventemitter3';

export abstract class BaseWorker<T> {
protected emitter = new EventEmitter();

queue: Message<T>[] = [];

constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {}

consume(message: Message<T>) {
this.queue.push(message);
}

abstract run(): Promise<void>;
}
20 changes: 15 additions & 5 deletions src/runtime/cfworker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Hono } from 'hono';

import { ignition } from '@/api';
import Environment from '@/env';
import { consumer } from '@/queue';
import { createConsumer } from '@/queue';
import { TQueueMessage } from '@/queue/types';
import { RequiredField } from '@/types';

Expand All @@ -23,9 +23,19 @@ export default {
) {
Environment.from('cfworker', env);

const messages = batch.messages;
messages.forEach((message) => {
consumer.consume(message, env, ctx);
});
const consumer = createConsumer(env, ctx);

batch.messages.forEach((v) => consumer.consume(v));

ctx.waitUntil(
consumer
.runAndWait()
.then(() => {
console.log('queue done');
})
.catch((err) => {
console.log('queue error', err);
}),
);
},
} as RequiredField<ExportedHandler<IRuntimeEnv>, 'fetch' | 'queue'>;
17 changes: 17 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5732,6 +5732,7 @@ __metadata:
mdast-util-to-markdown: ^1.5.0
micromark-extension-gfm: ^2.0.1
micromark-extension-gfm-autolink-literal: ^1.0.3
mnemonist: ^0.39.5
mri: ^1.2.0
nodemon: ^2.0.20
npm-run-all: ^4.1.5
Expand Down Expand Up @@ -8779,6 +8780,15 @@ __metadata:
languageName: node
linkType: hard

"mnemonist@npm:^0.39.5":
version: 0.39.5
resolution: "mnemonist@npm:0.39.5"
dependencies:
obliterator: ^2.0.1
checksum: 6669d687a434226924b2c84ee6eb7ce7d0f83dfc5caad8bcc164c73c0c11fb6d43cbe32636e710f068046f4b40a56c3032532554e93e02640aafc6ca3dd222e6
languageName: node
linkType: hard

"modify-values@npm:^1.0.1":
version: 1.0.1
resolution: "modify-values@npm:1.0.1"
Expand Down Expand Up @@ -9419,6 +9429,13 @@ __metadata:
languageName: node
linkType: hard

"obliterator@npm:^2.0.1":
version: 2.0.4
resolution: "obliterator@npm:2.0.4"
checksum: f28ad35b6d812089315f375dc3e6e5f9bebf958ebe4b10ccd471c7115cbcf595e74bdac4783ae758e5b1f47e3096427fdb37cfa7bed566b132df92ff317b9a7c
languageName: node
linkType: hard

"on-finished@npm:2.4.1":
version: 2.4.1
resolution: "on-finished@npm:2.4.1"
Expand Down

0 comments on commit 62e96cf

Please sign in to comment.