Skip to content

Commit

Permalink
Merge pull request #8729 from wSedlacek/fix/sse-headers
Browse files Browse the repository at this point in the history
fix(core): apply existing headers to sse responses
  • Loading branch information
kamilmysliwiec authored Dec 6, 2021
2 parents 9aa9d0e + 14e7e0b commit ef12ab3
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/core/router/router-execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ export class RouterExecutionContext {
result,
(res as any).raw || res,
(req as any).raw || req,
{ additionalHeaders: res.getHeaders?.() },
);
};
}
Expand Down
17 changes: 13 additions & 4 deletions packages/core/router/router-response-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { lastValueFrom, Observable } from 'rxjs';
import { debounce } from 'rxjs/operators';
import { HeaderStream, SseStream } from './sse-stream';
import {
AdditionalHeaders,
WritableHeaderStream,
SseStream,
} from './sse-stream';

export interface CustomHeader {
name: string;
Expand Down Expand Up @@ -85,9 +89,14 @@ export class RouterResponseController {

public async sse<
TInput extends Observable<unknown> = any,
TResponse extends HeaderStream = any,
TResponse extends WritableHeaderStream = any,
TRequest extends IncomingMessage = any,
>(result: TInput, response: TResponse, request: TRequest) {
>(
result: TInput,
response: TResponse,
request: TRequest,
options?: { additionalHeaders: AdditionalHeaders },
) {
// It's possible that we sent headers already so don't use a stream
if (response.writableEnded) {
return;
Expand All @@ -96,7 +105,7 @@ export class RouterResponseController {
this.assertObservable(result);

const stream = new SseStream(request);
stream.pipe(response);
stream.pipe(response, options);

const subscription = result
.pipe(
Expand Down
21 changes: 19 additions & 2 deletions packages/core/router/sse-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ function toDataString(data: string | object): string {
.join('');
}

export type AdditionalHeaders = Record<
string,
string[] | string | number | undefined
>;

interface ReadHeaders {
getHeaders?(): AdditionalHeaders;
}

interface WriteHeaders {
writableEnded?: boolean;
writeHead?(
Expand All @@ -24,7 +33,8 @@ interface WriteHeaders {
flushHeaders?(): void;
}

export type HeaderStream = NodeJS.WritableStream & WriteHeaders;
export type WritableHeaderStream = NodeJS.WritableStream & WriteHeaders;
export type HeaderStream = WritableHeaderStream & ReadHeaders;

/**
* Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
Expand All @@ -51,9 +61,16 @@ export class SseStream extends Transform {
}
}

pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean }): T {
pipe<T extends WritableHeaderStream>(
destination: T,
options?: {
additionalHeaders?: AdditionalHeaders;
end?: boolean;
},
): T {
if (destination.writeHead) {
destination.writeHead(200, {
...options?.additionalHeaders,
// See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
Expand Down
33 changes: 33 additions & 0 deletions packages/core/test/router/router-execution-context.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { PipesConsumer } from '../../pipes/pipes-consumer';
import { PipesContextCreator } from '../../pipes/pipes-context-creator';
import { RouteParamsFactory } from '../../router/route-params-factory';
import { RouterExecutionContext } from '../../router/router-execution-context';
import { HeaderStream } from '../../router/sse-stream';
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';

describe('RouterExecutionContext', () => {
Expand Down Expand Up @@ -470,6 +471,38 @@ describe('RouterExecutionContext', () => {
);
}
});

it('should apply any headers that exists on the response', async () => {
const result = of('test');
const response = new PassThrough() as HeaderStream;
response.write = sinon.spy();
response.writeHead = sinon.spy();
response.flushHeaders = sinon.spy();
response.getHeaders = sinon
.stub()
.returns({ 'access-control-headers': 'some-cors-value' });

const request = new PassThrough();
request.on = sinon.spy();

sinon.stub(contextCreator, 'reflectRenderTemplate').returns(undefined);
sinon.stub(contextCreator, 'reflectSse').returns('/');

const handler = contextCreator.createHandleResponseFn(
null,
true,
undefined,
200,
) as HandlerResponseBasicFn;
await handler(result, response, request);

expect(
(response.writeHead as sinon.SinonSpy).calledWith(
200,
sinon.match.hasNested('access-control-headers', 'some-cors-value'),
),
).to.be.true;
});
});
});
});
15 changes: 15 additions & 0 deletions packages/core/test/router/sse-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ data: hello
sse.pipe(sink);
});

it('sets additional headers when provided', callback => {
const sse = new SseStream();
const sink = new Sink(
(status: number, headers: string | OutgoingHttpHeaders) => {
expect(headers).to.contain.keys('access-control-headers');
expect(headers['access-control-headers']).to.equal('some-cors-value');
callback();
return sink;
},
);
sse.pipe(sink, {
additionalHeaders: { 'access-control-headers': 'some-cors-value' },
});
});

it('allows an eventsource to connect', callback => {
let sse: SseStream;
const server = createServer((req, res) => {
Expand Down

0 comments on commit ef12ab3

Please sign in to comment.