Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Respond to Webhook Node): Move from Binary Buffer to Binary streaming #5613

Merged
merged 8 commits into from
May 17, 2023
19 changes: 14 additions & 5 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
/* eslint-disable prefer-destructuring */
import type express from 'express';
import get from 'lodash.get';
import stream from 'stream';
import { promisify } from 'util';

import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core';

import type {
IBinaryData,
IBinaryKeyData,
IDataObject,
IDeferredPromise,
Expand Down Expand Up @@ -59,6 +62,8 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';

const pipeline = promisify(stream.pipeline);

export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];

/**
Expand Down Expand Up @@ -418,13 +423,17 @@ export async function executeWebhook(
return;
}

if (Buffer.isBuffer(response.body)) {
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = NodeExecuteFunctions.getBinaryStream(binaryData.id);
void pipeline(stream, res).then(() =>
responseCallback(null, { noWebhookResponse: true }),
);
} else if (Buffer.isBuffer(response.body)) {
res.header(response.headers);
res.end(response.body);

responseCallback(null, {
noWebhookResponse: true,
});
responseCallback(null, { noWebhookResponse: true });
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Readable } from 'stream';
import type {
IDataObject,
IExecuteFunctions,
Expand All @@ -7,7 +8,7 @@ import type {
INodeType,
INodeTypeDescription,
} from 'n8n-workflow';
import { jsonParse, NodeOperationError } from 'n8n-workflow';
import { jsonParse, BINARY_ENCODING, NodeOperationError } from 'n8n-workflow';

export class RespondToWebhook implements INodeType {
description: INodeTypeDescription = {
Expand Down Expand Up @@ -201,7 +202,7 @@ export class RespondToWebhook implements INodeType {
}
}

let responseBody: IN8nHttpResponse;
let responseBody: IN8nHttpResponse | Readable;
if (respondWith === 'json') {
const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string;
if (responseBodyParameter) {
Expand Down Expand Up @@ -235,15 +236,16 @@ export class RespondToWebhook implements INodeType {
}

const binaryData = this.helpers.assertBinaryData(0, responseBinaryPropertyName);
const binaryDataBuffer = await this.helpers.getBinaryDataBuffer(
0,
responseBinaryPropertyName,
);
if (binaryData.id) {
responseBody = { binaryData };
} else {
responseBody = Buffer.from(binaryData.data, BINARY_ENCODING);
headers['content-length'] = (responseBody as Buffer).length;
}

if (!headers['content-type']) {
headers['content-type'] = binaryData.mimeType;
}
responseBody = binaryDataBuffer;
} else if (respondWith !== 'noData') {
throw new NodeOperationError(
this.getNode(),
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ export interface IHttpRequestOptions {
export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null;

export interface IN8nHttpFullResponse {
body: IN8nHttpResponse;
body: IN8nHttpResponse | Readable;
headers: IDataObject;
statusCode: number;
statusMessage?: string;
Expand Down