diff --git a/packages/core/router/router-response-controller.ts b/packages/core/router/router-response-controller.ts
index 96bd06b94a3..e66544a1ded 100644
--- a/packages/core/router/router-response-controller.ts
+++ b/packages/core/router/router-response-controller.ts
@@ -8,7 +8,7 @@ import {
 import { isObject } from '@nestjs/common/utils/shared.utils';
 import { IncomingMessage } from 'http';
 import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
-import { catchError, debounce, map } from 'rxjs/operators';
+import { catchError, concatMap, map } from 'rxjs/operators';
 import {
   AdditionalHeaders,
   WritableHeaderStream,
@@ -128,7 +128,7 @@ export class RouterResponseController {
 
           return { data: message as object | string };
         }),
-        debounce(
+        concatMap(
           message =>
             new Promise<void>(resolve =>
               stream.writeMessage(message, () => resolve()),
@@ -153,6 +153,9 @@ export class RouterResponseController {
 
     request.on('close', () => {
       subscription.unsubscribe();
+      if (!stream.writableEnded) {
+        stream.end();
+      }
     });
   }
 
diff --git a/packages/core/router/sse-stream.ts b/packages/core/router/sse-stream.ts
index 3bf666c03c3..523e745e02a 100644
--- a/packages/core/router/sse-stream.ts
+++ b/packages/core/router/sse-stream.ts
@@ -116,7 +116,7 @@ export class SseStream extends Transform {
       message.id = this.lastEventId.toString();
     }
 
-    if (!this.write(message, 'utf-8', cb)) {
+    if (!this.write(message, 'utf-8')) {
       this.once('drain', cb);
     } else {
       process.nextTick(cb);
diff --git a/packages/core/test/router/router-response-controller.spec.ts b/packages/core/test/router/router-response-controller.spec.ts
index aac3212628e..1fdd92e7b2b 100644
--- a/packages/core/test/router/router-response-controller.spec.ts
+++ b/packages/core/test/router/router-response-controller.spec.ts
@@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream';
 import { HttpStatus, RequestMethod } from '../../../common';
 import { RouterResponseController } from '../../router/router-response-controller';
 import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
+import { SseStream } from '../../router/sse-stream';
 
 describe('RouterResponseController', () => {
   let adapter: NoopHttpAdapter;
@@ -374,6 +375,71 @@ data: test
       done();
     });
 
+    describe('when writing data too densely', () => {
+      const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners;
+      const MAX_LISTENERS = 1;
+      const sandbox = sinon.createSandbox();
+
+      beforeEach(() => {
+        // Can't access to the internal sseStream,
+        // as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process`
+        const PROCESS_MAX_LISTENERS = process.getMaxListeners();
+        SseStream.defaultMaxListeners = MAX_LISTENERS;
+        process.setMaxListeners(PROCESS_MAX_LISTENERS);
+
+        const sseStream = sinon.createStubInstance(SseStream);
+        const originalWrite = SseStream.prototype.write;
+        // Make `.write()` always return false, so as to listen `drain` event
+        sseStream.write.callsFake(function (...args: any[]) {
+          originalWrite.apply(this, args);
+          return false;
+        });
+        sandbox.replace(SseStream.prototype, 'write', sseStream.write);
+      });
+
+      afterEach(() => {
+        sandbox.restore();
+        SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS;
+      });
+
+      it('should not cause memory leak', async () => {
+        let maxDrainListenersExceededWarning = null;
+        process.on('warning', (warning: any) => {
+          if (
+            warning.name === 'MaxListenersExceededWarning' &&
+            warning.emitter instanceof SseStream &&
+            warning.type === 'drain' &&
+            warning.count === MAX_LISTENERS + 1
+          ) {
+            maxDrainListenersExceededWarning = warning;
+          }
+        });
+
+        const result = new Subject();
+
+        const response = new Writable();
+        response._write = () => {};
+
+        const request = new Writable();
+        request._write = () => {};
+
+        routerResponseController.sse(
+          result,
+          response as unknown as ServerResponse,
+          request as unknown as IncomingMessage,
+        );
+
+        // Send multiple messages simultaneously
+        Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) =>
+          result.next(String(i)),
+        );
+
+        await new Promise(resolve => process.nextTick(resolve));
+
+        expect(maxDrainListenersExceededWarning).to.equal(null);
+      });
+    });
+
     describe('when there is an error', () => {
       it('should close the request', done => {
         const result = new Subject();