Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: using cloudflare queue #12

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
GITHUB_TOKEN=
KV_LOCAL_ID=
QUEUE_NAME=
KV_PROD_ID=
QUEUE_NAME_LOCAL=
QUEUE_NAME_PROD=
BINDING_NAME=metricsDataset

# Node Runtime Env
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ Cargo.lock
bin/
pkg/
wasm-pack.log
worker/
.space
node_modules/
.cargo-ok
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"@opensumi/ide-utils": "^2.23.1",
"@opensumi/octo-service": "workspace:^",
"chatgpt": "^5.0.9",
"eventemitter3": "^5.0.1",
"hono": "^3.6.0",
"lodash": "^4.17.21",
"mdast-util-from-markdown": "^1.3.0",
Expand Down
12 changes: 10 additions & 2 deletions src/api/controllers/github.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Octokit } from '@octokit/core';

import { webhookHandler } from '@/github';
import { validateGithub, webhookHandler } from '@/github';
import { initApp } from '@/github/app';
import { GitHubKVManager } from '@/kv/github';

Expand All @@ -21,7 +21,15 @@ export function route(hono: THono) {
}

const app = await initApp(setting);
return webhookHandler(app.webhooks, c.req, c.executionCtx);
const payload = await validateGithub(c.req, app.webhooks);

return webhookHandler(
id,
'github-app',
app.webhooks,
c.executionCtx,
payload,
);
});

