From 4b86e19d8d25b54b9cfdda8c61c9a9c364830544 Mon Sep 17 00:00:00 2001 From: "lijiacheng.ljc" Date: Tue, 30 Jan 2024 14:22:15 +0800 Subject: [PATCH] refactor: make queue clear --- src/api/controllers/github.ts | 10 +-- src/api/controllers/webhook.ts | 3 +- src/github/handler.ts | 15 ++-- src/im/ding/bot.ts | 2 +- src/kv/github.ts | 10 ++- src/queue/index.ts | 8 +- src/queue/types.ts | 8 +- src/queue/worker/github.ts | 138 +++++++++++++++++++------------- src/queue/worker/index.ts | 2 - src/runtime/cfworker/index.ts | 2 +- test/__mocks__/index.ts | 25 ++++++ test/__mocks__/queue/message.ts | 4 + test/queue/base.test.ts | 16 +--- test/queue/index.test.ts | 66 +++++++++++++++ 14 files changed, 212 insertions(+), 97 deletions(-) create mode 100644 test/__mocks__/index.ts create mode 100644 test/queue/index.test.ts diff --git a/src/api/controllers/github.ts b/src/api/controllers/github.ts index 7d01a9e5..6663726d 100644 --- a/src/api/controllers/github.ts +++ b/src/api/controllers/github.ts @@ -12,8 +12,8 @@ export function route(hono: THono) { if (!id) { return c.send.error(400, 'need a valid id'); } - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(id); + + const setting = await GitHubKVManager.instance().getAppSettingById(id); if (!setting) { return c.send.error(400, 'id not found in database'); @@ -62,8 +62,7 @@ export function route(hono: THono) { } // 先查数据库有没有设置这个 id 对应的 installation id - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(id); + const setting = await GitHubKVManager.instance().getAppSettingById(id); if (!setting) { return c.send.error(400, 'id not found in database'); @@ -107,8 +106,7 @@ export function route(hono: THono) { } // 先查数据库有没有设置这个 id 对应的 installation id - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(id); + const setting = await GitHubKVManager.instance().getAppSettingById(id); if (!setting) { return c.send.error(400, 'id not found in database'); diff --git a/src/api/controllers/webhook.ts b/src/api/controllers/webhook.ts index e7562b51..b507951c 100644 --- a/src/api/controllers/webhook.ts +++ b/src/api/controllers/webhook.ts @@ -14,8 +14,7 @@ export function route(hono: THono) { if (!id) { return c.send.error(400, 'need a valid id'); } - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getSettingById(id); + const setting = await GitHubKVManager.instance().getSettingById(id); if (!setting) { return c.send.error(400, 'id not found in database'); diff --git a/src/github/handler.ts b/src/github/handler.ts index 96d4359b..494e47c0 100644 --- a/src/github/handler.ts +++ b/src/github/handler.ts @@ -140,21 +140,20 @@ export async function webhookHandler( try { logger.info('Receive Github Webhook, id: ', id, ', name: ', eventName); try { + const webhookEvent = { + id: id, + name: eventName as any, + payload: payload, + }; if (useQueue) { logger.info('send to queue'); Environment.instance().Queue.send({ botId, type, - data, + data: webhookEvent, }); } else { - execContext.waitUntil( - webhooks.receive({ - id: id, - name: eventName as any, - payload: payload, - }), - ); + execContext.waitUntil(webhooks.receive(webhookEvent)); } return json({ diff --git a/src/im/ding/bot.ts b/src/im/ding/bot.ts index 7c10768f..c0ccaf4d 100644 --- a/src/im/ding/bot.ts +++ b/src/im/ding/bot.ts @@ -30,7 +30,7 @@ export class DingBotAdapter implements IBotAdapter { public ctx: ExecutionContext, public setting: IDingBotSetting, ) { - this.githubKVManager = new GitHubKVManager(); + this.githubKVManager = GitHubKVManager.instance(); this.conversationKVManager = new ConversationKVManager(msg); this.userInfoKVManager = new DingUserKVManager(); } diff --git a/src/kv/github.ts b/src/kv/github.ts index 1da95c3c..aa048db8 100644 --- a/src/kv/github.ts +++ b/src/kv/github.ts @@ -6,7 +6,7 @@ export class GitHubKVManager { appSettingsKV: KVManager; settingsKV: KVManager; - constructor() { + private constructor() { this.appSettingsKV = KVManager.for( GitHubCommon.GITHUB_APP_SETTINGS_PREFIX, ); @@ -15,6 +15,14 @@ export class GitHubKVManager { ); } + private static _instance: GitHubKVManager; + static instance() { + if (!this._instance) { + this._instance = new GitHubKVManager(); + } + return this._instance; + } + setAppSettingById(id: string, data: AppSetting) { return this.appSettingsKV.setJSON(id, data); } diff --git a/src/queue/index.ts b/src/queue/index.ts index 4b0a73c1..b2f03bbe 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -9,8 +9,6 @@ import { GitHubAppWorker } from './worker/github'; export class QueueConsumer { logger = Logger.instance(); - constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {} - private workerMap = new MultiMap>(); addWorker(type: string, worker: BaseWorker) { @@ -35,9 +33,9 @@ export class QueueConsumer { } } -export const createConsumer = (env: IRuntimeEnv, ctx: ExecutionContext) => { - const consumer = new QueueConsumer(env, ctx); - consumer.addWorker('github-app', new GitHubAppWorker(env, ctx)); +export const createConsumer = () => { + const consumer = new QueueConsumer(); + consumer.addWorker('github-app', new GitHubAppWorker()); // consumer.addWorker('github-webhook', githubWebhookWorker); return consumer; }; diff --git a/src/queue/types.ts b/src/queue/types.ts index 89aeb3f7..6d6304cf 100644 --- a/src/queue/types.ts +++ b/src/queue/types.ts @@ -1,4 +1,8 @@ -import { IGitHubEvent } from '@/github'; +export interface IGitHubMessageBody { + id: string; + name: string; + payload: any; +} export interface IGitHubEventQueueMessage { /** @@ -8,7 +12,7 @@ export interface IGitHubEventQueueMessage { type: 'github-app' | 'github-webhook'; - data: IGitHubEvent; + data: IGitHubMessageBody; } export interface IWechatyQueueMessage { diff --git a/src/queue/worker/github.ts b/src/queue/worker/github.ts index 37fed736..7eb18f5c 100644 --- a/src/queue/worker/github.ts +++ b/src/queue/worker/github.ts @@ -1,5 +1,5 @@ import { EmitterWebhookEventName } from '@octokit/webhooks'; -import chain from 'lodash/chain'; +import groupBy from 'lodash/groupBy'; import MultiMap from 'mnemonist/multi-map'; import { initApp } from '@/github/app'; @@ -7,71 +7,97 @@ import { setupWebhooksTemplate } from '@/github/handler'; import { MarkdownContent } from '@/github/types'; import { sendToDing } from '@/github/utils'; import { GitHubKVManager } from '@/kv/github'; +import { Logger } from '@/utils/logger'; import { IGitHubEventQueueMessage } from '../types'; import { BaseWorker } from '.'; export class GitHubAppWorker extends BaseWorker { + logger = Logger.instance(); + async run() { - await Promise.allSettled( - chain(this.queue) - .groupBy((v) => v.body.botId) - .map(async (messages, botId) => { - const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(botId); - - if (setting && setting.githubSecret) { - const app = await initApp(setting); - - const results = new MultiMap(); - - setupWebhooksTemplate( - app.webhooks, - { setting }, - async ({ markdown, eventName }) => { - results.set(eventName, markdown); - }, - ); + const byId = groupBy(this.queue, (v) => v.body.botId); + + const result = await Promise.allSettled( + Object.entries(byId).map(async ([botId, messages]) => { + this.logger.info('consume for', botId, messages.length); + + const setting = await GitHubKVManager.instance().getAppSettingById( + botId, + ); + + if (!setting) { + console.error('github app worker error: setting not found', botId); + return; + } + + if (!setting.githubSecret) { + console.error( + 'github app worker error: please set app webhook secret in database', + botId, + ); + return; + } + const app = await initApp(setting); + + const results = new MultiMap(); + + setupWebhooksTemplate( + app.webhooks, + { setting }, + async ({ markdown, eventName }) => { + results.set(eventName, markdown); + }, + ); + + const byEvent = groupBy(messages, (v) => v.body.data.name) as Record< + EmitterWebhookEventName, + Message[] + >; + + await Promise.allSettled( + Object.entries(byEvent).map(async ([eventName, messages]) => { await Promise.allSettled( - chain(messages) - .groupBy((v) => v.body.data.event) - .map(async (messages, eventName: EmitterWebhookEventName) => { - await Promise.all( - messages.map(async (message) => { - try { - const { data } = message.body; - await app.webhooks.receive({ - id: data.id, - name: data.event as any, - payload: data.payload, - }); - message.ack(); - } catch (error) { - console.error('github app worker error', error); - message.retry(); - } - }), - ); - - const markdowns = results.get(eventName); - if (markdowns && markdowns.length > 0) { - // 只有特定内容的 content 要被合并起来 - await Promise.allSettled( - markdowns.map((markdown) => - sendToDing(markdown, eventName, setting), - ), - ); - } - }) - .value(), + messages.map(async (message) => { + try { + const { data } = message.body; + await app.webhooks.receive({ + id: data.id, + name: data.name as any, + payload: data.payload, + }); + message.ack(); + } catch (error) { + console.error('github app worker error', error); + message.retry(); + } + }), ); - } else { - console.error('github app worker error: setting not found', botId); - } - }) - .value(), + + const markdowns = results.get(eventName); + if (markdowns && markdowns.length > 0) { + // 只有特定内容的 content 要被合并起来 + await Promise.allSettled( + markdowns.map((markdown) => + sendToDing( + markdown, + eventName as EmitterWebhookEventName, + setting, + ), + ), + ); + } + }), + ); + }), ); + + result.forEach((v) => { + if (v.status === 'rejected') { + console.error('github app worker error', v); + } + }); } } diff --git a/src/queue/worker/index.ts b/src/queue/worker/index.ts index 180fe5ea..c0206926 100644 --- a/src/queue/worker/index.ts +++ b/src/queue/worker/index.ts @@ -5,8 +5,6 @@ export abstract class BaseWorker { queue: Message[] = []; - constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {} - consume(message: Message) { this.queue.push(message); } diff --git a/src/runtime/cfworker/index.ts b/src/runtime/cfworker/index.ts index 1c14bd38..143384f8 100644 --- a/src/runtime/cfworker/index.ts +++ b/src/runtime/cfworker/index.ts @@ -22,7 +22,7 @@ export default { ) { Environment.from('cfworker', env); - const consumer = createConsumer(env, ctx); + const consumer = createConsumer(); consumer.consume(...batch.messages); diff --git a/test/__mocks__/index.ts b/test/__mocks__/index.ts new file mode 100644 index 00000000..85f6cbea --- /dev/null +++ b/test/__mocks__/index.ts @@ -0,0 +1,25 @@ +import { EdgeKVNamespace } from 'edge-mock'; + +import Environment from '@/env'; +import { GitHubCommon } from '@/kv/constants'; + +export function prepareEnv() { + const kv = new EdgeKVNamespace(); + Environment.from('node', { + KV: kv, + MESSAGE_QUEUE: {} as any, + }); + + kv.put( + `${GitHubCommon.GITHUB_APP_SETTINGS_PREFIX}mock`, + JSON.stringify({ + appSettings: { + appId: process.env.GITHUB_APPID, + privateKey: process.env.GITHUB_APP_PRIVATE_KEY!.replace(/\\n/g, '\n'), + }, + githubSecret: process.env.GITHUB_TOKEN, + dingWebhooks: [], + contentLimit: 300, + }), + ); +} diff --git a/test/__mocks__/queue/message.ts b/test/__mocks__/queue/message.ts index 2807f190..732e6fdd 100644 --- a/test/__mocks__/queue/message.ts +++ b/test/__mocks__/queue/message.ts @@ -23,4 +23,8 @@ export class MockMessageBatch implements MessageBatch { ackAll(): void { this.messages.forEach((v) => v.ack()); } + + static from(messages: T[]) { + return new MockMessageBatch(messages.map((v) => new MockMessage(v))); + } } diff --git a/test/queue/base.test.ts b/test/queue/base.test.ts index 504a58ce..c17a5051 100644 --- a/test/queue/base.test.ts +++ b/test/queue/base.test.ts @@ -1,9 +1,6 @@ -import { makeEdgeEnv, EdgeKVNamespace } from 'edge-mock'; +import { makeEdgeEnv } from 'edge-mock'; makeEdgeEnv(); -declare const FetchEvent: any; -declare const Request: any; - import { QueueConsumer } from '@/queue'; import { BaseWorker } from '@/queue/worker'; @@ -15,18 +12,11 @@ class Worker extends BaseWorker { } } -const env = { - KV: new EdgeKVNamespace() as any, - MESSAGE_QUEUE: {} as any, -}; -const request = new Request('/?foo=1', { method: 'POST', body: 'hello' }); -const event = new FetchEvent('fetch', { request }); - describe('queue consumer', () => { it('can work', async () => { - const consumer = new QueueConsumer(env, event); + const consumer = new QueueConsumer(); const batch = createMessageBatch(); - const wk = new Worker(env, event); + const wk = new Worker(); consumer.addWorker('test', wk); consumer.consume(...batch.messages); expect(wk.queue.length).toBe(100); diff --git a/test/queue/index.test.ts b/test/queue/index.test.ts new file mode 100644 index 00000000..3e985ec2 --- /dev/null +++ b/test/queue/index.test.ts @@ -0,0 +1,66 @@ +import 'dotenv/config'; + +import { EmitterWebhookEventName } from '@octokit/webhooks'; + +import { IGitHubEventQueueMessage } from '@/queue/types'; +import { GitHubAppWorker } from '@/queue/worker/github'; + +import { prepareEnv } from '../__mocks__'; +import { MockMessageBatch } from '../__mocks__/queue/message'; +import { + pull_request_closed, + pull_request_opened, + pull_request_13_opened, + pull_request_edited_wip, + pull_request_edited_base, + issue_opened_event, + release_published, + antd_mini_release_published, + pull_request_review_comment_0_created, +} from '../fixtures'; + +const botId = 'mock'; + +const events = [ + { name: 'pull_request.closed', payload: pull_request_closed }, + { name: 'pull_request.opened', payload: pull_request_opened }, + { name: 'pull_request.opened', payload: pull_request_13_opened }, + { name: 'pull_request.edited', payload: pull_request_edited_wip }, + { name: 'pull_request.edited', payload: pull_request_edited_base }, + { name: 'issues.opened', payload: issue_opened_event }, + { name: 'release.published', payload: release_published }, + { name: 'release.published', payload: antd_mini_release_published }, + { + name: 'pull_request_review_comment.created', + payload: pull_request_review_comment_0_created, + }, +] as { name: EmitterWebhookEventName; payload: any }[]; + +const githubAppMessage = events.map((v) => ({ + botId, + type: 'github-app', + data: { + id: `${Math.random()}`, + name: v.name, + payload: v.payload, + }, +})) as IGitHubEventQueueMessage[]; + +const describe = process.env.GITHUB_TOKEN + ? global.describe + : global.describe.skip; + +describe('queue', () => { + beforeAll(() => { + prepareEnv(); + }); + it('should work', async () => { + const wk = new GitHubAppWorker(); + const batch = MockMessageBatch.from(githubAppMessage); + batch.messages.forEach((v) => { + wk.consume(v); + }); + + await wk.run(); + }); +});