From cca75cc5693dcb8f58932b5058b03649b0a9ba9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Fri, 24 Nov 2023 19:24:39 +0800 Subject: [PATCH 1/3] chore: init queue --- package.json | 1 + src/api/controllers/github.ts | 8 ++++---- src/api/controllers/webhook.ts | 2 +- src/env.ts | 8 +++++++- src/github/handler.ts | 29 ++++++++++++++++++++------- src/queue/index.ts | 34 ++++++++++++++++++++++++++++++++ src/queue/types.ts | 27 +++++++++++++++++++++++++ src/runtime/cfworker/index.ts | 17 +++++++++++++--- src/runtime/node/index.ts | 4 +++- src/runtime/node/queue/index.ts | 11 +++++++++++ test/commander/commander.test.ts | 4 +++- test/handle.test.ts | 4 ++-- typings/global.d.ts | 6 +++++- wrangler.tpl.toml | 23 +++++++++++++++++++-- yarn.lock | 8 ++++++++ 15 files changed, 163 insertions(+), 23 deletions(-) create mode 100644 src/queue/index.ts create mode 100644 src/queue/types.ts create mode 100644 src/runtime/node/queue/index.ts diff --git a/package.json b/package.json index 32fa543d..888193b2 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "@opensumi/ide-utils": "^2.23.1", "@opensumi/octo-service": "workspace:^", "chatgpt": "^5.0.9", + "eventemitter3": "^5.0.1", "hono": "^3.6.0", "lodash": "^4.17.21", "mdast-util-from-markdown": "^1.3.0", diff --git a/src/api/controllers/github.ts b/src/api/controllers/github.ts index f71352f8..7a3dc2c1 100644 --- a/src/api/controllers/github.ts +++ b/src/api/controllers/github.ts @@ -6,12 +6,12 @@ import { GitHubKVManager } from '@/kv/github'; export function route(hono: THono) { hono.post('/github/app/:id', async (c) => { - const id = c.req.param('id') ?? c.req.query('id'); - if (!id) { + const botId = c.req.param('id') ?? c.req.query('id'); + if (!botId) { return c.send.error(400, 'need a valid id'); } const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(id); + const setting = await githubKVManager.getAppSettingById(botId); if (!setting) { return c.send.error(400, 'id not found in database'); @@ -21,7 +21,7 @@ export function route(hono: THono) { } const app = await initApp(setting); - return webhookHandler(app.webhooks, c.req, c.executionCtx); + return webhookHandler(botId, app.webhooks, c.req, c.executionCtx); }); hono.get('/github/installation-token/:id', async (c) => { diff --git a/src/api/controllers/webhook.ts b/src/api/controllers/webhook.ts index 02deb76a..011f579a 100644 --- a/src/api/controllers/webhook.ts +++ b/src/api/controllers/webhook.ts @@ -29,6 +29,6 @@ export function route(hono: THono) { setupWebhooksTemplate(webhooks, { setting: setting, }); - return webhookHandler(webhooks, c.req, c.executionCtx); + return webhookHandler(id, webhooks, c.req, c.executionCtx); }); } diff --git a/src/env.ts b/src/env.ts index c821ef42..e684c309 100644 --- a/src/env.ts +++ b/src/env.ts @@ -8,8 +8,14 @@ export default class Environment { static #instance: Environment | null; + useQueue = false; + + get Queue() { + return this.env.MESSAGE_QUEUE; + } + get KV() { - return this.env.KV_PROD; + return this.env.KV; } get metrics() { diff --git a/src/github/handler.ts b/src/github/handler.ts index 5fc86596..6cc65a20 100644 --- a/src/github/handler.ts +++ b/src/github/handler.ts @@ -8,6 +8,7 @@ import { User } from '@octokit/webhooks-types'; import { HonoRequest } from 'hono'; import { error, json } from '@/api/utils/response'; +import Environment from '@/env'; import { getTemplates, StopHandleError } from './templates'; import type { MarkdownContent, Context, ITemplateResult } from './types'; @@ -127,6 +128,7 @@ export const setupWebhooksTemplate = ( }; export async function webhookHandler( + botId: string, webhooks: Webhooks, // eslint-disable-next-line @typescript-eslint/ban-types req: HonoRequest, @@ -140,13 +142,26 @@ export async function webhookHandler( } = await validateGithub(req, webhooks); console.log('Receive Github Webhook, id: ', id, ', name: ', eventName); try { - execContext.waitUntil( - webhooks.receive({ - id: id, - name: eventName as any, - payload: payload, - }), - ); + if (Environment.instance().useQueue) { + Environment.instance().Queue.send({ + botId, + type: 'github-app', + data: { + id: id, + event: eventName, + payload: payload, + }, + }); + } else { + execContext.waitUntil( + webhooks.receive({ + id: id, + name: eventName as any, + payload: payload, + }), + ); + } + return json({ id: id, name: eventName, diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 00000000..cd61ff59 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,34 @@ +import { EventEmitter } from 'eventemitter3'; + +import { TQueueMessage } from './types'; +import { githubWorker } from './worker/github'; + +export type IConsumeWorker = ( + message: Message, + env: IRuntimeEnv, + ctx: ExecutionContext, +) => void; + +export class QueueConsumer { + private emitter = new EventEmitter(); + addWorker( + type: K, + handler: IConsumeWorker, + ) { + this.emitter.on(type, handler); + } + + consume( + message: Message, + env: IRuntimeEnv, + ctx: ExecutionContext, + ) { + const { body } = message; + const { type } = body; + + this.emitter.emit(type, message, env, ctx); + } +} + +export const consumer = new QueueConsumer(); +consumer.addWorker('github-app', githubWorker); diff --git a/src/queue/types.ts b/src/queue/types.ts new file mode 100644 index 00000000..03bc26a0 --- /dev/null +++ b/src/queue/types.ts @@ -0,0 +1,27 @@ +export interface IGitHubEventQueueMessage { + /** + * current bot id + */ + botId: string; + + type: 'github-app'; + + data: { + id: string; + event: string; + payload: any; + }; +} + +export interface IWechatyQueueMessage { + /** + * current bot id + */ + botId: string; + + type: 'wechaty'; + + data: any; +} + +export type TQueueMessage = IGitHubEventQueueMessage | IWechatyQueueMessage; diff --git a/src/runtime/cfworker/index.ts b/src/runtime/cfworker/index.ts index 55fdd79d..0c598fdf 100644 --- a/src/runtime/cfworker/index.ts +++ b/src/runtime/cfworker/index.ts @@ -2,6 +2,8 @@ import { Hono } from 'hono'; import { ignition } from '@/api'; import Environment from '@/env'; +import { consumer } from '@/queue'; +import { TQueueMessage } from '@/queue/types'; import { RequiredField } from '@/types'; const app = new Hono<{ Bindings: IRuntimeEnv }>() as THono; @@ -9,10 +11,19 @@ const app = new Hono<{ Bindings: IRuntimeEnv }>() as THono; ignition(app); export default { - ...app, fetch: async (request: Request, env: IRuntimeEnv, ctx: ExecutionContext) => { Environment.from('cfworker', env); - + Environment.instance().useQueue = true; return app.fetch(request, env, ctx); }, -} as RequiredField, 'fetch'>; + async queue( + batch: MessageBatch, + env: IRuntimeEnv, + ctx: ExecutionContext, + ) { + const messages = batch.messages; + messages.forEach((message) => { + consumer.consume(message, env, ctx); + }); + }, +} as RequiredField, 'fetch' | 'queue'>; diff --git a/src/runtime/node/index.ts b/src/runtime/node/index.ts index a77fffc3..6a4a2ca1 100644 --- a/src/runtime/node/index.ts +++ b/src/runtime/node/index.ts @@ -4,10 +4,12 @@ import Environment from '@/env'; import { NodeKV } from './kv'; import { NodeAnalyticsEngineDataset } from './metrics'; +import { NodeQueue } from './queue'; Environment.from('node', { ...process.env, - KV_PROD: new NodeKV(), + KV: new NodeKV(), + MESSAGE_QUEUE: new NodeQueue(), metricsDataset: new NodeAnalyticsEngineDataset(), }); diff --git a/src/runtime/node/queue/index.ts b/src/runtime/node/queue/index.ts new file mode 100644 index 00000000..65888a39 --- /dev/null +++ b/src/runtime/node/queue/index.ts @@ -0,0 +1,11 @@ +export class NodeQueue implements Queue { + async send( + message: T, + options?: QueueSendOptions | undefined, + ): Promise { + throw new Error('Method not implemented.'); + } + async sendBatch(messages: Iterable>): Promise { + throw new Error('Method not implemented.'); + } +} diff --git a/test/commander/commander.test.ts b/test/commander/commander.test.ts index 2f63c71b..1e7cdbf4 100644 --- a/test/commander/commander.test.ts +++ b/test/commander/commander.test.ts @@ -3,9 +3,11 @@ import { EdgeKVNamespace } from 'edge-mock'; import { CommandCenter } from '@/commander'; import Environment from '@/env'; import { ISSUE_REGEX } from '@/im/commands/constants'; +import { NodeQueue } from '@/runtime/node/queue'; const testEnv: IRuntimeEnv = { - KV_PROD: new EdgeKVNamespace(), + KV: new EdgeKVNamespace(), + MESSAGE_QUEUE: new NodeQueue(), }; describe('command center', () => { diff --git a/test/handle.test.ts b/test/handle.test.ts index 7060b9c6..1620a898 100644 --- a/test/handle.test.ts +++ b/test/handle.test.ts @@ -17,8 +17,8 @@ describe('handle', () => { const result = await app.fetch( request, { - KV_PROD: new EdgeKVNamespace() as any, - // MY_QUEUE: {} as any, + KV: new EdgeKVNamespace() as any, + MESSAGE_QUEUE: {} as any, }, event, ); diff --git a/typings/global.d.ts b/typings/global.d.ts index 15b96af7..43d6685f 100644 --- a/typings/global.d.ts +++ b/typings/global.d.ts @@ -1,5 +1,7 @@ import { Hono } from 'hono'; +import { TQueueMessage } from '../src/queue/types'; + export {}; declare module 'hono' { @@ -38,7 +40,9 @@ declare global { } interface IRuntimeEnv { - readonly KV_PROD: IKVNamespace; + readonly KV: IKVNamespace; + readonly MESSAGE_QUEUE: Queue; + readonly OPENAI_API_KEY?: string; readonly TIMEOUT?: string; readonly wechaty?: any; diff --git a/wrangler.tpl.toml b/wrangler.tpl.toml index 9b75383e..bfc2a5ca 100644 --- a/wrangler.tpl.toml +++ b/wrangler.tpl.toml @@ -9,14 +9,33 @@ workers_dev = true usage_model = "unbound" kv_namespaces = [ - { binding = "KV_PROD", id = "{{KV_LOCAL_ID}}", preview_id = "{{KV_LOCAL_ID}}" }, + { binding = "KV", id = "{{KV_LOCAL_ID}}", preview_id = "{{KV_LOCAL_ID}}" }, ] + +[[queues.producers]] +queue = "{{QUEUE_NAME_LOCAL}}" +binding = "MESSAGE_QUEUE" + +[[queues.consumers]] +queue = "{{QUEUE_NAME_LOCAL}}" +max_batch_size = 10 +max_batch_timeout = 1 + [env.prod] name = "sumi-webhook" vars = { ENVIRONMENT = "prod" } -kv_namespaces = [{ binding = "KV_PROD", id = "{{KV_PROD_ID}}" }] +kv_namespaces = [{ binding = "KV", id = "{{KV_PROD_ID}}" }] [[analytics_engine_datasets]] binding = "metricsDataset" + +[[queues.producers]] +queue = "{{QUEUE_NAME_PROD}}" +binding = "MESSAGE_QUEUE" + +[[queues.consumers]] +queue = "{{QUEUE_NAME_PROD}}" +max_batch_size = 10 +max_batch_timeout = 1 diff --git a/yarn.lock b/yarn.lock index 7739a85f..0a94b38a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5013,6 +5013,13 @@ __metadata: languageName: node linkType: hard +"eventemitter3@npm:^5.0.1": + version: 5.0.1 + resolution: "eventemitter3@npm:5.0.1" + checksum: 543d6c858ab699303c3c32e0f0f47fc64d360bf73c3daf0ac0b5079710e340d6fe9f15487f94e66c629f5f82cd1a8678d692f3dbb6f6fcd1190e1b97fcad36f8 + languageName: node + linkType: hard + "eventsource-parser@npm:^1.0.0": version: 1.0.0 resolution: "eventsource-parser@npm:1.0.0" @@ -5711,6 +5718,7 @@ __metadata: eslint-config-prettier: ^8.6.0 eslint-import-resolver-typescript: ^3.5.3 eslint-plugin-import: ^2.27.5 + eventemitter3: ^5.0.1 execa: ^7.2.0 hono: ^3.6.0 jest: ^29.4.1 From 0a2c09eefec9aa712dacdf568f172dc1444733c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Fri, 24 Nov 2023 21:06:39 +0800 Subject: [PATCH 2/3] feat: implement queue --- .gitignore | 1 - src/api/controllers/github.ts | 18 +++++++++---- src/api/controllers/webhook.ts | 24 ++++++++++++----- src/github/app.ts | 2 -- src/github/configuration.ts | 24 ----------------- src/github/handler.ts | 44 +++++++++++++----------------- src/github/types.ts | 5 ---- src/queue/index.ts | 3 ++- src/queue/types.ts | 11 +++----- src/queue/worker/github.ts | 49 ++++++++++++++++++++++++++++++++++ 10 files changed, 105 insertions(+), 76 deletions(-) delete mode 100644 src/github/configuration.ts create mode 100644 src/queue/worker/github.ts diff --git a/.gitignore b/.gitignore index 68fb3632..6ca37371 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,6 @@ Cargo.lock bin/ pkg/ wasm-pack.log -worker/ .space node_modules/ .cargo-ok diff --git a/src/api/controllers/github.ts b/src/api/controllers/github.ts index 7a3dc2c1..66e5f09e 100644 --- a/src/api/controllers/github.ts +++ b/src/api/controllers/github.ts @@ -1,17 +1,17 @@ import { Octokit } from '@octokit/core'; -import { webhookHandler } from '@/github'; +import { validateGithub, webhookHandler } from '@/github'; import { initApp } from '@/github/app'; import { GitHubKVManager } from '@/kv/github'; export function route(hono: THono) { hono.post('/github/app/:id', async (c) => { - const botId = c.req.param('id') ?? c.req.query('id'); - if (!botId) { + const id = c.req.param('id') ?? c.req.query('id'); + if (!id) { return c.send.error(400, 'need a valid id'); } const githubKVManager = new GitHubKVManager(); - const setting = await githubKVManager.getAppSettingById(botId); + const setting = await githubKVManager.getAppSettingById(id); if (!setting) { return c.send.error(400, 'id not found in database'); @@ -21,7 +21,15 @@ export function route(hono: THono) { } const app = await initApp(setting); - return webhookHandler(botId, app.webhooks, c.req, c.executionCtx); + const payload = await validateGithub(c.req, app.webhooks); + + return webhookHandler( + id, + 'github-app', + app.webhooks, + c.executionCtx, + payload, + ); }); hono.get('/github/installation-token/:id', async (c) => { diff --git a/src/api/controllers/webhook.ts b/src/api/controllers/webhook.ts index 011f579a..46a92b0e 100644 --- a/src/api/controllers/webhook.ts +++ b/src/api/controllers/webhook.ts @@ -1,7 +1,10 @@ import { Webhooks } from '@octokit/webhooks'; -import { webhookHandler, setupWebhooksTemplate } from '@/github'; -import Configuration from '@/github/configuration'; +import { + webhookHandler, + setupWebhooksTemplate, + validateGithub, +} from '@/github'; import { GitHubKVManager } from '@/kv/github'; export function route(hono: THono) { @@ -20,15 +23,24 @@ export function route(hono: THono) { return c.send.error(400, 'please set webhook secret in database'); } - Configuration.init(setting); - - const webhooks = new Webhooks<{ octokit: undefined }>({ + const webhooks = new Webhooks<{ + octokit: undefined; + }>({ secret: setting.githubSecret, }); + const payload = await validateGithub(c.req, webhooks); + setupWebhooksTemplate(webhooks, { setting: setting, }); - return webhookHandler(id, webhooks, c.req, c.executionCtx); + + return webhookHandler( + id, + 'github-webhook', + webhooks, + c.executionCtx, + payload, + ); }); } diff --git a/src/github/app.ts b/src/github/app.ts index 89928931..b66f92f7 100644 --- a/src/github/app.ts +++ b/src/github/app.ts @@ -9,7 +9,6 @@ import { GitHubService } from '@opensumi/octo-service'; import { CommandContext, issueCc } from './commands'; import { parseCommandInMarkdownComments } from './commands/parse'; -import Configuration from './configuration'; import { setupWebhooksTemplate } from './handler'; import { OpenSumiOctoService } from './service/opensumi'; import { sendToDing } from './utils'; @@ -25,7 +24,6 @@ export class App { constructor(setting: AppSetting) { const { appSettings, githubSecret } = setting; - Configuration.init(setting); this.octoApp = new OctoApp({ appId: appSettings.appId, diff --git a/src/github/configuration.ts b/src/github/configuration.ts deleted file mode 100644 index 193dedce..00000000 --- a/src/github/configuration.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { ISetting } from '@/kv/types'; - -export default class Configuration { - private constructor() { - // noop - } - static #instance: Configuration; - static instance() { - if (!this.#instance) { - this.#instance = new Configuration(); - } - return this.#instance; - } - - contentLimit = 300; - - static init(setting: ISetting) { - const instance = this.instance(); - if (setting.contentLimit) { - instance.contentLimit = setting.contentLimit; - } - return instance; - } -} diff --git a/src/github/handler.ts b/src/github/handler.ts index 6cc65a20..256eadd9 100644 --- a/src/github/handler.ts +++ b/src/github/handler.ts @@ -11,7 +11,7 @@ import { error, json } from '@/api/utils/response'; import Environment from '@/env'; import { getTemplates, StopHandleError } from './templates'; -import type { MarkdownContent, Context, ITemplateResult } from './types'; +import type { MarkdownContent, Context } from './types'; import { sendToDing } from './utils'; export class ValidationError extends Error { @@ -20,11 +20,18 @@ export class ValidationError extends Error { } } +export interface IGitHubEvent { + id: string; + event: string; + text: string; + payload: any; +} + export async function validateGithub( // eslint-disable-next-line @typescript-eslint/ban-types req: HonoRequest, webhooks: Webhooks, -) { +): Promise { const headers = req.raw.headers; if (!headers.get('User-Agent')?.startsWith('GitHub-Hookshot/')) { @@ -71,9 +78,9 @@ const blockedUser = new Set(['renovate[bot]']); export const setupWebhooksTemplate = ( webhooks: Webhooks<{ octokit?: Octokit }>, - ctx: Context, + context: Context, ) => { - const templates = getTemplates(ctx); + const templates = getTemplates(context); const supportTemplates = Object.keys(templates) as EmitterWebhookEventName[]; // webhooks.onAny(async ({ id, name, payload }) => { @@ -106,17 +113,12 @@ export const setupWebhooksTemplate = ( console.log('run handler:', handler?.name); const data = await handler(payload, { - ...ctx, + ...context, octokit, }); console.log('get data from handler: ', data); - const result = { - data, - eventName, - } as ITemplateResult; - - await sendToDing(data, eventName, ctx.setting); + await sendToDing(data, eventName, context.setting); } catch (err) { console.log('stop handler because: ', err); if (!(err instanceof StopHandleError)) { @@ -129,28 +131,20 @@ export const setupWebhooksTemplate = ( export async function webhookHandler( botId: string, - webhooks: Webhooks, - // eslint-disable-next-line @typescript-eslint/ban-types - req: HonoRequest, + type: 'github-app' | 'github-webhook', + webhooks: Webhooks<{ octokit?: Octokit }>, execContext: ExecutionContext, + data: IGitHubEvent, ) { + const { id, event: eventName, payload } = data; try { - const { - id, - event: eventName, - payload, - } = await validateGithub(req, webhooks); console.log('Receive Github Webhook, id: ', id, ', name: ', eventName); try { if (Environment.instance().useQueue) { Environment.instance().Queue.send({ botId, - type: 'github-app', - data: { - id: id, - event: eventName, - payload: payload, - }, + type, + data, }); } else { execContext.waitUntil( diff --git a/src/github/types.ts b/src/github/types.ts index a5782b0d..43e111c6 100644 --- a/src/github/types.ts +++ b/src/github/types.ts @@ -17,11 +17,6 @@ export type MarkdownContent = { text: string; }; -export interface ITemplateResult { - data: MarkdownContent; - eventName: string; -} - export type TemplateMapping = { [TEmitterEvent in EmitterWebhookEventName]?: ( payload: ExtractPayload, diff --git a/src/queue/index.ts b/src/queue/index.ts index cd61ff59..5bdaa1d1 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'eventemitter3'; import { TQueueMessage } from './types'; -import { githubWorker } from './worker/github'; +import { githubWorker, githubWebhookWorker } from './worker/github'; export type IConsumeWorker = ( message: Message, @@ -32,3 +32,4 @@ export class QueueConsumer { export const consumer = new QueueConsumer(); consumer.addWorker('github-app', githubWorker); +consumer.addWorker('github-webhook', githubWebhookWorker); diff --git a/src/queue/types.ts b/src/queue/types.ts index 03bc26a0..89aeb3f7 100644 --- a/src/queue/types.ts +++ b/src/queue/types.ts @@ -1,16 +1,14 @@ +import { IGitHubEvent } from '@/github'; + export interface IGitHubEventQueueMessage { /** * current bot id */ botId: string; - type: 'github-app'; + type: 'github-app' | 'github-webhook'; - data: { - id: string; - event: string; - payload: any; - }; + data: IGitHubEvent; } export interface IWechatyQueueMessage { @@ -20,7 +18,6 @@ export interface IWechatyQueueMessage { botId: string; type: 'wechaty'; - data: any; } diff --git a/src/queue/worker/github.ts b/src/queue/worker/github.ts new file mode 100644 index 00000000..6ed90dec --- /dev/null +++ b/src/queue/worker/github.ts @@ -0,0 +1,49 @@ +import { Webhooks } from '@octokit/webhooks'; + +import { initApp } from '@/github/app'; +import { webhookHandler } from '@/github/handler'; +import { GitHubKVManager } from '@/kv/github'; + +import { IGitHubEventQueueMessage } from '../types'; + +export const githubWorker = async ( + message: Message, + env: IRuntimeEnv, + ctx: ExecutionContext, +) => { + const { body } = message; + const { botId, type, data } = body; + + const githubKVManager = new GitHubKVManager(); + const setting = await githubKVManager.getAppSettingById(botId); + + if (setting && setting.githubSecret) { + const app = await initApp(setting); + + await webhookHandler(botId, type, app.webhooks, ctx, data); + } else { + // todo logger + } +}; + +export const githubWebhookWorker = async ( + message: Message, + env: IRuntimeEnv, + ctx: ExecutionContext, +) => { + const { body } = message; + const { botId, type, data } = body; + + const githubKVManager = new GitHubKVManager(); + const setting = await githubKVManager.getSettingById(botId); + + if (setting && setting.githubSecret) { + const webhooks = new Webhooks<{ octokit: undefined }>({ + secret: setting.githubSecret, + }); + + await webhookHandler(botId, type, webhooks, ctx, data); + } else { + // todo logger + } +}; From b5272a325f91bc5774757ebb8f06c8f647a9b6de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Fri, 24 Nov 2023 21:20:45 +0800 Subject: [PATCH 3/3] fix: no env and no queue --- .env.example | 4 +++- src/github/handler.ts | 8 +++++++- src/queue/worker/github.ts | 4 ++-- src/runtime/cfworker/index.ts | 2 ++ 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index da310659..5d97f79e 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,8 @@ GITHUB_TOKEN= KV_LOCAL_ID= -QUEUE_NAME= +KV_PROD_ID= +QUEUE_NAME_LOCAL= +QUEUE_NAME_PROD= BINDING_NAME=metricsDataset # Node Runtime Env diff --git a/src/github/handler.ts b/src/github/handler.ts index 256eadd9..ba38d445 100644 --- a/src/github/handler.ts +++ b/src/github/handler.ts @@ -135,12 +135,18 @@ export async function webhookHandler( webhooks: Webhooks<{ octokit?: Octokit }>, execContext: ExecutionContext, data: IGitHubEvent, + forceNoQueue?: boolean, ) { const { id, event: eventName, payload } = data; try { console.log('Receive Github Webhook, id: ', id, ', name: ', eventName); try { - if (Environment.instance().useQueue) { + let useQueue = Environment.instance().useQueue; + if (forceNoQueue) { + useQueue = false; + } + + if (useQueue) { Environment.instance().Queue.send({ botId, type, diff --git a/src/queue/worker/github.ts b/src/queue/worker/github.ts index 6ed90dec..9ebb30b4 100644 --- a/src/queue/worker/github.ts +++ b/src/queue/worker/github.ts @@ -20,7 +20,7 @@ export const githubWorker = async ( if (setting && setting.githubSecret) { const app = await initApp(setting); - await webhookHandler(botId, type, app.webhooks, ctx, data); + await webhookHandler(botId, type, app.webhooks, ctx, data, true); } else { // todo logger } @@ -42,7 +42,7 @@ export const githubWebhookWorker = async ( secret: setting.githubSecret, }); - await webhookHandler(botId, type, webhooks, ctx, data); + await webhookHandler(botId, type, webhooks, ctx, data, true); } else { // todo logger } diff --git a/src/runtime/cfworker/index.ts b/src/runtime/cfworker/index.ts index 0c598fdf..b9883819 100644 --- a/src/runtime/cfworker/index.ts +++ b/src/runtime/cfworker/index.ts @@ -21,6 +21,8 @@ export default { env: IRuntimeEnv, ctx: ExecutionContext, ) { + Environment.from('cfworker', env); + const messages = batch.messages; messages.forEach((message) => { consumer.consume(message, env, ctx);