Skip to content

Commit

Permalink
feat(sse): Implement Server-Sent Events
Browse files Browse the repository at this point in the history
See discussion in #4826
  • Loading branch information
soyuka committed May 25, 2020
1 parent edadbb1 commit aa36e8a
Show file tree
Hide file tree
Showing 24 changed files with 14,367 additions and 3 deletions.
1 change: 1 addition & 0 deletions packages/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ export const HTTP_CODE_METADATA = '__httpCode__';
export const MODULE_PATH = '__module_path__';
export const HEADERS_METADATA = '__headers__';
export const REDIRECT_METADATA = '__redirect__';
export const SSE_METADATA = '__sse__';
1 change: 1 addition & 0 deletions packages/common/decorators/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export * from './create-route-param-metadata.decorator';
export * from './render.decorator';
export * from './header.decorator';
export * from './redirect.decorator';
export * from './sse.decorator';
28 changes: 28 additions & 0 deletions packages/common/decorators/http/sse.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { SSE_METADATA, PATH_METADATA, METHOD_METADATA } from '../../constants';
import { RequestMethod } from '../../enums/request-method.enum';

/**
* Declares this route as a Server-Sent-Events endpoint
*
* @publicApi
*/
export function Sse(path?: string): MethodDecorator {
return (
target: object,
key: string | symbol,
descriptor: TypedPropertyDescriptor<any>,
) => {
Reflect.defineMetadata(
PATH_METADATA,
path && path.length ? path : '/',
descriptor.value,
);
Reflect.defineMetadata(
METHOD_METADATA,
RequestMethod.GET,
descriptor.value,
);
Reflect.defineMetadata(SSE_METADATA, true, descriptor.value);
return descriptor;
};
}
1 change: 1 addition & 0 deletions packages/common/http/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './http.module';
export * from './http.service';
export * from './interfaces';
export { SseStream } from './sse-stream.service';
89 changes: 89 additions & 0 deletions packages/common/http/sse-stream.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { Transform } from 'stream';
import { IncomingMessage, OutgoingHttpHeaders } from 'http';
import { MessageEvent } from '../interfaces';

function toDataString(data: string | object): string {
if (typeof data === 'object') return toDataString(JSON.stringify(data));
return data
.split(/\r\n|\r|\n/)
.map(line => `data: ${line}\n`)
.join('');
}

interface WriteHeaders {
writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders;
flushHeaders?(): void;
}

export type HeaderStream = NodeJS.WritableStream & WriteHeaders;

/**
* Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
* Transforms "messages" to W3C event stream content.
* See https://html.spec.whatwg.org/multipage/server-sent-events.html
* A message is an object with one or more of the following properties:
* - data (String or object, which gets turned into JSON)
* - type
* - id
* - retry
*
* If constructed with a HTTP Request, it will optimise the socket for streaming.
* If this stream is piped to an HTTP Response, it will set appropriate headers.
*/
export class SseStream extends Transform {
private lastEventId: number = null;

constructor(req?: IncomingMessage) {
super({ objectMode: true });
if (req) {
req.socket.setKeepAlive(true);
req.socket.setNoDelay(true);
req.socket.setTimeout(0);
}
}

pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
if (destination.writeHead) {
destination.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'identity',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
destination.flushHeaders();
}

destination.write(':ok\n\n');
return super.pipe(destination, options);
}

_transform(
message: MessageEvent,
encoding: string,
callback: (error?: Error | null, data?: any) => void,
) {
if (message.type) this.push(`event: ${message.type}\n`);
if (message.id) this.push(`id: ${message.id}\n`);
if (message.retry) this.push(`retry: ${message.retry}\n`);
if (message.data) this.push(toDataString(message.data));
this.push('\n');
callback();
}

