Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Reduce memory usage in the Webhook node #4640

Merged
merged 1 commit into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1631,18 +1631,22 @@ class App {
// Binary data
// ----------------------------------------

// Returns binary buffer
// Download binary
this.app.get(
`/${this.restEndpoint}/data/:path`,
ResponseHelper.send(async (req: express.Request, res: express.Response): Promise<string> => {
async (req: express.Request, res: express.Response): Promise<void> => {
// TODO UM: check if this needs permission check for UM
const dataPath = req.params.path;
return BinaryDataManager.getInstance()
.retrieveBinaryDataByIdentifier(dataPath)
.then((buffer: Buffer) => {
return buffer.toString('base64');
});
}),
const identifier = req.params.path;
const binaryDataManager = BinaryDataManager.getInstance();
const binaryPath = binaryDataManager.getBinaryPath(identifier);
const { mimeType, fileName, fileSize } = await binaryDataManager.getBinaryMetadata(
identifier,
);
if (mimeType) res.setHeader('Content-Type', mimeType);
if (fileName) res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`);
res.setHeader('Content-Length', fileSize);
res.sendFile(binaryPath);
},
);

// ----------------------------------------
Expand Down
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"n8n-workflow": "~0.125.0",
"oauth-1.0a": "^2.2.6",
"p-cancelable": "^2.0.0",
"pretty-bytes": "^5.6.0",
"qs": "^6.10.1",
"request": "^2.88.2",
"request-promise-native": "^1.0.7",
Expand Down
53 changes: 44 additions & 9 deletions packages/core/src/BinaryDataManager/FileSystem.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { promises as fs } from 'fs';
import fs from 'fs/promises';
import { jsonParse } from 'n8n-workflow';
import path from 'path';
import { v4 as uuid } from 'uuid';

import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';

const PREFIX_METAFILE = 'binarymeta';
const PREFIX_PERSISTED_METAFILE = 'persistedmeta';
Expand Down Expand Up @@ -43,17 +44,47 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
.then(() => {});
}

async getFileSize(identifier: string): Promise<number> {
const stats = await fs.stat(this.getBinaryPath(identifier));
return stats.size;
}

async copyBinaryFile(filePath: string, executionId: string): Promise<string> {
const binaryDataId = this.generateFileName(executionId);
await this.addBinaryIdToPersistMeta(executionId, binaryDataId);
await this.copyFileToLocalStorage(filePath, binaryDataId);
return binaryDataId;
}

async storeBinaryMetadata(identifier: string, metadata: BinaryMetadata) {
await fs.writeFile(this.getMetadataPath(identifier), JSON.stringify(metadata), {
encoding: 'utf-8',
});
}

async getBinaryMetadata(identifier: string): Promise<BinaryMetadata> {
return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' }));
}

async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string> {
const binaryDataId = this.generateFileName(executionId);
return this.addBinaryIdToPersistMeta(executionId, binaryDataId).then(async () =>
this.saveToLocalStorage(binaryBuffer, binaryDataId).then(() => binaryDataId),
);
await this.addBinaryIdToPersistMeta(executionId, binaryDataId);
await this.saveToLocalStorage(binaryBuffer, binaryDataId);
return binaryDataId;
}

async retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer> {
return this.retrieveFromLocalStorage(identifier);
}

getBinaryPath(identifier: string): string {
return path.join(this.storagePath, identifier);
}

getMetadataPath(identifier: string): string {
return path.join(this.storagePath, `${identifier}.metadata`);
}

async markDataForDeletionByExecutionId(executionId: string): Promise<void> {
const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000);
return fs.writeFile(
Expand Down Expand Up @@ -180,7 +211,7 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
}

private generateFileName(prefix: string): string {
return `${prefix}_${uuid()}`;
return [prefix, uuid()].join('');
}

private getBinaryDataMetaPath() {
Expand All @@ -196,15 +227,19 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
}

private async deleteFromLocalStorage(identifier: string) {
return fs.rm(path.join(this.storagePath, identifier));
return fs.rm(this.getBinaryPath(identifier));
}

private async copyFileToLocalStorage(source: string, identifier: string): Promise<void> {
await fs.cp(source, this.getBinaryPath(identifier));
}

private async saveToLocalStorage(data: Buffer, identifier: string) {
await fs.writeFile(path.join(this.storagePath, identifier), data);
await fs.writeFile(this.getBinaryPath(identifier), data);
}

private async retrieveFromLocalStorage(identifier: string): Promise<Buffer> {
const filePath = path.join(this.storagePath, identifier);
const filePath = this.getBinaryPath(identifier);
try {
return await fs.readFile(filePath);
} catch (e) {
Expand Down
88 changes: 68 additions & 20 deletions packages/core/src/BinaryDataManager/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { IBinaryData, INodeExecutionData } from 'n8n-workflow';
import prettyBytes from 'pretty-bytes';
import type { IBinaryData, INodeExecutionData } from 'n8n-workflow';
import { BINARY_ENCODING } from '../Constants';
import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import { BinaryDataFileSystem } from './FileSystem';
import { readFile, stat } from 'fs/promises';

export class BinaryDataManager {
static instance: BinaryDataManager | undefined;
Expand Down Expand Up @@ -43,31 +45,59 @@ export class BinaryDataManager {
return BinaryDataManager.instance;
}

async copyBinaryFile(
binaryData: IBinaryData,
filePath: string,
executionId: string,
): Promise<IBinaryData> {
// If a manager handles this binary, copy over the binary file and return its reference id.
const manager = this.managers[this.binaryDataMode];
if (manager) {
const identifier = await manager.copyBinaryFile(filePath, executionId);
// Add data manager reference id.
binaryData.id = this.generateBinaryId(identifier);

// Prevent preserving data in memory if handled by a data manager.
binaryData.data = this.binaryDataMode;

const fileSize = await manager.getFileSize(identifier);
binaryData.fileSize = prettyBytes(fileSize);

await manager.storeBinaryMetadata(identifier, {
fileName: binaryData.fileName,
mimeType: binaryData.mimeType,
fileSize,
});
} else {
const { size } = await stat(filePath);
binaryData.fileSize = prettyBytes(size);
binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING });
}

return binaryData;
}

async storeBinaryData(
binaryData: IBinaryData,
binaryBuffer: Buffer,
executionId: string,
): Promise<IBinaryData> {
const retBinaryData = binaryData;

// If a manager handles this binary, return the binary data with it's reference id.
if (this.managers[this.binaryDataMode]) {
return this.managers[this.binaryDataMode]
.storeBinaryData(binaryBuffer, executionId)
.then((filename) => {
// Add data manager reference id.
retBinaryData.id = this.generateBinaryId(filename);

// Prevent preserving data in memory if handled by a data manager.
retBinaryData.data = this.binaryDataMode;

// Short-circuit return to prevent further actions.
return retBinaryData;
});
binaryData.fileSize = prettyBytes(binaryBuffer.length);

// If a manager handles this binary, return the binary data with its reference id.
const manager = this.managers[this.binaryDataMode];
if (manager) {
const identifier = await manager.storeBinaryData(binaryBuffer, executionId);
// Add data manager reference id.
binaryData.id = this.generateBinaryId(identifier);

// Prevent preserving data in memory if handled by a data manager.
binaryData.data = this.binaryDataMode;
} else {
// Else fallback to storing this data in memory.
binaryData.data = binaryBuffer.toString(BINARY_ENCODING);
}

// Else fallback to storing this data in memory.
retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING);
return binaryData;
}

Expand All @@ -88,6 +118,24 @@ export class BinaryDataManager {
throw new Error('Storage mode used to store binary data not available');
}

getBinaryPath(identifier: string): string {
const { mode, id } = this.splitBinaryModeFileId(identifier);
if (this.managers[mode]) {
return this.managers[mode].getBinaryPath(id);
}

throw new Error('Storage mode used to store binary data not available');
}

async getBinaryMetadata(identifier: string): Promise<BinaryMetadata> {
const { mode, id } = this.splitBinaryModeFileId(identifier);
if (this.managers[mode]) {
return this.managers[mode].getBinaryMetadata(id);
}

throw new Error('Storage mode used to store binary data not available');
}

async markDataForDeletionByExecutionId(executionId: string): Promise<void> {
if (this.managers[this.binaryDataMode]) {
return this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(executionId);
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ export interface IWebhookFunctions extends IWebhookFunctionsBase {
filePath?: string,
mimeType?: string,
): Promise<IBinaryData>;
copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>;
request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise<any>;
requestWithAuthentication(
this: IAllExecuteFunctions,
Expand Down Expand Up @@ -306,10 +307,21 @@ export interface IBinaryDataConfig {
persistedBinaryDataTTL: number;
}

export interface BinaryMetadata {
fileName?: string;
mimeType?: string;
fileSize: number;
}

export interface IBinaryDataManager {
init(startPurger: boolean): Promise<void>;
getFileSize(filePath: string): Promise<number>;
copyBinaryFile(filePath: string, executionId: string): Promise<string>;
storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise<void>;
getBinaryMetadata(identifier: string): Promise<BinaryMetadata>;
storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string>;
retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer>;
getBinaryPath(identifier: string): string;
markDataForDeletionByExecutionId(executionId: string): Promise<void>;
deleteMarkedFiles(): Promise<unknown>;
deleteBinaryDataByIdentifier(identifier: string): Promise<void>;
Expand Down
Loading