hono.get('/github/installation-token/:id', async (c) => {
Expand Down
24 changes: 18 additions & 6 deletions src/api/controllers/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { Webhooks } from '@octokit/webhooks';

import { webhookHandler, setupWebhooksTemplate } from '@/github';
import Configuration from '@/github/configuration';
import {
webhookHandler,
setupWebhooksTemplate,
validateGithub,
} from '@/github';
import { GitHubKVManager } from '@/kv/github';

export function route(hono: THono) {
Expand All @@ -20,15 +23,24 @@ export function route(hono: THono) {
return c.send.error(400, 'please set webhook secret in database');
}

Configuration.init(setting);

const webhooks = new Webhooks<{ octokit: undefined }>({
const webhooks = new Webhooks<{
octokit: undefined;
}>({
secret: setting.githubSecret,
});

const payload = await validateGithub(c.req, webhooks);

setupWebhooksTemplate(webhooks, {
setting: setting,
});
return webhookHandler(webhooks, c.req, c.executionCtx);

return webhookHandler(
id,
'github-webhook',
webhooks,
c.executionCtx,
payload,
);
});
}
8 changes: 7 additions & 1 deletion src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ export default class Environment {

static #instance: Environment | null;

useQueue = false;

get Queue() {
return this.env.MESSAGE_QUEUE;
}

get KV() {
return this.env.KV_PROD;
return this.env.KV;
}

get metrics() {
Expand Down
2 changes: 0 additions & 2 deletions src/github/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { GitHubService } from '@opensumi/octo-service';

import { CommandContext, issueCc } from './commands';
import { parseCommandInMarkdownComments } from './commands/parse';
import Configuration from './configuration';
import { setupWebhooksTemplate } from './handler';
import { OpenSumiOctoService } from './service/opensumi';
import { sendToDing } from './utils';
Expand All @@ -25,7 +24,6 @@ export class App {

constructor(setting: AppSetting) {
const { appSettings, githubSecret } = setting;
Configuration.init(setting);

this.octoApp = new OctoApp({
appId: appSettings.appId,
Expand Down
24 changes: 0 additions & 24 deletions src/github/configuration.ts

This file was deleted.

67 changes: 41 additions & 26 deletions src/github/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import { User } from '@octokit/webhooks-types';
import { HonoRequest } from 'hono';

import { error, json } from '@/api/utils/response';
import Environment from '@/env';

import { getTemplates, StopHandleError } from './templates';
import type { MarkdownContent, Context, ITemplateResult } from './types';
import type { MarkdownContent, Context } from './types';
import { sendToDing } from './utils';

export class ValidationError extends Error {
Expand All @@ -19,11 +20,18 @@ export class ValidationError extends Error {
}
}

export interface IGitHubEvent {
id: string;
event: string;
text: string;
payload: any;
}

export async function validateGithub(
// eslint-disable-next-line @typescript-eslint/ban-types
req: HonoRequest<any, {}>,
webhooks: Webhooks,
) {
): Promise<IGitHubEvent> {
const headers = req.raw.headers;

if (!headers.get('User-Agent')?.startsWith('GitHub-Hookshot/')) {
Expand Down Expand Up @@ -70,9 +78,9 @@ const blockedUser = new Set(['renovate[bot]']);

export const setupWebhooksTemplate = (
webhooks: Webhooks<{ octokit?: Octokit }>,
ctx: Context,
context: Context,
) => {
const templates = getTemplates(ctx);
const templates = getTemplates(context);
const supportTemplates = Object.keys(templates) as EmitterWebhookEventName[];

// webhooks.onAny(async ({ id, name, payload }) => {
Expand Down Expand Up @@ -105,17 +113,12 @@ export const setupWebhooksTemplate = (
console.log('run handler:', handler?.name);

const data = await handler(payload, {
...ctx,
...context,
octokit,
});
console.log('get data from handler: ', data);

const result = {
data,
eventName,
} as ITemplateResult;

await sendToDing(data, eventName, ctx.setting);
await sendToDing(data, eventName, context.setting);
} catch (err) {
console.log('stop handler because: ', err);
if (!(err instanceof StopHandleError)) {
Expand All @@ -127,26 +130,38 @@ export const setupWebhooksTemplate = (
};

export async function webhookHandler(
webhooks: Webhooks,
// eslint-disable-next-line @typescript-eslint/ban-types
req: HonoRequest<any, {}>,
botId: string,
type: 'github-app' | 'github-webhook',
webhooks: Webhooks<{ octokit?: Octokit }>,
execContext: ExecutionContext,
data: IGitHubEvent,
forceNoQueue?: boolean,
) {
const { id, event: eventName, payload } = data;
try {
const {
id,
event: eventName,
payload,
} = await validateGithub(req, webhooks);
console.log('Receive Github Webhook, id: ', id, ', name: ', eventName);
try {
execContext.waitUntil(
webhooks.receive({
id: id,
name: eventName as any,
payload: payload,
}),
);
let useQueue = Environment.instance().useQueue;
if (forceNoQueue) {
useQueue = false;
}

if (useQueue) {
Environment.instance().Queue.send({
botId,
type,
data,
});
} else {
execContext.waitUntil(
webhooks.receive({
id: id,
name: eventName as any,
payload: payload,
}),
);
}

return json({
id: id,
name: eventName,
Expand Down
5 changes: 0 additions & 5 deletions src/github/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ export type MarkdownContent = {
text: string;
};

export interface ITemplateResult {
data: MarkdownContent;
eventName: string;
}

export type TemplateMapping = {
[TEmitterEvent in EmitterWebhookEventName]?: (
payload: ExtractPayload<TEmitterEvent>,
Expand Down
35 changes: 35 additions & 0 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { EventEmitter } from 'eventemitter3';

import { TQueueMessage } from './types';
import { githubWorker, githubWebhookWorker } from './worker/github';

export type IConsumeWorker<T> = (
message: Message<T>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => void;

export class QueueConsumer {
private emitter = new EventEmitter();
addWorker<T, K extends TQueueMessage['type']>(
type: K,
handler: IConsumeWorker<T>,
) {
this.emitter.on(type, handler);
}

consume(
message: Message<TQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) {
const { body } = message;
const { type } = body;

this.emitter.emit(type, message, env, ctx);
}
}

export const consumer = new QueueConsumer();
consumer.addWorker('github-app', githubWorker);
consumer.addWorker('github-webhook', githubWebhookWorker);
24 changes: 24 additions & 0 deletions src/queue/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { IGitHubEvent } from '@/github';

export interface IGitHubEventQueueMessage {
/**
* current bot id
*/
botId: string;

type: 'github-app' | 'github-webhook';

data: IGitHubEvent;
}

export interface IWechatyQueueMessage {
/**
* current bot id
*/
botId: string;

type: 'wechaty';
data: any;
}

export type TQueueMessage = IGitHubEventQueueMessage | IWechatyQueueMessage;
49 changes: 49 additions & 0 deletions src/queue/worker/github.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { Webhooks } from '@octokit/webhooks';

import { initApp } from '@/github/app';
import { webhookHandler } from '@/github/handler';
import { GitHubKVManager } from '@/kv/github';

import { IGitHubEventQueueMessage } from '../types';

export const githubWorker = async (
message: Message<IGitHubEventQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => {
const { body } = message;
const { botId, type, data } = body;

const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(botId);

if (setting && setting.githubSecret) {
const app = await initApp(setting);

await webhookHandler(botId, type, app.webhooks, ctx, data, true);
} else {
// todo logger
}
};

export const githubWebhookWorker = async (
message: Message<IGitHubEventQueueMessage>,
env: IRuntimeEnv,
ctx: ExecutionContext,
) => {
const { body } = message;
const { botId, type, data } = body;

const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getSettingById(botId);

if (setting && setting.githubSecret) {
const webhooks = new Webhooks<{ octokit: undefined }>({
secret: setting.githubSecret,
});

await webhookHandler(botId, type, webhooks, ctx, data, true);
} else {
// todo logger
}
};
Loading