Skip to content

Commit

Permalink
feat: add eventSource and fetch Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
chaxus committed Dec 23, 2024
1 parent 1fa02ff commit 3319a4d
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 38 deletions.
16 changes: 9 additions & 7 deletions packages/im/app/controllers/home.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fs from 'node:fs';
import { fileURLToPath } from 'node:url';
import path from 'node:path';
import { getMime } from 'ranuts';
import { ssr } from '@/app/lib/vite';
import { vite } from '@/app/lib/vite';
import { HtmlWritable, getEnv } from '@/app/lib/index';
import { FORMAT, HTML_PATH_MAP, RENDER_PATH_MAP, TEMPLATE_REPLACE } from '@/app/lib/constant';
import type { Context } from '@/app/types/index';
Expand All @@ -11,12 +11,12 @@ const env = getEnv();

const __dirname = path.dirname(fileURLToPath(import.meta.url));

export default class ServerRender {
export default class HomeController {
async index(ctx: Context): Promise<void> {
try {
const fsTemplate = fs.readFileSync(path.resolve(__dirname, HTML_PATH_MAP[env]), FORMAT);
const template = await ssr.transformIndexHtml(ctx.request.path, fsTemplate);
const { render } = await ssr.ssrLoadModule(path.resolve(__dirname, RENDER_PATH_MAP[env]));
const template = await vite.transformIndexHtml(ctx.request.path, fsTemplate);
const { render } = await vite.ssrLoadModule(path.resolve(__dirname, RENDER_PATH_MAP[env]));
const writable = new HtmlWritable();
const stream = render(ctx, {
onShellReady() {
Expand All @@ -32,10 +32,12 @@ export default class ServerRender {
onAllReady() {
const type = getMime(ctx.request.url) || 'text/html';
if (type) {
ctx.response.setHeader('Content-Type', type);
ctx.res.setHeader('Content-Type', type);
} else {
ctx.response.setHeader('Content-Type', 'text/html');
ctx.response.setHeader('Transfer-Encoding', 'chunked');
// 如果 `getMime` 返回 `null` 或 `undefined`,你可以手动设置 `Content-Type`
// ctx.res.setHeader('Content-Type', 'application/javascript');
// ctx.response.setHeader('Content-Type', 'text/html');
// ctx.response.setHeader('Transfer-Encoding', 'chunked');
}
},
});
Expand Down
41 changes: 41 additions & 0 deletions packages/im/app/controllers/im.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import type { Context } from "@/app/types/index";

export default class IMController {
dialog(ctx: Context): void {
try {
const { res, req, request } = ctx;
const { chat_id, question } = request.body;
console.log('dialog:', chat_id, question);
const headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
// 如果请求 /events 路径,建立 SSE 连接
res.writeHead(200, headers)
// 每隔 1 秒发送一条消息
let id = 0
const intervalId = setInterval(() => {
res.write(`event: customEvent\n`)
res.write(`id: ${id}\n`)
res.write(`retry: 30000\n`)
const data = { id, time: new Date().toISOString(), question }
res.write(`data: ${JSON.stringify(data)}\n\n`)
id++
if (id >= 10) {
clearInterval(intervalId)
res.end()
}
}, 1000)
// 当客户端关闭连接时停止发送消息
req.on('close', () => {
clearInterval(intervalId)
id = 0
res.end()
})
} catch (error) {
console.log('dialog:', error);
// ctx.errorHandler({ error });
}
}
}
58 changes: 35 additions & 23 deletions packages/im/app/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { routing } from '@/app/routes';
import { createContext, createController } from '@/app/lib/context';
import { LOCAL_URL, MIME_TYPES, PORT } from '@/app/lib/constant';
import type { Context } from '@/app/types/index';
import { vite } from '@/app/lib/vite';

// 静态文件目录
const createStaticDir = () => {
Expand All @@ -18,25 +19,25 @@ const createStaticDir = () => {
// 静态服务
const createStaticServer =
(dirStatic: string) =>
(ctx: Context, callback = noop): void => {
const { req, res } = ctx;
const filePath = path.join(dirStatic, req.url || '/');
const extname = String(path.extname(filePath)).toLowerCase();
const contentType = MIME_TYPES[extname] || 'application/octet-stream';
fs.readFile(filePath, (error, content) => {
if (error) {
if (error.code === 'ENOENT') {
callback(ctx);
(ctx: Context, callback = noop): void => {
const { req, res } = ctx;
const filePath = path.join(dirStatic, req.url || '/');
const extname = String(path.extname(filePath)).toLowerCase();
const contentType = MIME_TYPES[extname] || 'application/octet-stream';
fs.readFile(filePath, (error, content) => {
if (error) {
if (error.code === 'ENOENT') {
callback(ctx);
} else {
res.writeHead(500);
res.end(`Server Error: ${error.code}`);
}
} else {
res.writeHead(500);
res.end(`Server Error: ${error.code}`);
res.writeHead(200, { 'Content-Type': contentType });
res.end(content, 'utf-8');
}
} else {
res.writeHead(200, { 'Content-Type': contentType });
res.end(content, 'utf-8');
}
});
};
});
};
// controller 目录
const createControllerDir = () => {
const __filename = fileURLToPath(import.meta.url);
Expand All @@ -48,12 +49,23 @@ const dirController = createControllerDir();

createController(dirController).then((controller) => {
const server = createServer((req: IncomingMessage, res: ServerResponse) => {
createContext(req, res, controller).then((ctx: Context) => {
// 匹配静态服务
createStaticServer(createStaticDir())(ctx, () => {
// 没有静态服务再匹配路由
routing(ctx);
});
// 创建上下文
createContext(req, res, controller).then((ctx) => {
// 定义一个处理函数链
const handleRequest = (handlers: Array<(ctx: Context, next: () => void) => void>, index = 0) => {
if (index < handlers.length) {
handlers[index](ctx, () => handleRequest(handlers, index + 1));
}
};
// 处理函数链,包括静态文件服务、Vite 中间件和自定义路由
const handlers = [
(ctx: Context, next: () => void) => createStaticServer(createStaticDir())(ctx, next),
(_: Context, next: () => void) => vite.middlewares(req, res, next),
(ctx: Context) => routing(ctx)
];

// 开始处理请求
handleRequest(handlers);
});
});
server.listen(PORT, () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/im/app/lib/vite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ const viteServer = async () => {
});
};

export const ssr = await viteServer();
export const vite = await viteServer();
7 changes: 5 additions & 2 deletions packages/im/app/router.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
* env: 该路由在什么环境中生效 local|test|staging|prod 不同环境的路由可以放到一起
*/
const serverRender = {
// 获取主页
'get=>#/^(?!/api).*$/#': 'home#index',
// 获取 client 端页面
// 'get=>#/^(?!/api).*$/#': 'home#index',
'get=>/home': 'home#index',
// 用户
'post=>/api/user/login': 'user#login',
// IM 消息通信
'post=>/api/im/dialog': 'im#dialog',
};

export default {
Expand Down
4 changes: 2 additions & 2 deletions packages/im/app/routes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { noop } from 'ranuts';
import routes from '@/app/router.config';
import type { Context } from '@/app/types/index';
import { notMatchResponse } from '@/app/lib/response';
// import { notMatchResponse } from '@/app/lib/response';

const regex = /#\/(.*?)\/#/;

Expand All @@ -21,7 +21,7 @@ export const routing = (ctx: Context): void => {
return;
} else {
// 处理匹配不上的情况
notMatchResponse(ctx);
// notMatchResponse(ctx);
}
}
};
127 changes: 127 additions & 0 deletions packages/im/client/lib/eventSource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
interface RequestOption {
onopen?: () => void
onmessage?: (data: string) => void
onclose?: () => void
onerror?: (e: Error) => void
}

enum EventSourceStatus {
CONNECTING,
OPEN,
CLOSED,
ERROR
}

export class EventStreamFetch {
controller?: AbortController;
status: EventSourceStatus;
constructor() {
this.controller = undefined
this.status = EventSourceStatus.CLOSED
}
// 建立 FETCH-SSE 连接
public fetchStream = (url: string, body: Record<string, string> = {}): Promise<void> => {
this.controller = new AbortController()
const headers = {
'content-type': 'application/json',
'Accept': 'text/event-stream',
'Connection': 'keep-alive',
}
return this.fetchEventSource(url, {
method: 'POST',
headers,
body: JSON.stringify(body),
signal: this.controller.signal,
onopen: () => {
this.status = EventSourceStatus.OPEN
console.log('FETCH 连接打开');
},
onclose: () => {
this.status = EventSourceStatus.CLOSED
console.log('FETCH 连接关闭');
},
onmessage: (event) => {
this.status = EventSourceStatus.CONNECTING
const data = JSON.parse(event)
console.log('FETCH 接收到消息:', data);
},
onerror: (e) => {
this.status = EventSourceStatus.ERROR
console.log(e)
}
})
}
// 断开 FETCH-SSE 连接
public close = (): void => {
if (this.controller) {
this.status = EventSourceStatus.CLOSED
this.controller.abort()
this.controller = undefined
}
}
private fetchEventSource = (url: string, options: RequestOption & RequestInit): Promise<void> => {
return fetch(url, options)
.then(response => {
if (response.status === 200) {
options.onopen && options.onopen()
return response.body
}
})
.then(rb => {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
const reader = rb?.pipeThrough(new TextDecoderStream()).getReader()
const push = (): Promise<void> | undefined => {
// done 为数据流是否接收完成,boolean
// value 为返回数据,Uint8Array
return reader?.read().then(({ done, value }) => {
if (done) {
options.onclose && options.onclose()
return
}
options.onmessage && options.onmessage(value)
// 持续读取流信息
return push()
})
}
// 开始读取流信息
return push()
})
.catch((e) => {
options.onerror && options.onerror(e)
})
}
}


export class EventStreamSource {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
eventSource?: EventSource;
status: EventSourceStatus;
constructor() {
this.status = EventSourceStatus.CLOSED
}
// 建立 SSE 连接
public connectSSE = (url: string): void => {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
this.eventSource = new EventSource(url)
this.eventSource.onopen = () => {
this.status = EventSourceStatus.OPEN
console.log('SSE 连接打开');
}
this.eventSource.onerror = () => {
this.status = EventSourceStatus.ERROR
console.log('SSE 连接错误');
}
this.eventSource.onmessage = (event) => {
this.status = EventSourceStatus.CONNECTING
const data = JSON.parse(event.data)
console.log('SSE 接收到消息:', data);
}
}

// 断开 SSE 连接
public closeSSE = (): void => {
this.status = EventSourceStatus.CLOSED
this.eventSource?.close()
}
}
1 change: 1 addition & 0 deletions packages/im/client/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,4 @@ export const randomGeneratePolygon = (number: number = 3, maxSides: number = 10)
};
});
};

26 changes: 24 additions & 2 deletions packages/im/client/pages/home/index.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
import React from 'react';
import React, { } from 'react';
import { EventStreamFetch } from '@/client/lib/eventSource';


export const Home = (): React.JSX.Element => {
return <div>Home</div>;

const { fetchStream } = new EventStreamFetch()

const sendMessage = (params: Record<string, string> = {}) => {
fetchStream('/api/im/dialog', params)
}

const click = () => {
sendMessage({ chat_id: '1', question: 'hello' })
}

return (
<div>
<h1>Home</h1>
<div>输入消息</div>
<input type="text" />
<button onClick={click}>发送消息</button>
<div>回答</div>
<textarea></textarea>
</div>
);
};

export default Home;
3 changes: 3 additions & 0 deletions packages/im/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"description": "",
"type": "module",
"main": "index.js",
"engines": {
"node": "^22.9.0"
},
"scripts": {
"dev": "sh ./bin/dev.sh"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/im/views/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<script type="module" src="/client/client.tsx"></script>
<link href="/client/assets/images/favicon.ico" rel="shortcut icon" />
<!-- <link href="/client/assets/images/favicon.ico" rel="shortcut icon" /> -->
<title>RAN IM</title>
</head>

Expand Down
2 changes: 2 additions & 0 deletions packages/ranuts/src/utils/mimeType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export const MimeType = new Map<string, string>([
['.ser', 'application/java-serialized-object'],
['.class', 'application/java-vm'],
['.js', 'application/javascript'],
['.jsx', 'application/javascript'],
['.ts', 'application/javascript'],
['.tsx', 'application/javascript'],
['.json', 'application/json'],
['.lostxml', 'application/lost+xml'],
['.hqx', 'application/mac-binhex40'],
Expand Down

0 comments on commit 3319a4d

Please sign in to comment.