From ac6c1bcc2509114df9821d456f2034771b652577 Mon Sep 17 00:00:00 2001 From: Livio Brunner Date: Thu, 14 Sep 2023 21:59:27 +0200 Subject: [PATCH] fix(microservice): connect to kafka with `producerOnlyMode` per default --- docker-compose.yml | 34 ++- .../microservice.health.e2e-spec.ts | 194 +++++++++++------- .../microservice/microservice.health.ts | 11 +- package-lock.json | 16 ++ package.json | 1 + .../src/health/health.controller.ts | 10 +- 6 files changed, 191 insertions(+), 75 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 45589f8e2e..540976c2f8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -21,13 +21,43 @@ services: ports: - 27017:27017 - redis_test: + rabbitmq: + image: rabbitmq:3-management-alpine + container_name: 'rabbitmq' + networks: + - overlay + ports: + - 5672:5672 + - 15672:15672 + + redis: image: redis:latest - hostname: redis_test networks: - overlay ports: - 6379:6379 + kafka_zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + container_name: kafka_zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka_broker: + image: confluentinc/cp-kafka:7.0.1 + container_name: broker + ports: + - "9092:9092" + depends_on: + - kafka_zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'kafka_zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka_broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 networks: overlay: diff --git a/e2e/health-checks/microservice.health.e2e-spec.ts b/e2e/health-checks/microservice.health.e2e-spec.ts index a9d8cf9fbc..90325d597c 100644 --- a/e2e/health-checks/microservice.health.e2e-spec.ts +++ b/e2e/health-checks/microservice.health.e2e-spec.ts @@ -5,7 +5,12 @@ import { bootstrapTestingModule, DynamicHealthEndpointFn, } from '../helper'; -import { Transport } from '@nestjs/microservices'; +import { + KafkaOptions, + RmqOptions, + TcpClientOptions, + Transport, +} from '@nestjs/microservices'; describe('MicroserviceHealthIndicator', () => { let app: INestApplication; @@ -20,80 +25,131 @@ describe('MicroserviceHealthIndicator', () => { microservice = await bootstrapMicroservice(); }); - it('should check if the microservice is available', async () => { - app = await setHealthEndpoint(({ healthCheck, microservice }) => - healthCheck.check([ - async () => - microservice.pingCheck('tcp', { - transport: Transport.TCP, - options: { - host: '0.0.0.0', - port: 8889, - }, - }), - ]), - ).start(); - - const details = { tcp: { status: 'up' } }; - return request(app.getHttpServer()).get('/health').expect(200).expect({ - status: 'ok', - info: details, - error: {}, - details, + describe('TCP', () => { + it('should connect', async () => { + app = await setHealthEndpoint(({ healthCheck, microservice }) => + healthCheck.check([ + async () => + microservice.pingCheck('tcp', { + transport: Transport.TCP, + options: { + host: '0.0.0.0', + port: 8889, + }, + }), + ]), + ).start(); + + const details = { tcp: { status: 'up' } }; + return request(app.getHttpServer()).get('/health').expect(200).expect({ + status: 'ok', + info: details, + error: {}, + details, + }); + }); + + it('should throw an error if is not reachable', async () => { + app = await setHealthEndpoint(({ healthCheck, microservice }) => + healthCheck.check([ + async () => + microservice.pingCheck('tcp', { + transport: Transport.TCP, + options: { + host: '0.0.0.0', + port: 8889, + }, + }), + ]), + ).start(); + + await microservice.close(); + + const details = { + tcp: { status: 'down', message: 'connect ECONNREFUSED 0.0.0.0:8889' }, + }; + return request(app.getHttpServer()).get('/health').expect(503).expect({ + status: 'error', + info: {}, + error: details, + details, + }); }); }); - it('should throw an error if the service is not reachable', async () => { - app = await setHealthEndpoint(({ healthCheck, microservice }) => - healthCheck.check([ - async () => - microservice.pingCheck('tcp', { - transport: Transport.TCP, - options: { - host: '0.0.0.0', - port: 8889, - }, - }), - ]), - ).start(); - - await microservice.close(); - - const details = { - tcp: { status: 'down', message: 'connect ECONNREFUSED 0.0.0.0:8889' }, - }; - return request(app.getHttpServer()).get('/health').expect(503).expect({ - status: 'error', - info: {}, - error: details, - details, + describe('RMQ', () => { + it('should connect', async () => { + app = await setHealthEndpoint(({ healthCheck, microservice }) => + healthCheck.check([ + async () => + microservice.pingCheck('rmq', { + transport: Transport.RMQ, + options: { + urls: ['amqp://localhost:5672'], + }, + }), + ]), + ).start(); + + const details = { rmq: { status: 'up' } }; + return request(app.getHttpServer()).get('/health').expect(200).expect({ + status: 'ok', + info: details, + error: {}, + details, + }); + }); + + it('should throw an error if is not reachable', async () => { + app = await setHealthEndpoint(({ healthCheck, microservice }) => + healthCheck.check([ + async () => + microservice.pingCheck('rmq', { + transport: Transport.RMQ, + options: { + urls: ['amqp://0.0.0.0:8889'], + }, + }), + ]), + ).start(); + + await microservice.close(); + + const details = { + rmq: { status: 'down', message: 'rmq is not available' }, + }; + return request(app.getHttpServer()).get('/health').expect(503).expect({ + status: 'error', + info: {}, + error: details, + details, + }); }); }); - it('should throw an error if an RMQ microservice is not reachable', async () => { - app = await setHealthEndpoint(({ healthCheck, microservice }) => - healthCheck.check([ - async () => - microservice.pingCheck('rmq', { - transport: Transport.RMQ, - options: { - host: '0.0.0.0', - port: 8889, - }, - }), - ]), - ).start(); - - await microservice.close(); - - const details = { - rmq: { status: 'down', message: 'rmq is not available' }, - }; - return request(app.getHttpServer()).get('/health').expect(503).expect({ - status: 'error', - info: {}, - error: details, - details, + describe('Kafka', () => { + it('should connect', async () => { + app = await setHealthEndpoint(({ healthCheck, microservice }) => + healthCheck.check([ + async () => + microservice.pingCheck('kafka', { + transport: Transport.KAFKA, + options: { + client: { + brokers: ['localhost:9092'], + }, + }, + }), + ]), + ).start(); + + const details = { kafka: { status: 'up' } }; + return request(app.getHttpServer()).get('/health').expect(200).expect({ + status: 'ok', + info: details, + error: {}, + details, + }); }); }); diff --git a/lib/health-indicator/microservice/microservice.health.ts b/lib/health-indicator/microservice/microservice.health.ts index e28624df30..03100e1c23 100644 --- a/lib/health-indicator/microservice/microservice.health.ts +++ b/lib/health-indicator/microservice/microservice.health.ts @@ -112,7 +112,7 @@ export class MicroserviceHealthIndicator extends HealthIndicator { * @throws {HealthCheckError} If the microservice is not reachable * * @example - * microservice.pingCheck('tcp', { + * microservice.pingCheck('tcp', { * transport: Transport.TCP, * options: { host: 'localhost', port: 3001 }, * }) @@ -124,6 +124,15 @@ export class MicroserviceHealthIndicator extends HealthIndicator { let isHealthy = false; const timeout = options.timeout || 1000; + if (options.transport === NestJSMicroservices.Transport.KAFKA) { + options.options = { + // We need to set the producerOnlyMode to true in order to speed + // up the connection process. https://github.com/nestjs/terminus/issues/1690 + producerOnlyMode: true, + ...options.options, + }; + } + try { await promiseTimeout(timeout, this.pingMicroservice(options)); isHealthy = true; diff --git a/package-lock.json b/package-lock.json index 3406bff9ae..753d3ba232 100644 --- a/package-lock.json +++ b/package-lock.json @@ -55,6 +55,7 @@ "husky": "8.0.3", "ioredis": "5.3.2", "jest": "29.7.0", + "kafkajs": "^2.2.4", "lint-staged": "13.2.3", "mongoose": "7.3.3", "mysql2": "3.6.1", @@ -12999,6 +13000,15 @@ "integrity": "sha512-qpcRocdkUmf+UTNBYx5w6dexX5J31AKK1OmPwH630a83DdVVUIngk55RSAiIGpQyoH0dlr872VHfPjnQnK1qDQ==", "dev": true }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true, + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/kareem": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/kareem/-/kareem-2.5.1.tgz", @@ -31316,6 +31326,12 @@ "integrity": "sha512-qpcRocdkUmf+UTNBYx5w6dexX5J31AKK1OmPwH630a83DdVVUIngk55RSAiIGpQyoH0dlr872VHfPjnQnK1qDQ==", "dev": true }, + "kafkajs": { + "version": "2.2.4", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "dev": true + }, "kareem": { "version": "2.5.1", "resolved": "https://registry.npmjs.org/kareem/-/kareem-2.5.1.tgz", diff --git a/package.json b/package.json index 33d676478e..30ca924365 100644 --- a/package.json +++ b/package.json @@ -83,6 +83,7 @@ "husky": "8.0.3", "ioredis": "5.3.2", "jest": "29.7.0", + "kafkajs": "^2.2.4", "lint-staged": "13.2.3", "mongoose": "7.3.3", "mysql2": "3.6.1", diff --git a/sample/002-microservice-app/src/health/health.controller.ts b/sample/002-microservice-app/src/health/health.controller.ts index bf11e56ad4..684a9d814b 100644 --- a/sample/002-microservice-app/src/health/health.controller.ts +++ b/sample/002-microservice-app/src/health/health.controller.ts @@ -1,5 +1,9 @@ import { Controller, Get } from '@nestjs/common'; -import { Transport } from '@nestjs/microservices'; +import { + RedisOptions, + TcpClientOptions, + Transport, +} from '@nestjs/microservices'; import { HealthCheck, HealthCheckService, @@ -18,12 +22,12 @@ export class HealthController { check() { return this.health.check([ async () => - this.microservice.pingCheck('tcp', { + this.microservice.pingCheck('tcp', { transport: Transport.TCP, options: { host: 'localhost', port: 8889 }, }), async () => - this.microservice.pingCheck('redis', { + this.microservice.pingCheck('redis', { transport: Transport.REDIS, options: { host: 'localhost',