Skip to content

Commit

Permalink
Merge pull request #8740 from wSedlacek/feat/handle-observable-stream…
Browse files Browse the repository at this point in the history
…-errors

feat(core): handle errors in observable streams
  • Loading branch information
kamilmysliwiec authored Dec 17, 2021
2 parents f704a43 + b7f3680 commit b7378ad
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 19 deletions.
4 changes: 2 additions & 2 deletions packages/core/router/router-execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export class RouterExecutionContext {
}
const isSseHandler = !!this.reflectSse(callback);
if (isSseHandler) {
return async <
return <
TResult extends Observable<unknown> = any,
TResponse extends HeaderStream = any,
TRequest extends IncomingMessage = any,
Expand All @@ -440,7 +440,7 @@ export class RouterExecutionContext {
res: TResponse,
req: TRequest,
) => {
await this.responseController.sse(
this.responseController.sse(
result,
(res as any).raw || res,
(req as any).raw || req,
Expand Down
44 changes: 33 additions & 11 deletions packages/core/router/router-response-controller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import { HttpServer, HttpStatus, RequestMethod } from '@nestjs/common';
import {
HttpServer,
HttpStatus,
Logger,
RequestMethod,
MessageEvent,
} from '@nestjs/common';
import { isFunction, isObject } from '@nestjs/common/utils/shared.utils';
import { IncomingMessage } from 'http';
import { lastValueFrom, Observable } from 'rxjs';
import { debounce } from 'rxjs/operators';
import { EMPTY, lastValueFrom, Observable } from 'rxjs';
import { catchError, debounce, map } from 'rxjs/operators';
import {
AdditionalHeaders,
WritableHeaderStream,
Expand All @@ -20,6 +26,8 @@ export interface RedirectResponse {
}

export class RouterResponseController {
private readonly logger = new Logger(RouterResponseController.name);

constructor(private readonly applicationRef: HttpServer) {}

public async apply<TInput = any, TResponse = any>(
Expand Down Expand Up @@ -87,7 +95,7 @@ export class RouterResponseController {
this.applicationRef.status(response, statusCode);
}

public async sse<
public sse<
TInput extends Observable<unknown> = any,
TResponse extends WritableHeaderStream = any,
TRequest extends IncomingMessage = any,
Expand All @@ -109,15 +117,29 @@ export class RouterResponseController {

const subscription = result
.pipe(
map((message): MessageEvent => {
if (isObject(message)) {
return message as MessageEvent;
}

return { data: message as object | string };
}),
debounce(
(message: any) =>
new Promise(resolve => {
if (!isObject(message)) {
message = { data: message };
}
stream.writeMessage(message, resolve);
}),
message =>
new Promise<void>(resolve =>
stream.writeMessage(message, () => resolve()),
),
),
catchError(err => {
const data = err instanceof Error ? err.message : err;
stream.writeMessage({ type: 'error', data }, writeError => {
if (writeError) {
this.logger.error(writeError);
}
});

return EMPTY;
}),
)
.subscribe({
complete: () => {
Expand Down
84 changes: 78 additions & 6 deletions packages/core/test/router/router-response-controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isNil, isObject } from '@nestjs/common/utils/shared.utils';
import { expect } from 'chai';
import { IncomingMessage, ServerResponse } from 'http';
import { Observable, of } from 'rxjs';
import { Observable, of, Subject } from 'rxjs';
import * as sinon from 'sinon';
import { PassThrough, Writable } from 'stream';
import { HttpStatus, RequestMethod } from '../../../common';
Expand Down Expand Up @@ -349,14 +349,86 @@ data: test
const request = new Writable();
request._write = () => {};

routerResponseController.sse(
result as unknown as Observable<string>,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);
try {
routerResponseController.sse(
result as unknown as Observable<string>,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);
} catch {
// Wether an error is thrown or not
// is not relevant, so long as
// result is not called
}

sinon.assert.notCalled(result);
done();
});

describe('when there is an error', () => {
it('should close the request', done => {
const result = new Subject();
const response = new Writable();
response.end = done;
response._write = () => {};

const request = new Writable();
request._write = () => {};

routerResponseController.sse(
result,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);

result.error(new Error('Some error'));
});

it('should write the error message to the stream', async () => {
class Sink extends Writable {
private readonly chunks: string[] = [];

_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void,
): void {
this.chunks.push(chunk);
callback();
}

get content() {
return this.chunks.join('');
}
}

const written = (stream: Writable) =>
new Promise((resolve, reject) =>
stream.on('error', reject).on('finish', resolve),
);

const result = new Subject();
const response = new Sink();
const request = new PassThrough();
routerResponseController.sse(
result,
response as unknown as ServerResponse,
request as unknown as IncomingMessage,
);

result.error(new Error('Some error'));
request.destroy();

await written(response);
expect(response.content).to.eql(
`:
event: error
id: 1
data: Some error
`,
);
});
});
});
});

0 comments on commit b7378ad

Please sign in to comment.