Skip to content

Commit

Permalink
handle streams correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Mar 29, 2023
1 parent e656a14 commit e3dae5e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
20 changes: 14 additions & 6 deletions packages/cli/src/WebhookHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
/* eslint-disable prefer-destructuring */
import type express from 'express';
import get from 'lodash.get';
import stream from 'stream';
import { promisify } from 'util';

import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core';

Expand Down Expand Up @@ -59,6 +61,8 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';

const pipeline = promisify(stream.pipeline);

export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];

/**
Expand Down Expand Up @@ -418,13 +422,17 @@ export async function executeWebhook(
return;
}

if (Buffer.isBuffer(response.body)) {
const isBuffer = Buffer.isBuffer(response.body);
const isStream = response.body instanceof stream.Readable;
if (isBuffer || isStream) {
res.header(response.headers);
res.end(response.body);

responseCallback(null, {
noWebhookResponse: true,
});
if (isBuffer) {
res.end(response.body);
responseCallback(null, { noWebhookResponse: true });
} else
void pipeline(response.body as stream.Readable, res).then(() =>
responseCallback(null, { noWebhookResponse: true }),
);
} else {
// TODO: This probably needs some more changes depending on the options on the
// Webhook Response node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export class RespondToWebhook implements INodeType {
}
}

let responseBody: IN8nHttpResponse;
let responseBody: IN8nHttpResponse | Readable;
if (respondWith === 'json') {
const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string;
if (responseBodyParameter) {
Expand Down Expand Up @@ -235,17 +235,18 @@ export class RespondToWebhook implements INodeType {
responseBinaryPropertyName = binaryKeys[0];
}
const binaryData = this.helpers.assertBinaryData(0, responseBinaryPropertyName);
let uploadData: Buffer | Readable;
if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id);
responseBody = this.helpers.getBinaryStream(binaryData.id);
const metadata = await this.helpers.getBinaryMetadata(binaryData.id);
headers['content-length'] = metadata.fileSize;
} else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
responseBody = Buffer.from(binaryData.data, BINARY_ENCODING);
headers['content-length'] = (responseBody as Buffer).length;
}

if (!headers['content-type']) {
headers['content-type'] = binaryData.mimeType;
}
responseBody = uploadData;
} else if (respondWith !== 'noData') {
throw new NodeOperationError(
this.getNode(),
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ export interface IHttpRequestOptions {
export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null;

export interface IN8nHttpFullResponse {
body: IN8nHttpResponse;
body: IN8nHttpResponse | Readable;
headers: IDataObject;
statusCode: number;
statusMessage?: string;
Expand Down

0 comments on commit e3dae5e

Please sign in to comment.