diff --git a/package.json b/package.json index 888193b2..b6bf1644 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,7 @@ "mdast-util-to-markdown": "^1.5.0", "micromark-extension-gfm": "^2.0.1", "micromark-extension-gfm-autolink-literal": "^1.0.3", + "mnemonist": "^0.39.5", "mri": "^1.2.0", "openai-fetch": "^1.1.0", "tsx": "^3.12.2", diff --git a/src/api/controllers/webhook.ts b/src/api/controllers/webhook.ts index 46a92b0e..e7562b51 100644 --- a/src/api/controllers/webhook.ts +++ b/src/api/controllers/webhook.ts @@ -5,6 +5,7 @@ import { setupWebhooksTemplate, validateGithub, } from '@/github'; +import { sendToDing } from '@/github/utils'; import { GitHubKVManager } from '@/kv/github'; export function route(hono: THono) { @@ -31,9 +32,15 @@ export function route(hono: THono) { const payload = await validateGithub(c.req, webhooks); - setupWebhooksTemplate(webhooks, { - setting: setting, - }); + setupWebhooksTemplate( + webhooks, + { + setting: setting, + }, + async ({ markdown, eventName }) => { + await sendToDing(markdown, eventName, setting); + }, + ); return webhookHandler( id, diff --git a/src/github/app.ts b/src/github/app.ts index b66f92f7..2e54462e 100644 --- a/src/github/app.ts +++ b/src/github/app.ts @@ -36,7 +36,13 @@ export class App { this.ctx = { setting, }; - setupWebhooksTemplate(this.octoApp.webhooks, this.ctx); + setupWebhooksTemplate( + this.octoApp.webhooks, + this.ctx, + async ({ markdown, eventName }) => { + await sendToDing(markdown, eventName, this.ctx.setting); + }, + ); this.octoService = new GitHubService(); this.opensumiOctoService = new OpenSumiOctoService(); this.octoApp.webhooks.on('star.created', async ({ payload }) => { diff --git a/src/github/handler.ts b/src/github/handler.ts index ba38d445..1b08a4ae 100644 --- a/src/github/handler.ts +++ b/src/github/handler.ts @@ -22,7 +22,7 @@ export class ValidationError extends Error { export interface IGitHubEvent { id: string; - event: string; + event: EmitterWebhookEventName; text: string; payload: any; } @@ -79,17 +79,14 @@ const blockedUser = new Set(['renovate[bot]']); export const setupWebhooksTemplate = ( webhooks: Webhooks<{ octokit?: Octokit }>, context: Context, + done: (data: { + markdown: MarkdownContent; + eventName: EmitterWebhookEventName; + }) => Promise, ) => { const templates = getTemplates(context); const supportTemplates = Object.keys(templates) as EmitterWebhookEventName[]; - // webhooks.onAny(async ({ id, name, payload }) => { - // console.log('Receive Github Webhook, id: ', id, ', name: ', name); - // if ((payload as THasAction)?.action) { - // console.log('payload.action: ', (payload as THasAction).action); - // } - // }); - for (const eventName of supportTemplates) { webhooks.on(eventName, async ({ id, payload, octokit }) => { if ((payload as { sender: User })?.sender) { @@ -112,13 +109,14 @@ export const setupWebhooksTemplate = ( try { console.log('run handler:', handler?.name); - const data = await handler(payload, { + const markdown = await handler(payload, { ...context, octokit, }); - console.log('get data from handler: ', data); - await sendToDing(data, eventName, context.setting); + console.log('get data from handler: ', markdown); + + await done({ markdown, eventName }); } catch (err) { console.log('stop handler because: ', err); if (!(err instanceof StopHandleError)) { diff --git a/src/lib/README.md b/src/lib/README.md deleted file mode 100644 index fb2f4692..00000000 --- a/src/lib/README.md +++ /dev/null @@ -1,12 +0,0 @@ -# lib - -## @octokit/app - -只是 clone 下来让 esbuild 构建出 browser 层。 - -``` -cd ./libs -rm -rf app.js -git clone https://github.com/octokit/app.js.git -rm -rf app.js/.git -``` diff --git a/src/queue/index.ts b/src/queue/index.ts index 5bdaa1d1..12543b37 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,35 +1,42 @@ -import { EventEmitter } from 'eventemitter3'; +import MultiMap from 'mnemonist/multi-map'; import { TQueueMessage } from './types'; -import { githubWorker, githubWebhookWorker } from './worker/github'; - -export type IConsumeWorker = ( - message: Message, - env: IRuntimeEnv, - ctx: ExecutionContext, -) => void; +import { BaseWorker } from './worker'; +import { GitHubAppWorker } from './worker/github'; export class QueueConsumer { - private emitter = new EventEmitter(); - addWorker( + constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {} + + private workerMap = new MultiMap< + TQueueMessage['type'], + BaseWorker + >(); + + addWorker( type: K, - handler: IConsumeWorker, + worker: BaseWorker, ) { - this.emitter.on(type, handler); + this.workerMap.set(type, worker); } - consume( - message: Message, - env: IRuntimeEnv, - ctx: ExecutionContext, - ) { - const { body } = message; - const { type } = body; + consume(message: Message) { + for (const w of this.workerMap.values()) { + w.consume(message); + } + } - this.emitter.emit(type, message, env, ctx); + async runAndWait() { + const promises = [] as Promise[]; + for (const w of this.workerMap.values()) { + promises.push(w.run()); + } + return await Promise.allSettled(promises); } } -export const consumer = new QueueConsumer(); -consumer.addWorker('github-app', githubWorker); -consumer.addWorker('github-webhook', githubWebhookWorker); +export const createConsumer = (env: IRuntimeEnv, ctx: ExecutionContext) => { + const consumer = new QueueConsumer(env, ctx); + consumer.addWorker('github-app', new GitHubAppWorker(env, ctx)); + // consumer.addWorker('github-webhook', githubWebhookWorker); + return consumer; +}; diff --git a/src/queue/worker/github.ts b/src/queue/worker/github.ts index 9ebb30b4..37fed736 100644 --- a/src/queue/worker/github.ts +++ b/src/queue/worker/github.ts @@ -1,49 +1,77 @@ -import { Webhooks } from '@octokit/webhooks'; +import { EmitterWebhookEventName } from '@octokit/webhooks'; +import chain from 'lodash/chain'; +import MultiMap from 'mnemonist/multi-map'; import { initApp } from '@/github/app'; -import { webhookHandler } from '@/github/handler'; +import { setupWebhooksTemplate } from '@/github/handler'; +import { MarkdownContent } from '@/github/types'; +import { sendToDing } from '@/github/utils'; import { GitHubKVManager } from '@/kv/github'; import { IGitHubEventQueueMessage } from '../types'; -export const githubWorker = async ( - message: Message, - env: IRuntimeEnv, - ctx: ExecutionContext, -) => { - const { body } = message; - const { botId, type, data } = body; +import { BaseWorker } from '.'; - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(botId); +export class GitHubAppWorker extends BaseWorker { + async run() { + await Promise.allSettled( + chain(this.queue) + .groupBy((v) => v.body.botId) + .map(async (messages, botId) => { + const githubKVManager = new GitHubKVManager(); + const setting = await githubKVManager.getAppSettingById(botId); - if (setting && setting.githubSecret) { - const app = await initApp(setting); + if (setting && setting.githubSecret) { + const app = await initApp(setting); - await webhookHandler(botId, type, app.webhooks, ctx, data, true); - } else { - // todo logger - } -}; - -export const githubWebhookWorker = async ( - message: Message, - env: IRuntimeEnv, - ctx: ExecutionContext, -) => { - const { body } = message; - const { botId, type, data } = body; - - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getSettingById(botId); - - if (setting && setting.githubSecret) { - const webhooks = new Webhooks<{ octokit: undefined }>({ - secret: setting.githubSecret, - }); - - await webhookHandler(botId, type, webhooks, ctx, data, true); - } else { - // todo logger + const results = new MultiMap(); + + setupWebhooksTemplate( + app.webhooks, + { setting }, + async ({ markdown, eventName }) => { + results.set(eventName, markdown); + }, + ); + + await Promise.allSettled( + chain(messages) + .groupBy((v) => v.body.data.event) + .map(async (messages, eventName: EmitterWebhookEventName) => { + await Promise.all( + messages.map(async (message) => { + try { + const { data } = message.body; + await app.webhooks.receive({ + id: data.id, + name: data.event as any, + payload: data.payload, + }); + message.ack(); + } catch (error) { + console.error('github app worker error', error); + message.retry(); + } + }), + ); + + const markdowns = results.get(eventName); + if (markdowns && markdowns.length > 0) { + // 只有特定内容的 content 要被合并起来 + await Promise.allSettled( + markdowns.map((markdown) => + sendToDing(markdown, eventName, setting), + ), + ); + } + }) + .value(), + ); + } else { + console.error('github app worker error: setting not found', botId); + } + }) + .value(), + ); } -}; +} diff --git a/src/queue/worker/index.ts b/src/queue/worker/index.ts new file mode 100644 index 00000000..180fe5ea --- /dev/null +++ b/src/queue/worker/index.ts @@ -0,0 +1,15 @@ +import { EventEmitter } from 'eventemitter3'; + +export abstract class BaseWorker { + protected emitter = new EventEmitter(); + + queue: Message[] = []; + + constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {} + + consume(message: Message) { + this.queue.push(message); + } + + abstract run(): Promise; +} diff --git a/src/runtime/cfworker/index.ts b/src/runtime/cfworker/index.ts index b9883819..0e8ec1ff 100644 --- a/src/runtime/cfworker/index.ts +++ b/src/runtime/cfworker/index.ts @@ -2,7 +2,7 @@ import { Hono } from 'hono'; import { ignition } from '@/api'; import Environment from '@/env'; -import { consumer } from '@/queue'; +import { createConsumer } from '@/queue'; import { TQueueMessage } from '@/queue/types'; import { RequiredField } from '@/types'; @@ -23,9 +23,19 @@ export default { ) { Environment.from('cfworker', env); - const messages = batch.messages; - messages.forEach((message) => { - consumer.consume(message, env, ctx); - }); + const consumer = createConsumer(env, ctx); + + batch.messages.forEach((v) => consumer.consume(v)); + + ctx.waitUntil( + consumer + .runAndWait() + .then(() => { + console.log('queue done'); + }) + .catch((err) => { + console.log('queue error', err); + }), + ); }, } as RequiredField, 'fetch' | 'queue'>; diff --git a/yarn.lock b/yarn.lock index 0a94b38a..f96f824a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5732,6 +5732,7 @@ __metadata: mdast-util-to-markdown: ^1.5.0 micromark-extension-gfm: ^2.0.1 micromark-extension-gfm-autolink-literal: ^1.0.3 + mnemonist: ^0.39.5 mri: ^1.2.0 nodemon: ^2.0.20 npm-run-all: ^4.1.5 @@ -8779,6 +8780,15 @@ __metadata: languageName: node linkType: hard +"mnemonist@npm:^0.39.5": + version: 0.39.5 + resolution: "mnemonist@npm:0.39.5" + dependencies: + obliterator: ^2.0.1 + checksum: 6669d687a434226924b2c84ee6eb7ce7d0f83dfc5caad8bcc164c73c0c11fb6d43cbe32636e710f068046f4b40a56c3032532554e93e02640aafc6ca3dd222e6 + languageName: node + linkType: hard + "modify-values@npm:^1.0.1": version: 1.0.1 resolution: "modify-values@npm:1.0.1" @@ -9419,6 +9429,13 @@ __metadata: languageName: node linkType: hard +"obliterator@npm:^2.0.1": + version: 2.0.4 + resolution: "obliterator@npm:2.0.4" + checksum: f28ad35b6d812089315f375dc3e6e5f9bebf958ebe4b10ccd471c7115cbcf595e74bdac4783ae758e5b1f47e3096427fdb37cfa7bed566b132df92ff317b9a7c + languageName: node + linkType: hard + "on-finished@npm:2.4.1": version: 2.4.1 resolution: "on-finished@npm:2.4.1"