diff --git a/integration/microservices/e2e/sum-rpc-tls.spec.ts b/integration/microservices/e2e/sum-rpc-tls.spec.ts new file mode 100644 index 00000000000..e32a726df34 --- /dev/null +++ b/integration/microservices/e2e/sum-rpc-tls.spec.ts @@ -0,0 +1,142 @@ +import { INestApplication } from '@nestjs/common'; +import { Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; +import * as request from 'supertest'; +import { AppController } from '../src/tcp-tls/app.controller'; +import { ApplicationModule } from '../src/tcp-tls/app.module'; +import * as fs from 'fs'; +import * as path from 'path'; + +describe('RPC TLS transport', () => { + let server; + let app: INestApplication; + let key: string; + let cert: string; + + before(() => { + // Generate a self-signed key pair + key = fs + .readFileSync(path.join(__dirname, '../src/tcp-tls/privkey.pem'), 'utf8') + .toString(); + cert = fs + .readFileSync(path.join(__dirname, '../src/tcp-tls/ca.cert.pem'), 'utf8') + .toString(); + }); + + beforeEach(async () => { + const module = await Test.createTestingModule({ + imports: [ApplicationModule], + }).compile(); + + app = module.createNestApplication(); + server = app.getHttpAdapter().getInstance(); + + app.connectMicroservice({ + transport: Transport.TCP, + options: { + host: '0.0.0.0', + tlsOptions: { key: key, cert: cert }, + }, + }); + await app.startAllMicroservices(); + await app.init(); + }); + + it(`/POST TLS`, () => { + return request(server) + .post('/?command=sum') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }); + + it(`/POST (Promise/async)`, () => { + return request(server) + .post('/?command=asyncSum') + .send([1, 2, 3, 4, 5]) + .expect(200) + .expect(200, '15'); + }); + + it(`/POST (Observable stream)`, () => { + return request(server) + .post('/?command=streamSum') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }); + + it(`/POST (useFactory client)`, () => { + return request(server) + .post('/useFactory?command=sum') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }); + + it(`/POST (useClass client)`, () => { + return request(server) + .post('/useClass?command=sum') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }); + + it(`/POST (concurrent)`, () => { + return request(server) + .post('/concurrent') + .send([ + Array.from({ length: 10 }, (v, k) => k + 1), + Array.from({ length: 10 }, (v, k) => k + 11), + Array.from({ length: 10 }, (v, k) => k + 21), + Array.from({ length: 10 }, (v, k) => k + 31), + Array.from({ length: 10 }, (v, k) => k + 41), + Array.from({ length: 10 }, (v, k) => k + 51), + Array.from({ length: 10 }, (v, k) => k + 61), + Array.from({ length: 10 }, (v, k) => k + 71), + Array.from({ length: 10 }, (v, k) => k + 81), + Array.from({ length: 10 }, (v, k) => k + 91), + ]) + .expect(200, 'true'); + }); + + it(`/POST (streaming)`, () => { + return request(server) + .post('/stream') + .send([1, 2, 3, 4, 5]) + .expect(200, '15'); + }); + + it(`/POST (pattern not found)`, () => { + return request(server).post('/?command=test').expect(500); + }); + + it(`/POST (event notification)`, done => { + request(server) + .post('/notify') + .send([1, 2, 3, 4, 5]) + .end(() => { + setTimeout(() => { + expect(AppController.IS_NOTIFIED).to.be.true; + done(); + }, 1000); + }); + }); + + it('/POST (custom client)', () => { + return request(server) + .post('/error?client=custom') + .send({}) + .expect(200) + .expect('true'); + }); + + it('/POST (standard client)', () => { + return request(server) + .post('/error?client=standard') + .send({}) + .expect(200) + .expect('false'); + }); + + afterEach(async () => { + await app.close(); + }); +}); diff --git a/integration/microservices/src/tcp-tls/app.controller.ts b/integration/microservices/src/tcp-tls/app.controller.ts new file mode 100644 index 00000000000..81101741fc0 --- /dev/null +++ b/integration/microservices/src/tcp-tls/app.controller.ts @@ -0,0 +1,141 @@ +import { + Body, + Controller, + HttpCode, + Inject, + Post, + Query, +} from '@nestjs/common'; +import { + Client, + ClientProxy, + EventPattern, + MessagePattern, + RpcException, + Transport, +} from '@nestjs/microservices'; +import { from, lastValueFrom, Observable, of, throwError } from 'rxjs'; +import { catchError, scan } from 'rxjs/operators'; +import * as fs from 'fs'; +import * as path from 'path'; + +@Controller() +export class AppController { + constructor( + @Inject('USE_CLASS_CLIENT') private useClassClient: ClientProxy, + @Inject('USE_FACTORY_CLIENT') private useFactoryClient: ClientProxy, + @Inject('CUSTOM_PROXY_CLIENT') private customClient: ClientProxy, + ) {} + static IS_NOTIFIED = false; + + @Client({ + transport: Transport.TCP, + options: { + tlsOptions: { + ca: [ + fs + .readFileSync(path.join(__dirname, 'ca.cert.pem'), 'utf-8') + .toString(), + ], + }, + }, + }) + client: ClientProxy; + + @Post() + @HttpCode(200) + call(@Query('command') cmd, @Body() data: number[]): Observable { + return this.client.send({ cmd }, data); + } + + @Post('useFactory') + @HttpCode(200) + callWithClientUseFactory( + @Query('command') cmd, + @Body() data: number[], + ): Observable { + return this.useFactoryClient.send({ cmd }, data); + } + + @Post('useClass') + @HttpCode(200) + callWithClientUseClass( + @Query('command') cmd, + @Body() data: number[], + ): Observable { + return this.useClassClient.send({ cmd }, data); + } + + @Post('stream') + @HttpCode(200) + stream(@Body() data: number[]): Observable { + return this.client + .send({ cmd: 'streaming' }, data) + .pipe(scan((a, b) => a + b)); + } + + @Post('concurrent') + @HttpCode(200) + concurrent(@Body() data: number[][]): Promise { + const send = async (tab: number[]) => { + const expected = tab.reduce((a, b) => a + b); + const result = await lastValueFrom( + this.client.send({ cmd: 'sum' }, tab), + ); + + return result === expected; + }; + return data + .map(async tab => send(tab)) + .reduce(async (a, b) => (await a) && b); + } + + @Post('error') + @HttpCode(200) + serializeError( + @Query('client') query: 'custom' | 'standard' = 'standard', + @Body() body: Record, + ): Observable { + const client = query === 'custom' ? this.customClient : this.client; + return client.send({ cmd: 'err' }, {}).pipe( + catchError(err => { + return of(err instanceof RpcException); + }), + ); + } + + @MessagePattern({ cmd: 'sum' }) + sum(data: number[]): number { + return (data || []).reduce((a, b) => a + b); + } + + @MessagePattern({ cmd: 'asyncSum' }) + async asyncSum(data: number[]): Promise { + return (data || []).reduce((a, b) => a + b); + } + + @MessagePattern({ cmd: 'streamSum' }) + streamSum(data: number[]): Observable { + return of((data || []).reduce((a, b) => a + b)); + } + + @MessagePattern({ cmd: 'streaming' }) + streaming(data: number[]): Observable { + return from(data); + } + + @MessagePattern({ cmd: 'err' }) + throwAnError() { + return throwError(() => new Error('err')); + } + + @Post('notify') + async sendNotification(): Promise { + return this.client.emit('notification', true); + } + + @EventPattern('notification') + eventHandler(data: boolean) { + AppController.IS_NOTIFIED = data; + } +} diff --git a/integration/microservices/src/tcp-tls/app.module.ts b/integration/microservices/src/tcp-tls/app.module.ts new file mode 100644 index 00000000000..79a167886d8 --- /dev/null +++ b/integration/microservices/src/tcp-tls/app.module.ts @@ -0,0 +1,90 @@ +import { Module, Injectable } from '@nestjs/common'; +import { AppController } from './app.controller'; +import { + ClientsModule, + Transport, + ClientsModuleOptionsFactory, + ClientOptions, + ClientTCP, + RpcException, +} from '@nestjs/microservices'; + +import * as fs from 'fs'; +import * as path from 'path'; + +const caCert = fs.readFileSync(path.join(__dirname, 'ca.cert.pem')).toString(); + +class ErrorHandlingProxy extends ClientTCP { + constructor() { + super({ + tlsOptions: { ca: caCert }, + }); + } + + serializeError(err) { + return new RpcException(err); + } +} + +@Injectable() +class ConfigService { + private readonly config = { + transport: Transport.TCP, + }; + get(key: string, defaultValue?: any) { + return this.config[key] || defaultValue; + } +} + +@Module({ + providers: [ConfigService], + exports: [ConfigService], +}) +class ConfigModule {} + +@Injectable() +class ClientOptionService implements ClientsModuleOptionsFactory { + constructor(private readonly configService: ConfigService) {} + createClientOptions(): Promise | ClientOptions { + return { + transport: this.configService.get('transport'), + options: { + tlsOptions: { ca: caCert }, + }, + }; + } +} + +@Module({ + imports: [ + ClientsModule.registerAsync([ + { + imports: [ConfigModule], + name: 'USE_FACTORY_CLIENT', + useFactory: (configService: ConfigService) => ({ + transport: configService.get('transport'), + options: { + tlsOptions: { ca: caCert }, + }, + }), + inject: [ConfigService], + }, + { + imports: [ConfigModule], + name: 'USE_CLASS_CLIENT', + useClass: ClientOptionService, + inject: [ConfigService], + }, + { + imports: [ConfigModule], + inject: [ConfigService], + name: 'CUSTOM_PROXY_CLIENT', + useFactory: (config: ConfigService) => ({ + customClass: ErrorHandlingProxy, + }), + }, + ]), + ], + controllers: [AppController], +}) +export class ApplicationModule {} diff --git a/integration/microservices/src/tcp-tls/ca.cert.pem b/integration/microservices/src/tcp-tls/ca.cert.pem new file mode 100644 index 00000000000..dbc7d8a7739 --- /dev/null +++ b/integration/microservices/src/tcp-tls/ca.cert.pem @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICpDCCAYwCCQCyP27z3r0PFjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls +b2NhbGhvc3QwHhcNMjIxMjAyMDQ0NTQ1WhcNMzIxMTI5MDQ0NTQ1WjAUMRIwEAYD +VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDZ +1IdZZXqrwXql4AIOJnlfpoGKOKoIalnK7KaKHTsq1QOF8z2abFuNBVIIrO0etQ/0 +PPAaFGkXl6HHBuA5PrFpsw3V1wSnNs1Cns9NhvypHI2V71lkwBJrEaSicNWL2AOE +QkQ9cZ4YsTGd0BrM8D5VvgXdrC7gOXfj7Hx3E4K+wFO/Gi4AUXl5CXxleSFcW4U+ +jFulfq/DE8rBZXs29IsGeVkkgUoICjQ4Ey4zE6EY7f3SPKgU8gfgzYyGSd/ZZ/E7 +6M2yakEUX448Nl4BeuNWroBHVm1pSiMo+Cm1g34pJScPrx1yw6qquziCc/2n1M6O +B4WGIZAmJDWnAOEjjrxFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAABGByZZUjaq +ZygICSH2qUGHPPIyrfaCe0qM7de6kYfxxPYQQZb0HDynzv780Lq1002XeT02fNR+ +5sBCVFuKvS8BNvTq6kHzO1FiWIk/E5fQcYNToYSeEcXgWFLhJMty7+R6sIc9y8PH +2YNehf78Jjm9ukM52sLc4+JWl7AEeqPrOHZdh/ve8M2gTfimFKTrW5cEAVPIOPhp +2t5BdDKt8ZxgrGC7iRxga+v80VUOHRGfrd3hf3NlDQZO8upVGY8DdJhPRDB72+R0 +kzJ7eyQwlGXM20atiFxPk43h0f273MneIJG8NgGiVU0ND4XnZkAB3KSAu7pB+nEw +QRYMYDgo/8Q= +-----END CERTIFICATE----- diff --git a/integration/microservices/src/tcp-tls/privkey.pem b/integration/microservices/src/tcp-tls/privkey.pem new file mode 100644 index 00000000000..f8a90acffc9 --- /dev/null +++ b/integration/microservices/src/tcp-tls/privkey.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDZ1IdZZXqrwXql +4AIOJnlfpoGKOKoIalnK7KaKHTsq1QOF8z2abFuNBVIIrO0etQ/0PPAaFGkXl6HH +BuA5PrFpsw3V1wSnNs1Cns9NhvypHI2V71lkwBJrEaSicNWL2AOEQkQ9cZ4YsTGd +0BrM8D5VvgXdrC7gOXfj7Hx3E4K+wFO/Gi4AUXl5CXxleSFcW4U+jFulfq/DE8rB +ZXs29IsGeVkkgUoICjQ4Ey4zE6EY7f3SPKgU8gfgzYyGSd/ZZ/E76M2yakEUX448 +Nl4BeuNWroBHVm1pSiMo+Cm1g34pJScPrx1yw6qquziCc/2n1M6OB4WGIZAmJDWn +AOEjjrxFAgMBAAECggEANUKZtWnyjHxKGLSkzADsPE7h7YHdUSFvwwYJ0ktDZD2h +FudacJ994wUiAa0GbTOoKvebXUUQTQxuKdOsj1Kc3lNBVr+0C46CsX9TAIm4zUCF +/dr/6HpuBm/R6UXdcMvoUDZDqSJWnYL1trhjVSiIlT5ZANJQw9JJVhlEdXj3xtuc +I9aC+33f9hKO1wzei/mTjIRGyRIeKselZPpA7qJnE4s3hmZxnO/rSqyPIvh1XLfZ +3Eoyyg+xNpTZ8JqlHB5d7hDSnj8cjboa7IYpHJDXN3r8Aui+R9e/sQkezHbF7fR0 +xHpBVYQvGMuqSnTBkdJfq4qPAR1K49UrpEXB2GHaIQKBgQDs3t/ZUA138AiPIeLd +aTsEPhf5dWEZynfNvXZ0VcoSr8ckaiaq9OEllprAriYWj+XphTDsBUAh/R7KlOR4 +eb+m6OwH7LseGiLIEr54GGP0LzVXAkfH2/uR1cak8qAmHB00jNEg7sj0eVAsHO3f +WQm67f+RNP/IgAa+V8JKIkgTwwKBgQDrbAH8eAQsq9rjpxcic1EUI7uFHzr+cKf/ +4Y8ThLUNAzNfAbQWRBYjS1R8GM79Wiuh+WT1ooHKLryuLF7LVukvKHJ5GiNFBmaO +llf72Zf1y4tBE2RCXQbf6h8+ohSDC/hwYy+w20/i2KzSBKkS0+gQuAX/HzfTpRd3 +q3/uEniXVwKBgDNnElDIbIPQlSrqgZ7mzSXYi79Y15+PLnx5VxFb5KQ1fRPL7WRA +C/PqQN77a8yNoakRfFJbuVUm5t2zffkfApYoCcCWgOzBYzbjym2pbVd6PysIlacr +d+Zn69mzxUk/5J6YyHFLIFTdVqacCIrleZUVPNa4F6HdFpmL1d/cnKOdAoGAMDuB +sKsaF9jh0LBkEf/URa8IdT6vxH9qPAeHW7VdrpvQQ4/CyKkMbBC772zZw5hcxiOl +Zpnzw2uN5pVamohk3++GfH85aKPmESKGRigPdSFNl3iUmvAaP3flDN0CHNMwBD6d +/7r/A/fmeGTSCvR1YC+DswA/XNI/G5p8bFdGc6MCgYBd9oQiZlkYMiDGPUAjx+DO +kqtAmc8DLJEanSbWdIxL2bGL04cgBRPssM4m0UScx4PucvqWEPdN/5Ug0z5TrD77 +2K5nZSBUdy4DunBImz1NHRQEiytkrYX0LesGr02QlzIH4wmwb1TFu7rLkr6KfNuV +xqWi+JVY8N4vuHAxCeEALw== +-----END PRIVATE KEY----- \ No newline at end of file diff --git a/package.json b/package.json index 6ae368475b9..095fcf5b823 100644 --- a/package.json +++ b/package.json @@ -243,4 +243,4 @@ ], "exit": true } -} +} \ No newline at end of file diff --git a/packages/microservices/client/client-tcp.ts b/packages/microservices/client/client-tcp.ts index b4c58194fff..e0b1ddb91cb 100644 --- a/packages/microservices/client/client-tcp.ts +++ b/packages/microservices/client/client-tcp.ts @@ -2,6 +2,7 @@ import { Logger, Type } from '@nestjs/common'; import * as net from 'net'; import { EmptyError, lastValueFrom } from 'rxjs'; import { share, tap } from 'rxjs/operators'; +import { ConnectionOptions } from 'tls'; import { CLOSE_EVENT, ECONNREFUSED, @@ -11,6 +12,7 @@ import { TCP_DEFAULT_PORT, } from '../constants'; import { JsonSocket, TcpSocket } from '../helpers'; +import { connect as tlsConnect, TLSSocket } from 'tls'; import { PacketId, ReadPacket, WritePacket } from '../interfaces'; import { TcpClientOptions } from '../interfaces/client-metadata.interface'; import { ClientProxy } from './client-proxy'; @@ -26,6 +28,7 @@ export class ClientTCP extends ClientProxy { private readonly socketClass: Type; private isConnected = false; private socket: TcpSocket; + public tlsOptions?: ConnectionOptions; constructor(options: TcpClientOptions['options']) { super(); @@ -33,6 +36,7 @@ export class ClientTCP extends ClientProxy { this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST; this.socketClass = this.getOptionsProp(options, 'socketClass') || JsonSocket; + this.tlsOptions = this.getOptionsProp(options, 'tlsOptions'); this.initializeSerializer(options); this.initializeDeserializer(options); @@ -55,7 +59,10 @@ export class ClientTCP extends ClientProxy { share(), ); - this.socket.connect(this.port, this.host); + // For TLS connections, the connection is initiated when the socket is created + if (!this.tlsOptions) { + this.socket.connect(this.port, this.host); + } this.connection = lastValueFrom(source$).catch(err => { if (err instanceof EmptyError) { return; @@ -87,7 +94,21 @@ export class ClientTCP extends ClientProxy { } public createSocket(): TcpSocket { - return new this.socketClass(new net.Socket()); + let socket: net.Socket | TLSSocket; + /** + * TLS enabled, "upgrade" the TCP Socket to TLS + */ + if (this.tlsOptions) { + socket = tlsConnect({ + ...this.tlsOptions, + port: this.port, + host: this.host, + socket, + }); + } else { + socket = new net.Socket(); + } + return new this.socketClass(socket); } public close() { diff --git a/packages/microservices/interfaces/client-metadata.interface.ts b/packages/microservices/interfaces/client-metadata.interface.ts index 67177088a00..53bb8b556c7 100644 --- a/packages/microservices/interfaces/client-metadata.interface.ts +++ b/packages/microservices/interfaces/client-metadata.interface.ts @@ -12,6 +12,7 @@ import { RmqOptions, } from './microservice-configuration.interface'; import { Serializer } from './serializer.interface'; +import { ConnectionOptions } from 'tls'; export type ClientOptions = | RedisOptions @@ -40,6 +41,7 @@ export interface TcpClientOptions { port?: number; serializer?: Serializer; deserializer?: Deserializer; + tlsOptions?: ConnectionOptions; socketClass?: Type; }; } diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index fa3f7c08a06..63847c10afa 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -1,4 +1,5 @@ import { Type } from '@nestjs/common'; +import { ConnectionOptions } from 'tls'; import { Transport } from '../enums/transport.enum'; import { ChannelOptions } from '../external/grpc-options.interface'; import { @@ -87,6 +88,7 @@ export interface TcpOptions { retryAttempts?: number; retryDelay?: number; serializer?: Serializer; + tlsOptions?: ConnectionOptions; deserializer?: Deserializer; socketClass?: Type; }; diff --git a/packages/microservices/server/server-tcp.ts b/packages/microservices/server/server-tcp.ts index bffeb158a7c..655fcd00388 100644 --- a/packages/microservices/server/server-tcp.ts +++ b/packages/microservices/server/server-tcp.ts @@ -15,6 +15,7 @@ import { import { TcpContext } from '../ctx-host/tcp.context'; import { Transport } from '../enums'; import { JsonSocket, TcpSocket } from '../helpers'; +import { createServer as tlsCreateServer } from 'tls'; import { CustomTransportStrategy, IncomingRequest, @@ -35,6 +36,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy { private readonly socketClass: Type; private isExplicitlyTerminated = false; private retryAttemptsCount = 0; + private tlsOptions?; constructor(private readonly options: TcpOptions['options']) { super(); @@ -42,6 +44,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy { this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST; this.socketClass = this.getOptionsProp(options, 'socketClass') || JsonSocket; + this.tlsOptions = this.getOptionsProp(options, 'tlsOptions'); this.init(); this.initializeSerializer(options); @@ -124,7 +127,16 @@ export class ServerTCP extends Server implements CustomTransportStrategy { } private init() { - this.server = net.createServer(this.bindHandler.bind(this)); + if (this.tlsOptions) { + // TLS enabled, use tls server + this.server = tlsCreateServer( + this.tlsOptions, + this.bindHandler.bind(this), + ); + } else { + // TLS disabled, use net server + this.server = net.createServer(this.bindHandler.bind(this)); + } this.server.on(ERROR_EVENT, this.handleError.bind(this)); this.server.on(CLOSE_EVENT, this.handleClose.bind(this)); } diff --git a/packages/microservices/test/client/client-tcp.spec.ts b/packages/microservices/test/client/client-tcp.spec.ts index 1a5c1617eeb..d3b36c172e5 100644 --- a/packages/microservices/test/client/client-tcp.spec.ts +++ b/packages/microservices/test/client/client-tcp.spec.ts @@ -1,5 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; +import { TLSSocket } from 'tls'; +import { Socket as NetSocket } from 'net'; import { ClientTCP } from '../../client/client-tcp'; import { ERROR_EVENT } from '../../constants'; @@ -214,4 +216,17 @@ describe('ClientTCP', () => { expect(sendMessageStub.called).to.be.true; }); }); + + describe('tls', () => { + it('should upgrade to TLS', () => { + const client = new ClientTCP({ tlsOptions: {} }); + console.log(client); + const jsonSocket = client.createSocket(); + expect(jsonSocket.socket).instanceOf(TLSSocket); + }); + it('should not upgrade to TLS, if not requested', () => { + const jsonSocket = new ClientTCP({}).createSocket(); + expect(jsonSocket.socket).instanceOf(NetSocket); + }); + }); });