Prevent socket exhaustion in BlobStorageS3Storage (#2666)
thomasdax98 committed Nov 13, 2024
1 parent dffcd32 commit cb5d0ca
Showing 9 changed files with 64 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/poor-lizards-juggle.md
@@ -0,0 +1,5 @@
---
"@comet/cms-api": patch
---

Prevent the API from crashing because of stream errors when delivering a file
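
The fix (visible in the `files.controller.ts` diff below) attaches an error handler to the storage stream so a broken stream ends the response instead of crashing the API. A minimal sketch of the pattern; `deliverFile` and `console.error` are illustrative (the controller uses a NestJS `Logger`):

```ts
import { Response } from "express";
import { Readable } from "stream";

// Sketch: if the storage stream errors mid-transfer, log and end the response
// instead of letting the error bubble up as an unhandled exception.
function deliverFile(stream: Readable, res: Response): void {
    stream.on("error", (error) => {
        console.error("Stream error:", error);
        res.end();
    });

    stream.pipe(res);
}
```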
15 changes: 15 additions & 0 deletions .changeset/spotty-ladybugs-travel.md
@@ -0,0 +1,15 @@
---
"@comet/cms-api": patch
---

Prevent socket exhaustion in `BlobStorageS3Storage`

By default, the S3 client allows a maximum of 50 open sockets.
A socket is only released once a file is streamed completely.
This means a socket can remain open indefinitely if a file stream is interrupted (e.g., when the user leaves the site).
This could lead to socket exhaustion, preventing further file delivery.

To resolve this, the following changes were made (see the sketch after this list):

1. Add a close handler to destroy the stream when the client disconnects
2. Set a 60-second `requestTimeout` to close unused connections
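
A minimal sketch of both measures, assuming the AWS SDK v3 `@aws-sdk/client-s3` client and an Express response; `sendFile` is an illustrative helper, and the real configuration lives in `BlobStorageS3Storage` and the files controller (see the diffs below):

```ts
import { S3 } from "@aws-sdk/client-s3";
import { Response } from "express";
import { Readable } from "stream";

// 1. Close idle connections: if no request/response activity happens for 60 seconds
//    (e.g., because of a dangling stream), the socket is released.
const client = new S3({
    requestHandler: {
        requestTimeout: 60000,
        connectionTimeout: 6000, // fail faster if no connection is available
    },
    // region, endpoint, credentials, ... omitted
});

// 2. Destroy the storage stream as soon as the HTTP client disconnects,
//    so its socket is returned to the pool immediately.
function sendFile(stream: Readable, res: Response): void {
    res.on("close", () => stream.destroy());
    stream.pipe(res);
}
```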
10 changes: 5 additions & 5 deletions demo/api/src/config/environment-variables.ts
@@ -76,23 +76,23 @@ export class EnvironmentVariables {
     @IsString()
     BLOB_STORAGE_DIRECTORY_PREFIX: string;

-    @ValidateIf((v) => v.DAM_STORAGE_DRIVER === "s3")
+    @ValidateIf((v) => v.BLOB_STORAGE_DRIVER === "s3")
     @IsString()
     S3_REGION: string;

-    @ValidateIf((v) => v.DAM_STORAGE_DRIVER === "s3")
+    @ValidateIf((v) => v.BLOB_STORAGE_DRIVER === "s3")
     @IsString()
     S3_ENDPOINT: string;

-    @ValidateIf((v) => v.DAM_STORAGE_DRIVER === "s3")
+    @ValidateIf((v) => v.BLOB_STORAGE_DRIVER === "s3")
     @IsString()
     S3_ACCESS_KEY_ID: string;

-    @ValidateIf((v) => v.DAM_STORAGE_DRIVER === "s3")
+    @ValidateIf((v) => v.BLOB_STORAGE_DRIVER === "s3")
     @IsString()
     S3_SECRET_ACCESS_KEY: string;

-    @ValidateIf((v) => v.DAM_STORAGE_DRIVER === "s3")
+    @ValidateIf((v) => v.BLOB_STORAGE_DRIVER === "s3")
     @IsString()
     S3_BUCKET: string;

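The diff above also corrects the `@ValidateIf` conditions to check `BLOB_STORAGE_DRIVER` instead of `DAM_STORAGE_DRIVER`. A small sketch of how class-validator's `@ValidateIf` makes the S3 variables required only when the S3 driver is selected; the class name and driver value are illustrative:

```ts
import { plainToInstance } from "class-transformer";
import { IsString, ValidateIf, validateSync } from "class-validator";

class EnvSketch {
    @IsString()
    BLOB_STORAGE_DRIVER: string;

    // Only validated when the S3 driver is selected; skipped for any other driver.
    @ValidateIf((env) => env.BLOB_STORAGE_DRIVER === "s3")
    @IsString()
    S3_REGION: string;
}

// With a non-S3 driver, a missing S3_REGION produces no validation errors.
const env = plainToInstance(EnvSketch, { BLOB_STORAGE_DRIVER: "file" });
console.log(validateSync(env)); // []
```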
@@ -75,16 +75,16 @@ export class BlobStorageAzureStorage implements BlobStorageBackendInterface {
         }
     }

-    async getFile(folderName: string, fileName: string): Promise<NodeJS.ReadableStream> {
+    async getFile(folderName: string, fileName: string): Promise<Readable> {
         const response = await this.client.getContainerClient(folderName).getBlobClient(fileName).download();
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        return response.readableStreamBody!; // is defined in node.js but not for browsers
+        return Readable.from(response.readableStreamBody!); // is defined in node.js but not for browsers
     }

-    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<NodeJS.ReadableStream> {
+    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<Readable> {
         const response = await this.client.getContainerClient(folderName).getBlobClient(fileName).download(offset, length);
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        return response.readableStreamBody!; // is defined in node.js but not for browsers
+        return Readable.from(response.readableStreamBody!); // is defined in node.js but not for browsers
     }

     async removeFile(folderName: string, fileName: string): Promise<void> {
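`readableStreamBody` is typed as a bare `NodeJS.ReadableStream`, which does not expose `destroy()`. Wrapping it with `Readable.from()` (which accepts any async iterable, including Node streams) yields a `Readable` that the controller can destroy when the client disconnects. A short sketch; `toReadable` is an illustrative helper:

```ts
import { Readable } from "stream";

// Wrap a bare NodeJS.ReadableStream in a Readable so destroy() and error events
// are available to the file delivery code.
function toReadable(body: NodeJS.ReadableStream): Readable {
    return Readable.from(body);
}
```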
@@ -1,3 +1,5 @@
+import { Readable } from "stream";
+
 export type StorageMetaData = {
     size: number;
     etag?: string;
@@ -16,8 +18,8 @@ export interface BlobStorageBackendInterface {
     removeFolder(folderName: string): Promise<void>;
     fileExists(folderName: string, fileName: string): Promise<boolean>;
     createFile(folderName: string, fileName: string, data: NodeJS.ReadableStream | Buffer | string, options: CreateFileOptions): Promise<void>;
-    getFile(folderName: string, fileName: string): Promise<NodeJS.ReadableStream>;
-    getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<NodeJS.ReadableStream>;
+    getFile(folderName: string, fileName: string): Promise<Readable>;
+    getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<Readable>;
     getFileMetaData(folderName: string, fileName: string): Promise<StorageMetaData>;
     removeFile(folderName: string, fileName: string): Promise<void>;
     getBackendFilePathPrefix(): string;
@@ -1,4 +1,5 @@
 import { Inject, Injectable } from "@nestjs/common";
+import { Readable } from "stream";

 import { createHashedPath } from "../../dam/files/files.utils";
 import { BlobStorageConfig } from "../blob-storage.config";
@@ -47,11 +48,11 @@ export class BlobStorageBackendService implements BlobStorageBackendInterface {
         return this.backend.createFile(folderName, fileName, data, { ...options, headers: normalizeHeaders(headers) });
     }

-    async getFile(folderName: string, fileName: string): Promise<NodeJS.ReadableStream> {
+    async getFile(folderName: string, fileName: string): Promise<Readable> {
         return this.backend.getFile(folderName, fileName);
     }

-    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<NodeJS.ReadableStream> {
+    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<Readable> {
         return this.backend.getPartialFile(folderName, fileName, offset, length);
     }

@@ -1,6 +1,6 @@
 import * as fs from "fs";
 import * as path from "path";
-import { Stream } from "stream";
+import { Readable, Stream } from "stream";

 import { BlobStorageBackendInterface, CreateFileOptions, StorageMetaData } from "../blob-storage-backend.interface";
 import { BlobStorageFileConfig } from "./blob-storage-file.config";
@@ -76,11 +76,11 @@ export class BlobStorageFileStorage implements BlobStorageBackendInterface {
         ]);
     }

-    async getFile(folderName: string, fileName: string): Promise<NodeJS.ReadableStream> {
+    async getFile(folderName: string, fileName: string): Promise<Readable> {
         return fs.createReadStream(`${this.path}/${folderName}/${fileName}`);
     }

-    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<NodeJS.ReadableStream> {
+    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<Readable> {
         return fs.createReadStream(`${this.path}/${folderName}/${fileName}`, {
             start: offset,
             end: offset + length - 1,
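For the file-system backend, a partial read maps directly onto `fs.createReadStream`'s `start`/`end` options; `end` is inclusive, hence `offset + length - 1`. A brief sketch with an illustrative `readSlice` helper:

```ts
import * as fs from "fs";
import { Readable } from "stream";

// Read `length` bytes starting at `offset`; `end` is inclusive in fs.createReadStream.
function readSlice(filePath: string, offset: number, length: number): Readable {
    return fs.createReadStream(filePath, { start: offset, end: offset + length - 1 });
}

// e.g. readSlice("/tmp/file.bin", 100, 50) streams bytes 100–149
```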
@@ -12,6 +12,13 @@ export class BlobStorageS3Storage implements BlobStorageBackendInterface {

     constructor(config: BlobStorageS3Config["s3"]) {
         this.client = new AWS.S3({
+            requestHandler: {
+                // https://github.com/aws/aws-sdk-js-v3/blob/main/supplemental-docs/CLIENTS.md#request-handler-requesthandler
+                // Workaround to prevent socket exhaustion caused by dangling streams (e.g., when the user leaves the site).
+                // Close the connection when no request/response was sent for 60 seconds, indicating that the file stream was terminated.
+                requestTimeout: 60000,
+                connectionTimeout: 6000, // fail faster if there are no available connections
+            },
             credentials: {
                 accessKeyId: config.accessKeyId,
                 secretAccessKey: config.secretAccessKey,
@@ -95,14 +102,14 @@ export class BlobStorageS3Storage implements BlobStorageBackendInterface {
         await this.client.send(new AWS.PutObjectCommand(input));
     }

-    async getFile(folderName: string, fileName: string): Promise<NodeJS.ReadableStream> {
+    async getFile(folderName: string, fileName: string): Promise<Readable> {
         const response = await this.client.send(new AWS.GetObjectCommand(this.getCommandInput(folderName, fileName)));

         // Blob is not supported and used in node
         return Readable.from(response.Body as Readable | NodeJS.ReadableStream);
     }

-    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<NodeJS.ReadableStream> {
+    async getPartialFile(folderName: string, fileName: string, offset: number, length: number): Promise<Readable> {
         const input: AWS.GetObjectCommandInput = {
             ...this.getCommandInput(folderName, fileName),
             Range: `bytes=${offset}-${offset + length - 1}`,
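The S3 backend expresses the same slice as an HTTP `Range` header, which also uses inclusive byte positions; a tiny illustrative helper:

```ts
// offset = 100, length = 50  →  "bytes=100-149"
function rangeHeader(offset: number, length: number): string {
    return `bytes=${offset}-${offset + length - 1}`;
}
```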
20 changes: 16 additions & 4 deletions packages/api/cms-api/src/dam/files/files.controller.ts
@@ -6,6 +6,7 @@ import {
     Get,
     Headers,
     Inject,
+    Logger,
     NotFoundException,
     Param,
     Post,
@@ -18,6 +19,7 @@ import { plainToInstance } from "class-transformer";
 import { validate } from "class-validator";
 import { Response } from "express";
 import { OutgoingHttpHeaders } from "http";
+import { Readable } from "stream";

 import { GetCurrentUser } from "../../auth/decorators/get-current-user.decorator";
 import { DisableGlobalGuard } from "../../auth/decorators/global-guard-disable.decorator";
@@ -54,6 +56,7 @@ export function createFilesController({ Scope: PassedScope }: { Scope?: Type<Dam
     @Controller("dam/files")
     @RequiredPermission(["dam"], { skipScopeCheck: true }) // Scope is checked in actions
     class FilesController {
+        private readonly logger = new Logger(FilesController.name);
         constructor(
             @Inject(DAM_CONFIG) private readonly damConfig: DamConfig,
             private readonly filesService: FilesService,
@@ -202,7 +205,7 @@ export function createFilesController({ Scope: PassedScope }: { Scope?: Type<Dam
             };

             // https://medium.com/@vishal1909/how-to-handle-partial-content-in-node-js-8b0a5aea216
-            let response: NodeJS.ReadableStream;
+            let stream: Readable;
             if (options?.range) {
                 const { start, end, contentLength } = calculatePartialRanges(file.size, options.range);

@@ -215,7 +218,7 @@ export function createFilesController({ Scope: PassedScope }: { Scope?: Type<Dam
                 }

                 try {
-                    response = await this.blobStorageBackendService.getPartialFile(
+                    stream = await this.blobStorageBackendService.getPartialFile(
                         this.damConfig.filesDirectory,
                         createHashedPath(file.contentHash),
                         start,
@@ -234,18 +237,27 @@ export function createFilesController({ Scope: PassedScope }: { Scope?: Type<Dam
                 });
             } else {
                 try {
-                    response = await this.blobStorageBackendService.getFile(this.damConfig.filesDirectory, createHashedPath(file.contentHash));
+                    stream = await this.blobStorageBackendService.getFile(this.damConfig.filesDirectory, createHashedPath(file.contentHash));
                 } catch (err) {
                     throw new Error(`File-Stream error: (storage.getFile) - ${(err as Error).message}`);
                 }

+                stream.on("error", (error) => {
+                    this.logger.error("Stream error:", error);
+                    res.end();
+                });
+
+                res.on("close", () => {
+                    stream.destroy();
+                });
+
                 res.writeHead(200, {
                     ...headers,
                     ...options?.overrideHeaders,
                 });
             }

-            response.pipe(res);
+            stream.pipe(res);
         }
     }
