Skip to content

Commit

Permalink
fix: cache app and webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemain committed Sep 27, 2024
1 parent d83c78a commit b241adb
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
4 changes: 2 additions & 2 deletions __tests__/queue/base.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { QueueConsumer } from '@/queue';
import { QueueBatchConsumer } from '@/queue';
import { BaseWorker } from '@/queue/worker';

import { MockMessage, MockMessageBatch } from '../__mocks__/queue/message';
Expand All @@ -11,7 +11,7 @@ class Worker<T> extends BaseWorker<T> {

describe('queue consumer', () => {
it('can work', async () => {
const consumer = new QueueConsumer<FakeMessage>();
const consumer = new QueueBatchConsumer<FakeMessage>();
const batch = createMessageBatch();
const wk = new Worker<FakeMessage>();
consumer.addWorker('test', wk);
Expand Down
8 changes: 4 additions & 4 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TQueueMessage } from './types';
import { BaseWorker } from './worker';
import { GitHubEventWorker } from './worker/github';

export class QueueConsumer<T extends { type: string }> {
export class QueueBatchConsumer<T extends { type: string }> {
logger = Logger.instance();

private workerMap = new MultiMap<string, BaseWorker<T>>();
Expand All @@ -21,7 +21,7 @@ export class QueueConsumer<T extends { type: string }> {
const workers = this.workerMap.get(v.body.type);
if (!workers) {
this.logger.error('no worker found for', v.body.type);
return;
continue;
}

for (const worker of workers) {
Expand All @@ -39,8 +39,8 @@ export class QueueConsumer<T extends { type: string }> {
}
}

export const createConsumer = () => {
const consumer = new QueueConsumer<TQueueMessage>();
export const createBatchConsumer = () => {
const consumer = new QueueBatchConsumer<TQueueMessage>();
consumer.addWorker('github-app', new GitHubEventWorker('app'));
consumer.addWorker('github-webhook', new GitHubEventWorker('webhook'));
return consumer;
Expand Down
20 changes: 20 additions & 0 deletions src/queue/worker/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Logger } from '@/utils/logger';

import { IGitHubEventQueueMessage } from '../types';

import { memoize, pMemoize } from '@opensumi/ide-utils';
import { BaseWorker } from '.';

export function createUniqueMessageId(
Expand Down Expand Up @@ -50,7 +51,13 @@ export class GitHubEventWorker extends BaseWorker<IGitHubEventQueueMessage> {
// do nothing
}

private _appMap = new Map<string, IOctokitShape>();
async createGitHubApp(botId: string): Promise<IOctokitShape | undefined> {
const cached = this._appMap.get(botId);
if (cached) {
return cached;
}

const appSetting =
await GitHubKVManager.instance().getAppSettingById(botId);

Expand All @@ -68,13 +75,22 @@ export class GitHubEventWorker extends BaseWorker<IGitHubEventQueueMessage> {
}

const app = await initApp(appSetting);
this._appMap.set(botId, {
webhooks: app.webhooks,
setting: appSetting,
});
return {
webhooks: app.webhooks,
setting: appSetting,
};
}

private _webhookMap = new Map<string, IOctokitShape>();
async createWebhook(botId: string): Promise<IOctokitShape | undefined> {
const cached = this._webhookMap.get(botId);
if (cached) {
return cached;
}
const _setting = await GitHubKVManager.instance().getSettingById(botId);
if (!_setting) {
this.logger.error('github app worker error: setting not found', botId);
Expand All @@ -96,6 +112,10 @@ export class GitHubEventWorker extends BaseWorker<IGitHubEventQueueMessage> {
});
const setting = _setting;

this._webhookMap.set(botId, {
webhooks,
setting,
});
return {
webhooks,
setting,
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/cfworker/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ignition } from '@/api';
import Environment from '@/env';
import { createConsumer } from '@/queue';
import { createBatchConsumer } from '@/queue';
import { TQueueMessage } from '@/queue/types';
import { RequiredField } from '@/types';
import { Logger } from '@/utils/logger';
Expand All @@ -22,7 +22,7 @@ export default {
const logger = Logger.instance();
Environment.initialize(runtimeConfig, env);

const consumer = createConsumer();
const consumer = createBatchConsumer();

consumer.push(...batch.messages);
ctx.waitUntil(
Expand Down

0 comments on commit b241adb

Please sign in to comment.