From e3647fa06f1c3c4cc847bf0f08ce90afe804b432 Mon Sep 17 00:00:00 2001 From: soyuka Date: Fri, 29 May 2020 20:48:46 +0200 Subject: [PATCH] test(sse): Test all the things --- package-lock.json | 20 ++- package.json | 5 +- packages/common/http/index.ts | 1 - .../test/decorators/sse.decorator.spec.ts | 14 +- .../core/helpers/handler-metadata-storage.ts | 12 +- .../core/router/router-execution-context.ts | 5 +- .../core/router/router-response-controller.ts | 28 ++-- packages/core/services/index.ts | 1 + .../services}/sse-stream.service.ts | 11 +- .../router/router-execution-context.spec.ts | 53 +++++++ .../router/router-response-controller.spec.ts | 77 +++++++++ .../test/services/sse-stream.service.spec.ts | 146 ++++++++++++++++++ 12 files changed, 345 insertions(+), 28 deletions(-) rename packages/{common/http => core/services}/sse-stream.service.ts (92%) create mode 100644 packages/core/test/services/sse-stream.service.spec.ts diff --git a/package-lock.json b/package-lock.json index a54b9dc2535..7898ecd68c3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "@nestjs/core", - "version": "7.0.5", + "version": "7.0.11", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -8935,6 +8935,15 @@ "integrity": "sha512-tvtQIeLVHjDkJYnzf2dgVMxfuSGJeM/7UCG17TT4EumTfNtF+0nebF/4zWOIkCreAbtNqhGEboB6BWrwqNaw4Q==", "dev": true }, + "eventsource": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-1.0.7.tgz", + "integrity": "sha512-4Ln17+vVT0k8aWq+t/bF5arcS3EpT9gYtW66EPacdj/mAFevznsnyoHLPy2BA8gbIQeIHoPsvwmfBftfcG//BQ==", + "dev": true, + "requires": { + "original": "^1.0.0" + } + }, "execa": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/execa/-/execa-1.0.0.tgz", @@ -18448,6 +18457,15 @@ } } }, + "original": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/original/-/original-1.0.2.tgz", + "integrity": "sha512-hyBVl6iqqUOJ8FqRe+l/gS8H+kKYjrEndd5Pm1MfBtsEKA038HkkdbAl/72EAXGyonD/PFsvmVG+EvcIpliMBg==", + "dev": true, + "requires": { + "url-parse": "^1.4.3" + } + }, "os-locale": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/os-locale/-/os-locale-1.4.0.tgz", diff --git a/package.json b/package.json index 26095597175..a9595cbb600 100644 --- a/package.json +++ b/package.json @@ -66,8 +66,8 @@ "reflect-metadata": "0.1.13", "rxjs": "6.5.5", "socket.io": "2.3.0", - "uuid": "8.0.0", - "tslib": "1.11.2" + "tslib": "1.11.2", + "uuid": "8.0.0" }, "devDependencies": { "@codechecks/client": "0.1.10", @@ -117,6 +117,7 @@ "eslint": "7.0.0", "eslint-config-prettier": "6.11.0", "eslint-plugin-import": "2.20.2", + "eventsource": "^1.0.7", "fancy-log": "1.3.3", "fastify": "2.14.1", "fastify-cors": "3.0.3", diff --git a/packages/common/http/index.ts b/packages/common/http/index.ts index 9bf7ff7a8fa..2e7a1fead2a 100644 --- a/packages/common/http/index.ts +++ b/packages/common/http/index.ts @@ -1,4 +1,3 @@ export * from './http.module'; export * from './http.service'; export * from './interfaces'; -export { SseStream } from './sse-stream.service'; diff --git a/packages/common/test/decorators/sse.decorator.spec.ts b/packages/common/test/decorators/sse.decorator.spec.ts index 3359125267a..a84524745dd 100644 --- a/packages/common/test/decorators/sse.decorator.spec.ts +++ b/packages/common/test/decorators/sse.decorator.spec.ts @@ -1,6 +1,12 @@ import { expect } from 'chai'; import { Sse } from '../../decorators/http/sse.decorator'; -import { HTTP_CODE_METADATA, SSE_METADATA } from '../../constants'; +import { + HTTP_CODE_METADATA, + SSE_METADATA, + PATH_METADATA, + METHOD_METADATA, +} from '../../constants'; +import { RequestMethod } from '../../enums/request-method.enum'; describe('@Sse', () => { const prefix = '/prefix'; @@ -10,7 +16,11 @@ describe('@Sse', () => { } it('should enhance method with expected http status code', () => { + const path = Reflect.getMetadata(PATH_METADATA, Test.test); + expect(path).to.be.eql('/prefix'); + const method = Reflect.getMetadata(METHOD_METADATA, Test.test); + expect(method).to.be.eql(RequestMethod.GET); const metadata = Reflect.getMetadata(SSE_METADATA, Test.test); - expect(metadata).to.be.eql(prefix); + expect(metadata).to.be.eql(true); }); }); diff --git a/packages/core/helpers/handler-metadata-storage.ts b/packages/core/helpers/handler-metadata-storage.ts index fa17821a161..27a2acdf202 100644 --- a/packages/core/helpers/handler-metadata-storage.ts +++ b/packages/core/helpers/handler-metadata-storage.ts @@ -6,6 +6,12 @@ import { ParamProperties } from './context-utils'; export const HANDLER_METADATA_SYMBOL = Symbol.for('handler_metadata:cache'); +export type HandleResponseFn = ( + result: TResult, + res: TResponse, + req?: TRequest, +) => any; + export interface HandlerMetadata { argsLength: number; paramtypes: any[]; @@ -17,11 +23,7 @@ export interface HandlerMetadata { contextId?: ContextId, inquirerId?: string, ) => (ParamProperties & { metatype?: any })[]; - fnHandleResponse: ( - result: TResult, - res: TResponse, - req?: TRequest, - ) => any; + fnHandleResponse: HandleResponseFn; } export class HandlerMetadataStorage< diff --git a/packages/core/router/router-execution-context.ts b/packages/core/router/router-execution-context.ts index c276ea03e1e..c7c282f97a2 100644 --- a/packages/core/router/router-execution-context.ts +++ b/packages/core/router/router-execution-context.ts @@ -27,6 +27,7 @@ import { ExecutionContextHost } from '../helpers/execution-context-host'; import { HandlerMetadata, HandlerMetadataStorage, + HandleResponseFn, } from '../helpers/handler-metadata-storage'; import { STATIC_CONTEXT } from '../injector/constants'; import { InterceptorsConsumer } from '../interceptors/interceptors-consumer'; @@ -403,7 +404,7 @@ export class RouterExecutionContext { isResponseHandled: boolean, redirectResponse?: RedirectResponse, httpStatusCode?: number, - ) { + ): HandleResponseFn { const renderTemplate = this.reflectRenderTemplate(callback); if (renderTemplate) { return async (result: TResult, res: TResponse) => { @@ -420,7 +421,7 @@ export class RouterExecutionContext { return async ( result: TResult, res: TResponse, - req: TRequest, + req?: TRequest, ) => { await this.responseController.sse(result, res, req); }; diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts index 6bc82661477..fd35c49294c 100644 --- a/packages/core/router/router-response-controller.ts +++ b/packages/core/router/router-response-controller.ts @@ -2,11 +2,12 @@ import { HttpServer, HttpStatus, RequestMethod, - SseStream, MessageEvent, } from '@nestjs/common'; import { isFunction } from '@nestjs/common/utils/shared.utils'; import { Observable } from 'rxjs'; +import { SseStream, HeaderStream } from '../services'; +import { IncomingMessage, ServerResponse } from 'http'; export interface CustomHeader { name: string; @@ -86,21 +87,14 @@ export class RouterResponseController { this.applicationRef.status(response, statusCode); } - public async sse( - result: any, - response: any, - request: any, - ) { - if (!isFunction(result.subscribe)) { - throw new ReferenceError( - 'You should use an observable to use server-sent events.', - ); - } + public async sse(result: TInput, response: any, request: any) { + const observable = this.assertObservable(result); const stream = new SseStream(request); stream.pipe(response); - const subscription = result.subscribe((message: MessageEvent) => { + const subscription = observable.subscribe((message: any) => { + if (typeof message !== 'object') message = { data: message }; stream.writeMessage(message); }); @@ -109,4 +103,14 @@ export class RouterResponseController { subscription.unsubscribe(); }); } + + private assertObservable(result: any): Observable { + if (!isFunction(result.subscribe)) { + throw new ReferenceError( + 'You should use an observable to use server-sent events.', + ); + } + + return result; + } } diff --git a/packages/core/services/index.ts b/packages/core/services/index.ts index b81014ca8e8..8cd41cdd686 100644 --- a/packages/core/services/index.ts +++ b/packages/core/services/index.ts @@ -1 +1,2 @@ export * from './reflector.service'; +export * from './sse-stream.service'; diff --git a/packages/common/http/sse-stream.service.ts b/packages/core/services/sse-stream.service.ts similarity index 92% rename from packages/common/http/sse-stream.service.ts rename to packages/core/services/sse-stream.service.ts index ae9079c7be1..9845ee09e92 100644 --- a/packages/common/http/sse-stream.service.ts +++ b/packages/core/services/sse-stream.service.ts @@ -1,6 +1,6 @@ import { Transform } from 'stream'; import { IncomingMessage, OutgoingHttpHeaders } from 'http'; -import { MessageEvent } from '../interfaces'; +import { MessageEvent } from '@nestjs/common/interfaces'; function toDataString(data: string | object): string { if (typeof data === 'object') return toDataString(JSON.stringify(data)); @@ -11,7 +11,12 @@ function toDataString(data: string | object): string { } interface WriteHeaders { - writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders; + writeHead?( + statusCode: number, + reasonPhrase?: string, + headers?: OutgoingHttpHeaders, + ): void; + writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): void; flushHeaders?(): void; } @@ -35,7 +40,7 @@ export class SseStream extends Transform { constructor(req?: IncomingMessage) { super({ objectMode: true }); - if (req) { + if (req && req.socket) { req.socket.setKeepAlive(true); req.socket.setNoDelay(true); req.socket.setTimeout(0); diff --git a/packages/core/test/router/router-execution-context.spec.ts b/packages/core/test/router/router-execution-context.spec.ts index 8c48353ea22..9e1da12c48e 100644 --- a/packages/core/test/router/router-execution-context.spec.ts +++ b/packages/core/test/router/router-execution-context.spec.ts @@ -1,5 +1,6 @@ import { ForbiddenException } from '@nestjs/common/exceptions/forbidden.exception'; import { expect } from 'chai'; +import { of } from 'rxjs'; import * as sinon from 'sinon'; import { HttpException, HttpStatus, RouteParamMetadata } from '../../../common'; import { CUSTOM_ROUTE_AGRS_METADATA } from '../../../common/constants'; @@ -17,6 +18,7 @@ import { PipesContextCreator } from '../../pipes/pipes-context-creator'; import { RouteParamsFactory } from '../../router/route-params-factory'; import { RouterExecutionContext } from '../../router/router-execution-context'; import { NoopHttpAdapter } from '../utils/noop-adapter.spec'; +import { PassThrough } from 'stream'; describe('RouterExecutionContext', () => { let contextCreator: RouterExecutionContext; @@ -326,6 +328,7 @@ describe('RouterExecutionContext', () => { sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]); sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined); + sinon.stub(contextCreator, 'reflectSse').returns(undefined); const handler = contextCreator.createHandleResponseFn( null, @@ -377,6 +380,7 @@ describe('RouterExecutionContext', () => { sinon.stub(contextCreator, 'reflectResponseHeaders').returns([]); sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined); + sinon.stub(contextCreator, 'reflectSse').returns(undefined); const handler = contextCreator.createHandleResponseFn( null, @@ -396,6 +400,7 @@ describe('RouterExecutionContext', () => { const response = {}; sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined); + sinon.stub(contextCreator, 'reflectSse').returns(undefined); const handler = contextCreator.createHandleResponseFn( null, @@ -414,5 +419,53 @@ describe('RouterExecutionContext', () => { ).to.be.true; }); }); + + describe('when "isSse" is enabled', () => { + it('should use sse-stream.service', async () => { + const result = of('test'); + const response = new PassThrough(); + response.write = sinon.spy(); + const request = new PassThrough(); + request.on = sinon.spy(); + + sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined); + sinon.stub(contextCreator, 'reflectSse').returns('/'); + + const handler = contextCreator.createHandleResponseFn( + null, + true, + undefined, + 200, + ); + await handler(result, response, request); + + expect((response.write as any).called).to.be.true; + expect((request.on as any).called).to.be.true; + }); + + it('should not allow a non-observable result', async () => { + const result = Promise.resolve('test'); + const response = new PassThrough(); + const request = new PassThrough(); + + sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined); + sinon.stub(contextCreator, 'reflectSse').returns('/'); + + const handler = contextCreator.createHandleResponseFn( + null, + true, + undefined, + 200, + ); + + try { + await handler(result, response, request); + } catch (e) { + expect(e.message).to.equal( + 'You should use an observable to use server-sent events.', + ); + } + }); + }); }); }); diff --git a/packages/core/test/router/router-response-controller.spec.ts b/packages/core/test/router/router-response-controller.spec.ts index b5d6baa0bf6..ba93e3c5eb7 100644 --- a/packages/core/test/router/router-response-controller.spec.ts +++ b/packages/core/test/router/router-response-controller.spec.ts @@ -5,6 +5,8 @@ import * as sinon from 'sinon'; import { RequestMethod, HttpStatus } from '../../../common'; import { RouterResponseController } from '../../router/router-response-controller'; import { NoopHttpAdapter } from '../utils/noop-adapter.spec'; +import { ServerResponse, IncomingMessage } from 'http'; +import { PassThrough, Writable, Readable } from 'stream'; describe('RouterResponseController', () => { let adapter: NoopHttpAdapter; @@ -247,4 +249,79 @@ describe('RouterResponseController', () => { }); }); }); + describe('Server-Sent-Events', () => { + it('should accept only observables', async () => { + const result = Promise.resolve('test'); + try { + await routerResponseController.sse( + (result as unknown) as any, + ({} as unknown) as ServerResponse, + ({} as unknown) as IncomingMessage, + ); + } catch (e) { + expect(e.message).to.eql( + 'You should use an observable to use server-sent events.', + ); + } + }); + + it('should write string', async () => { + class Sink extends Writable { + private readonly chunks: string[] = []; + + _write( + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ): void { + this.chunks.push(chunk); + callback(); + } + + get content() { + return this.chunks.join(''); + } + } + + const written = (stream: Writable) => + new Promise((resolve, reject) => + stream.on('error', reject).on('finish', resolve), + ); + + const result = of('test'); + const response = new Sink(); + const request = new PassThrough(); + routerResponseController.sse( + result, + (response as unknown) as ServerResponse, + (request as unknown) as IncomingMessage, + ); + request.destroy(); + await written(response); + expect(response.content).to.eql( + `:ok + +event: message +id: 0 +data: test + +`, + ); + }); + + it('should close on request close', done => { + const result = of('test'); + const response = new Writable(); + response.end = () => done(); + response._write = () => {}; + const request = new Writable(); + request._write = () => {}; + routerResponseController.sse( + result, + (response as unknown) as ServerResponse, + (request as unknown) as IncomingMessage, + ); + request.destroy(); + }); + }); }); diff --git a/packages/core/test/services/sse-stream.service.spec.ts b/packages/core/test/services/sse-stream.service.spec.ts new file mode 100644 index 00000000000..5106985d581 --- /dev/null +++ b/packages/core/test/services/sse-stream.service.spec.ts @@ -0,0 +1,146 @@ +import { expect } from 'chai'; +import { Writable } from 'stream'; +import { createServer, OutgoingHttpHeaders } from 'http'; +import * as EventSource from 'eventsource'; +import { SseStream, HeaderStream } from '../../services/sse-stream.service'; +import { AddressInfo } from 'net'; + +const written = (stream: Writable) => + new Promise((resolve, reject) => + stream.on('error', reject).on('finish', resolve), + ); + +class Sink extends Writable implements HeaderStream { + private readonly chunks: string[] = []; + + constructor( + public readonly writeHead?: ( + statusCode: number, + headers?: OutgoingHttpHeaders | string, + ) => void, + ) { + super({ objectMode: true }); + } + + _write( + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ): void { + this.chunks.push(chunk); + callback(); + } + + get content() { + return this.chunks.join(''); + } +} + +describe('SseStream', () => { + it('writes multiple multiline messages', async () => { + const sse = new SseStream(); + const sink = new Sink(); + sse.pipe(sink); + sse.writeMessage({ + data: 'hello\nworld', + }); + sse.write({ + data: 'bonjour\nmonde', + }); + sse.end(); + await written(sink); + expect(sink.content).to.equal( + `:ok + +event: message +id: 0 +data: hello +data: world + +data: bonjour +data: monde + +`, + ); + }); + + it('writes object messages as JSON', async () => { + const sse = new SseStream(); + const sink = new Sink(); + sse.pipe(sink); + sse.writeMessage({ + data: { hello: 'world' }, + }); + sse.end(); + await written(sink); + expect(sink.content).to.equal( + `:ok + +event: message +id: 0 +data: {"hello":"world"} + +`, + ); + }); + + it('writes all message attributes', async () => { + const sse = new SseStream(); + const sink = new Sink(); + sse.pipe(sink); + sse.writeMessage({ + type: 'tea-time', + id: 'the-id', + retry: 222, + data: 'hello', + }); + sse.end(); + await written(sink); + expect(sink.content).to.equal( + `:ok + +event: tea-time +id: the-id +retry: 222 +data: hello + +`, + ); + }); + + it('sets headers on destination when it looks like a HTTP Response', callback => { + const sse = new SseStream(); + const sink = new Sink((status: number, headers: OutgoingHttpHeaders) => { + expect(headers).to.deep.equal({ + 'Content-Type': 'text/event-stream; charset=utf-8', + 'Transfer-Encoding': 'identity', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }); + callback(); + return sink; + }); + sse.pipe(sink); + }); + + it('allows an eventsource to connect', callback => { + let sse: SseStream; + const server = createServer((req, res) => { + sse = new SseStream(req); + sse.pipe(res); + }); + server.listen(() => { + const es = new EventSource( + `http://localhost:${(server.address() as AddressInfo).port}`, + ); + es.onmessage = e => { + expect(e.data).to.equal('hello'); + es.close(); + server.close(callback); + }; + es.onopen = () => sse.writeMessage({ data: 'hello' }); + es.onerror = e => + callback(new Error(`Error from EventSource: ${JSON.stringify(e)}`)); + }); + }); +});