diff --git a/__tests__/queue/base.test.ts b/__tests__/queue/base.test.ts index 818e74f..10b3a4a 100644 --- a/__tests__/queue/base.test.ts +++ b/__tests__/queue/base.test.ts @@ -1,4 +1,4 @@ -import { QueueConsumer } from '@/queue'; +import { QueueBatchConsumer } from '@/queue'; import { BaseWorker } from '@/queue/worker'; import { MockMessage, MockMessageBatch } from '../__mocks__/queue/message'; @@ -11,7 +11,7 @@ class Worker extends BaseWorker { describe('queue consumer', () => { it('can work', async () => { - const consumer = new QueueConsumer(); + const consumer = new QueueBatchConsumer(); const batch = createMessageBatch(); const wk = new Worker(); consumer.addWorker('test', wk); diff --git a/src/queue/index.ts b/src/queue/index.ts index baa625b..02ad423 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -6,7 +6,7 @@ import { TQueueMessage } from './types'; import { BaseWorker } from './worker'; import { GitHubEventWorker } from './worker/github'; -export class QueueConsumer { +export class QueueBatchConsumer { logger = Logger.instance(); private workerMap = new MultiMap>(); @@ -21,7 +21,7 @@ export class QueueConsumer { const workers = this.workerMap.get(v.body.type); if (!workers) { this.logger.error('no worker found for', v.body.type); - return; + continue; } for (const worker of workers) { @@ -39,8 +39,8 @@ export class QueueConsumer { } } -export const createConsumer = () => { - const consumer = new QueueConsumer(); +export const createBatchConsumer = () => { + const consumer = new QueueBatchConsumer(); consumer.addWorker('github-app', new GitHubEventWorker('app')); consumer.addWorker('github-webhook', new GitHubEventWorker('webhook')); return consumer; diff --git a/src/queue/worker/github.ts b/src/queue/worker/github.ts index 1b2d36a..fe0ea4f 100644 --- a/src/queue/worker/github.ts +++ b/src/queue/worker/github.ts @@ -12,6 +12,7 @@ import { Logger } from '@/utils/logger'; import { IGitHubEventQueueMessage } from '../types'; +import { memoize, pMemoize } from '@opensumi/ide-utils'; import { BaseWorker } from '.'; export function createUniqueMessageId( @@ -50,7 +51,13 @@ export class GitHubEventWorker extends BaseWorker { // do nothing } + private _appMap = new Map(); async createGitHubApp(botId: string): Promise { + const cached = this._appMap.get(botId); + if (cached) { + return cached; + } + const appSetting = await GitHubKVManager.instance().getAppSettingById(botId); @@ -68,13 +75,22 @@ export class GitHubEventWorker extends BaseWorker { } const app = await initApp(appSetting); + this._appMap.set(botId, { + webhooks: app.webhooks, + setting: appSetting, + }); return { webhooks: app.webhooks, setting: appSetting, }; } + private _webhookMap = new Map(); async createWebhook(botId: string): Promise { + const cached = this._webhookMap.get(botId); + if (cached) { + return cached; + } const _setting = await GitHubKVManager.instance().getSettingById(botId); if (!_setting) { this.logger.error('github app worker error: setting not found', botId); @@ -96,6 +112,10 @@ export class GitHubEventWorker extends BaseWorker { }); const setting = _setting; + this._webhookMap.set(botId, { + webhooks, + setting, + }); return { webhooks, setting, diff --git a/src/runtime/cfworker/index.ts b/src/runtime/cfworker/index.ts index f748748..cf4544b 100644 --- a/src/runtime/cfworker/index.ts +++ b/src/runtime/cfworker/index.ts @@ -1,6 +1,6 @@ import { ignition } from '@/api'; import Environment from '@/env'; -import { createConsumer } from '@/queue'; +import { createBatchConsumer } from '@/queue'; import { TQueueMessage } from '@/queue/types'; import { RequiredField } from '@/types'; import { Logger } from '@/utils/logger'; @@ -22,7 +22,7 @@ export default { const logger = Logger.instance(); Environment.initialize(runtimeConfig, env); - const consumer = createConsumer(); + const consumer = createBatchConsumer(); consumer.push(...batch.messages); ctx.waitUntil(