writeMessage(
message: MessageEvent,
encoding?: string,
cb?: (error: Error | null | undefined) => void,
): boolean {
if (!message.id) {
this.lastEventId = this.lastEventId === null ? 0 : this.lastEventId + 1;
message.id = '' + this.lastEventId;
}

if (!message.type) {
message.type = 'message';
}

return this.write(message, encoding, cb);
}
}
1 change: 1 addition & 0 deletions packages/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export {
INestApplication,
INestApplicationContext,
INestMicroservice,
MessageEvent,
MiddlewareConsumer,
NestApplicationOptions,
NestInterceptor,
Expand Down
7 changes: 7 additions & 0 deletions packages/common/interfaces/http/http-server.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,10 @@ export interface HttpServer<TRequest = any, TResponse = any> {
close(): any;
getType(): string;
}

export interface MessageEvent {
data: string | object;
id?: string;
type?: string;
retry?: number;
}
16 changes: 16 additions & 0 deletions packages/common/test/decorators/sse.decorator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { expect } from 'chai';
import { Sse } from '../../decorators/http/sse.decorator';
import { HTTP_CODE_METADATA, SSE_METADATA } from '../../constants';

describe('@Sse', () => {
const prefix = '/prefix';
class Test {
@Sse(prefix)
public static test() {}
}

it('should enhance method with expected http status code', () => {
const metadata = Reflect.getMetadata(SSE_METADATA, Test.test);
expect(metadata).to.be.eql(prefix);
});
});
3 changes: 2 additions & 1 deletion packages/core/helpers/handler-metadata-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ export interface HandlerMetadata {
contextId?: ContextId,
inquirerId?: string,
) => (ParamProperties & { metatype?: any })[];
fnHandleResponse: <TResult, TResponse>(
fnHandleResponse: <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req?: TRequest,
) => any;
}

Expand Down
17 changes: 16 additions & 1 deletion packages/core/router/router-execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
REDIRECT_METADATA,
RENDER_METADATA,
ROUTE_ARGS_METADATA,
SSE_METADATA,
} from '@nestjs/common/constants';
import { RouteParamMetadata } from '@nestjs/common/decorators';
import { RouteParamtypes } from '@nestjs/common/enums/route-paramtypes.enum';
Expand Down Expand Up @@ -160,7 +161,7 @@ export class RouterExecutionContext {
handler(args, req, res, next),
contextType,
);
await fnHandleResponse(result, res);
await fnHandleResponse(result, res, req);
};
}

Expand Down Expand Up @@ -265,6 +266,10 @@ export class RouterExecutionContext {
return Reflect.getMetadata(HEADERS_METADATA, callback) || [];
}

public reflectSse(callback: (...args: unknown[]) => unknown): string {
return Reflect.getMetadata(SSE_METADATA, callback);
}

public exchangeKeysForValues(
keys: string[],
metadata: Record<number, RouteParamMetadata>,
Expand Down Expand Up @@ -410,6 +415,16 @@ export class RouterExecutionContext {
await this.responseController.redirect(result, res, redirectResponse);
};
}
const isSse = !!this.reflectSse(callback);
if (isSse) {
return async <TResult, TResponse, TRequest>(
result: TResult,
res: TResponse,
req: TRequest,
) => {
await this.responseController.sse(result, res, req);
};
}
return async <TResult, TResponse>(result: TResult, res: TResponse) => {
result = await this.responseController.transformToResult(result);
!isResponseHandled &&
Expand Down
33 changes: 32 additions & 1 deletion packages/core/router/router-response-controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { HttpServer, HttpStatus, RequestMethod } from '@nestjs/common';
import {
HttpServer,
HttpStatus,
RequestMethod,
SseStream,
MessageEvent,
} from '@nestjs/common';
import { isFunction } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -78,4 +85,28 @@ export class RouterResponseController {
) {
this.applicationRef.status(response, statusCode);
}

public async sse<TInput = unknown, TResponse = unknown, TRequest = unknown>(
result: any,
response: any,
request: any,
) {
if (!isFunction(result.subscribe)) {
throw new ReferenceError(
'You should use an observable to use server-sent events.',
);
}

const stream = new SseStream(request);
stream.pipe(response);

const subscription = result.subscribe((message: MessageEvent) => {
stream.writeMessage(message);
});

request.on('close', () => {
response.end();
subscription.unsubscribe();
});
}
}
24 changes: 24 additions & 0 deletions sample/28-sse/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module.exports = {
parser: '@typescript-eslint/parser',
parserOptions: {
project: 'tsconfig.json',
sourceType: 'module',
},
plugins: ['@typescript-eslint/eslint-plugin'],
extends: [
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
'prettier/@typescript-eslint',
],
root: true,
env: {
node: true,
jest: true,
},
rules: {
'@typescript-eslint/interface-name-prefix': 'off',
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/no-explicit-any': 'off',
},
};
34 changes: 34 additions & 0 deletions sample/28-sse/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# compiled output
/dist
/node_modules

# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*

# OS
.DS_Store

# Tests
/coverage
/.nyc_output

# IDEs and editors
/.idea
.project
.classpath
.c9/
*.launch
.settings/
*.sublime-workspace

# IDE - VSCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
4 changes: 4 additions & 0 deletions sample/28-sse/.prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"singleQuote": true,
"trailingComma": "all"
}
Loading

0 comments on commit aa36e8a

Please sign in to comment.