Skip to content

Commit

Permalink
refactor: make queue clear
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemain committed Jan 30, 2024
1 parent 0028874 commit 4b86e19
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 97 deletions.
10 changes: 4 additions & 6 deletions src/api/controllers/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ export function route(hono: THono) {
if (!id) {
return c.send.error(400, 'need a valid id');
}
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(id);

const setting = await GitHubKVManager.instance().getAppSettingById(id);

if (!setting) {
return c.send.error(400, 'id not found in database');
Expand Down Expand Up @@ -62,8 +62,7 @@ export function route(hono: THono) {
}

// 先查数据库有没有设置这个 id 对应的 installation id
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(id);
const setting = await GitHubKVManager.instance().getAppSettingById(id);

if (!setting) {
return c.send.error(400, 'id not found in database');
Expand Down Expand Up @@ -107,8 +106,7 @@ export function route(hono: THono) {
}

// 先查数据库有没有设置这个 id 对应的 installation id
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(id);
const setting = await GitHubKVManager.instance().getAppSettingById(id);

if (!setting) {
return c.send.error(400, 'id not found in database');
Expand Down
3 changes: 1 addition & 2 deletions src/api/controllers/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export function route(hono: THono) {
if (!id) {
return c.send.error(400, 'need a valid id');
}
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getSettingById(id);
const setting = await GitHubKVManager.instance().getSettingById(id);

if (!setting) {
return c.send.error(400, 'id not found in database');
Expand Down
15 changes: 7 additions & 8 deletions src/github/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,20 @@ export async function webhookHandler(
try {
logger.info('Receive Github Webhook, id: ', id, ', name: ', eventName);
try {
const webhookEvent = {
id: id,
name: eventName as any,
payload: payload,
};
if (useQueue) {
logger.info('send to queue');
Environment.instance().Queue.send({
botId,
type,
data,
data: webhookEvent,
});
} else {
execContext.waitUntil(
webhooks.receive({
id: id,
name: eventName as any,
payload: payload,
}),
);
execContext.waitUntil(webhooks.receive(webhookEvent));
}

return json({
Expand Down
2 changes: 1 addition & 1 deletion src/im/ding/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class DingBotAdapter implements IBotAdapter {
public ctx: ExecutionContext,
public setting: IDingBotSetting,
) {
this.githubKVManager = new GitHubKVManager();
this.githubKVManager = GitHubKVManager.instance();
this.conversationKVManager = new ConversationKVManager(msg);
this.userInfoKVManager = new DingUserKVManager();
}
Expand Down
10 changes: 9 additions & 1 deletion src/kv/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export class GitHubKVManager {
appSettingsKV: KVManager<AppSetting>;
settingsKV: KVManager<ISetting>;

constructor() {
private constructor() {
this.appSettingsKV = KVManager.for<AppSetting>(
GitHubCommon.GITHUB_APP_SETTINGS_PREFIX,
);
Expand All @@ -15,6 +15,14 @@ export class GitHubKVManager {
);
}

private static _instance: GitHubKVManager;
static instance() {
if (!this._instance) {
this._instance = new GitHubKVManager();
}
return this._instance;
}

setAppSettingById(id: string, data: AppSetting) {
return this.appSettingsKV.setJSON(id, data);
}
Expand Down
8 changes: 3 additions & 5 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import { GitHubAppWorker } from './worker/github';
export class QueueConsumer<T> {
logger = Logger.instance();

constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {}

private workerMap = new MultiMap<string, BaseWorker<T>>();

addWorker(type: string, worker: BaseWorker<T>) {
Expand All @@ -35,9 +33,9 @@ export class QueueConsumer<T> {
}
}

export const createConsumer = (env: IRuntimeEnv, ctx: ExecutionContext) => {
const consumer = new QueueConsumer<TQueueMessage>(env, ctx);
consumer.addWorker('github-app', new GitHubAppWorker(env, ctx));
export const createConsumer = () => {
const consumer = new QueueConsumer<TQueueMessage>();
consumer.addWorker('github-app', new GitHubAppWorker());
// consumer.addWorker('github-webhook', githubWebhookWorker);
return consumer;
};
8 changes: 6 additions & 2 deletions src/queue/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { IGitHubEvent } from '@/github';
export interface IGitHubMessageBody {
id: string;
name: string;
payload: any;
}

export interface IGitHubEventQueueMessage {
/**
Expand All @@ -8,7 +12,7 @@ export interface IGitHubEventQueueMessage {

type: 'github-app' | 'github-webhook';

data: IGitHubEvent;
data: IGitHubMessageBody;
}

export interface IWechatyQueueMessage {
Expand Down
138 changes: 82 additions & 56 deletions src/queue/worker/github.ts
Original file line number Diff line number Diff line change
@@ -1,77 +1,103 @@
import { EmitterWebhookEventName } from '@octokit/webhooks';
import chain from 'lodash/chain';
import groupBy from 'lodash/groupBy';
import MultiMap from 'mnemonist/multi-map';

import { initApp } from '@/github/app';
import { setupWebhooksTemplate } from '@/github/handler';
import { MarkdownContent } from '@/github/types';
import { sendToDing } from '@/github/utils';
import { GitHubKVManager } from '@/kv/github';
import { Logger } from '@/utils/logger';

import { IGitHubEventQueueMessage } from '../types';

import { BaseWorker } from '.';

export class GitHubAppWorker extends BaseWorker<IGitHubEventQueueMessage> {
logger = Logger.instance();

async run() {
await Promise.allSettled(
chain(this.queue)
.groupBy((v) => v.body.botId)
.map(async (messages, botId) => {
const githubKVManager = new GitHubKVManager();
const setting = await githubKVManager.getAppSettingById(botId);

if (setting && setting.githubSecret) {
const app = await initApp(setting);

const results = new MultiMap<string, MarkdownContent>();

setupWebhooksTemplate(
app.webhooks,
{ setting },
async ({ markdown, eventName }) => {
results.set(eventName, markdown);
},
);
const byId = groupBy(this.queue, (v) => v.body.botId);

const result = await Promise.allSettled(
Object.entries(byId).map(async ([botId, messages]) => {
this.logger.info('consume for', botId, messages.length);

const setting = await GitHubKVManager.instance().getAppSettingById(
botId,
);

if (!setting) {
console.error('github app worker error: setting not found', botId);
return;
}

if (!setting.githubSecret) {
console.error(
'github app worker error: please set app webhook secret in database',
botId,
);
return;
}

const app = await initApp(setting);

const results = new MultiMap<string, MarkdownContent>();

setupWebhooksTemplate(
app.webhooks,
{ setting },
async ({ markdown, eventName }) => {
results.set(eventName, markdown);
},
);

const byEvent = groupBy(messages, (v) => v.body.data.name) as Record<
EmitterWebhookEventName,
Message<IGitHubEventQueueMessage>[]
>;

await Promise.allSettled(
Object.entries(byEvent).map(async ([eventName, messages]) => {
await Promise.allSettled(
chain(messages)
.groupBy((v) => v.body.data.event)
.map(async (messages, eventName: EmitterWebhookEventName) => {
await Promise.all(
messages.map(async (message) => {
try {
const { data } = message.body;
await app.webhooks.receive({
id: data.id,
name: data.event as any,
payload: data.payload,
});
message.ack();
} catch (error) {
console.error('github app worker error', error);
message.retry();
}
}),
);

const markdowns = results.get(eventName);
if (markdowns && markdowns.length > 0) {
// 只有特定内容的 content 要被合并起来
await Promise.allSettled(
markdowns.map((markdown) =>
sendToDing(markdown, eventName, setting),
),
);
}
})
.value(),
messages.map(async (message) => {
try {
const { data } = message.body;
await app.webhooks.receive({
id: data.id,
name: data.name as any,
payload: data.payload,
});
message.ack();
} catch (error) {
console.error('github app worker error', error);
message.retry();
}
}),
);
} else {
console.error('github app worker error: setting not found', botId);
}
})
.value(),

const markdowns = results.get(eventName);
if (markdowns && markdowns.length > 0) {
// 只有特定内容的 content 要被合并起来
await Promise.allSettled(
markdowns.map((markdown) =>
sendToDing(
markdown,
eventName as EmitterWebhookEventName,
setting,
),
),
);
}
}),
);
}),
);

result.forEach((v) => {
if (v.status === 'rejected') {
console.error('github app worker error', v);
}
});
}
}
2 changes: 0 additions & 2 deletions src/queue/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ export abstract class BaseWorker<T> {

queue: Message<T>[] = [];

constructor(public env: IRuntimeEnv, public ctx: ExecutionContext) {}

consume(message: Message<T>) {
this.queue.push(message);
}
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/cfworker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export default {
) {
Environment.from('cfworker', env);

const consumer = createConsumer(env, ctx);
const consumer = createConsumer();

consumer.consume(...batch.messages);

Expand Down
25 changes: 25 additions & 0 deletions test/__mocks__/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { EdgeKVNamespace } from 'edge-mock';

import Environment from '@/env';
import { GitHubCommon } from '@/kv/constants';

export function prepareEnv() {
const kv = new EdgeKVNamespace();
Environment.from('node', {
KV: kv,
MESSAGE_QUEUE: {} as any,
});

kv.put(
`${GitHubCommon.GITHUB_APP_SETTINGS_PREFIX}mock`,
JSON.stringify({
appSettings: {
appId: process.env.GITHUB_APPID,
privateKey: process.env.GITHUB_APP_PRIVATE_KEY!.replace(/\\n/g, '\n'),
},
githubSecret: process.env.GITHUB_TOKEN,
dingWebhooks: [],
contentLimit: 300,
}),
);
}
4 changes: 4 additions & 0 deletions test/__mocks__/queue/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ export class MockMessageBatch<T> implements MessageBatch<T> {
ackAll(): void {
this.messages.forEach((v) => v.ack());
}

static from<T>(messages: T[]) {
return new MockMessageBatch(messages.map((v) => new MockMessage(v)));
}
}
16 changes: 3 additions & 13 deletions test/queue/base.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { makeEdgeEnv, EdgeKVNamespace } from 'edge-mock';
import { makeEdgeEnv } from 'edge-mock';
makeEdgeEnv();

declare const FetchEvent: any;
declare const Request: any;

import { QueueConsumer } from '@/queue';
import { BaseWorker } from '@/queue/worker';

Expand All @@ -15,18 +12,11 @@ class Worker<T> extends BaseWorker<T> {
}
}

const env = {
KV: new EdgeKVNamespace() as any,
MESSAGE_QUEUE: {} as any,
};
const request = new Request('/?foo=1', { method: 'POST', body: 'hello' });
const event = new FetchEvent('fetch', { request });

describe('queue consumer', () => {
it('can work', async () => {
const consumer = new QueueConsumer<FakeMessage>(env, event);
const consumer = new QueueConsumer<FakeMessage>();
const batch = createMessageBatch();
const wk = new Worker<FakeMessage>(env, event);
const wk = new Worker<FakeMessage>();
consumer.addWorker('test', wk);
consumer.consume(...batch.messages);
expect(wk.queue.length).toBe(100);
Expand Down
Loading

0 comments on commit 4b86e19

Please sign in to comment.