Skip to content

Commit

Permalink
fix(Google Drive Node): Fix file upload for streams (#11698)
Browse files Browse the repository at this point in the history
  • Loading branch information
elsmr authored and burivuhster committed Nov 14, 2024
1 parent 6fb954c commit 66f98e1
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as upload from '../../../../v2/actions/file/upload.operation';
import * as transport from '../../../../v2/transport';
import * as utils from '../../../../v2/helpers/utils';

import { createMockExecuteFunction, driveNode } from '../helpers';
import { createMockExecuteFunction, createTestStream, driveNode } from '../helpers';

jest.mock('../../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../../v2/transport');
Expand All @@ -30,7 +30,7 @@ jest.mock('../../../../v2/helpers/utils', () => {
getItemBinaryData: jest.fn(async function () {
return {
contentLength: '123',
fileContent: 'Hello Drive!',
fileContent: Buffer.from('Hello Drive!'),
originalFilename: 'original.txt',
mimeType: 'text/plain',
};
Expand All @@ -43,13 +43,17 @@ describe('test GoogleDriveV2: file upload', () => {
nock.disableNetConnect();
});

beforeEach(() => {
jest.clearAllMocks();
});

afterAll(() => {
nock.restore();
jest.unmock('../../../../v2/transport');
jest.unmock('../../../../v2/helpers/utils');
});

it('should be called with', async () => {
it('should upload buffers', async () => {
const nodeParameters = {
name: 'newFile.txt',
folderId: {
Expand All @@ -73,10 +77,10 @@ describe('test GoogleDriveV2: file upload', () => {
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
expect.any(Buffer),
{ uploadType: 'media' },
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
{ headers: { 'Content-Length': '123', 'Content-Type': 'text/plain' } },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
Expand All @@ -94,4 +98,60 @@ describe('test GoogleDriveV2: file upload', () => {
expect(utils.getItemBinaryData).toBeCalledTimes(1);
expect(utils.getItemBinaryData).toHaveBeenCalled();
});

it('should stream large files in 2MB chunks', async () => {
const nodeParameters = {
name: 'newFile.jpg',
folderId: {
__rl: true,
value: 'folderIDxxxxxx',
mode: 'list',
cachedResultName: 'testFolder 3',
cachedResultUrl: 'https://drive.google.com/drive/folders/folderIDxxxxxx',
},
options: {
simplifyOutput: true,
},
};

const fakeExecuteFunction = createMockExecuteFunction(nodeParameters, driveNode);
const httpRequestSpy = jest.spyOn(fakeExecuteFunction.helpers, 'httpRequest');

const fileSize = 7 * 1024 * 1024; // 7MB
jest.mocked(utils.getItemBinaryData).mockResolvedValue({
mimeType: 'image/jpg',
originalFilename: 'test.jpg',
contentLength: fileSize,
fileContent: createTestStream(fileSize),
});

await upload.execute.call(fakeExecuteFunction, 0);

// 4 chunks: 7MB = 3x2MB + 1x1MB
expect(httpRequestSpy).toHaveBeenCalledTimes(4);
expect(httpRequestSpy).toHaveBeenCalledWith(
expect.objectContaining({ body: expect.any(Buffer) }),
);
expect(transport.googleApiRequest).toBeCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
'/drive/v3/files/undefined',
{ mimeType: 'image/jpg', name: 'newFile.jpg', originalFilename: 'test.jpg' },
{
addParents: 'folderIDxxxxxx',
supportsAllDrives: true,
corpora: 'allDrives',
includeItemsFromAllDrives: true,
spaces: 'appDataFolder, drive',
},
);
});
});
23 changes: 23 additions & 0 deletions packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode }

import { get } from 'lodash';
import { constructExecutionMetaData, returnJsonArray } from 'n8n-core';
import { Readable } from 'stream';

export const driveNode: INode = {
id: '11',
Expand Down Expand Up @@ -40,3 +41,25 @@ export const createMockExecuteFunction = (
} as unknown as IExecuteFunctions;
return fakeExecuteFunction;
};

export function createTestStream(byteSize: number) {
let bytesSent = 0;
const CHUNK_SIZE = 64 * 1024; // 64kB chunks (default NodeJS highWaterMark)

return new Readable({
read() {
const remainingBytes = byteSize - bytesSent;

if (remainingBytes <= 0) {
this.push(null);
return;
}

const chunkSize = Math.min(CHUNK_SIZE, remainingBytes);
const chunk = Buffer.alloc(chunkSize, 'A'); // Test data just a string of "A"

bytesSent += chunkSize;
this.push(chunk);
},
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
setFileProperties,
setUpdateCommonParams,
setParentFolder,
processInChunks,
} from '../../helpers/utils';
import { updateDisplayOptions } from '@utils/utilities';

Expand Down Expand Up @@ -129,25 +130,25 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode

const uploadUrl = resumableUpload.headers.location;

let offset = 0;
for await (const chunk of fileContent) {
const nextOffset = offset + Number(chunk.length);
// 2MB chunks, needs to be a multiple of 256kB for Google Drive API
const chunkSizeBytes = 2048 * 1024;

await processInChunks(fileContent, chunkSizeBytes, async (chunk, offset) => {
try {
const response = await this.helpers.httpRequest({
method: 'PUT',
url: uploadUrl,
headers: {
'Content-Length': chunk.length,
'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`,
'Content-Range': `bytes ${offset}-${offset + chunk.byteLength - 1}/${contentLength}`,
},
body: chunk,
});
uploadId = response?.id;
} catch (error) {
if (error.response?.status !== 308) throw error;
}
offset = nextOffset;
}
});
}

const options = this.getNodeParameter('options', i, {});
Expand Down
26 changes: 26 additions & 0 deletions packages/nodes-base/nodes/Google/Drive/v2/helpers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,29 @@ export function setParentFolder(
return 'root';
}
}

export async function processInChunks(
stream: Readable,
chunkSize: number,
process: (chunk: Buffer, offset: number) => void | Promise<void>,
) {
let buffer = Buffer.alloc(0);
let offset = 0;

for await (const chunk of stream) {
buffer = Buffer.concat([buffer, chunk]);

while (buffer.length >= chunkSize) {
const chunkToProcess = buffer.subarray(0, chunkSize);
await process(chunkToProcess, offset);

buffer = buffer.subarray(chunkSize);
offset += chunkSize;
}
}

// Process last chunk, could be smaller than chunkSize
if (buffer.length > 0) {
await process(buffer, offset);
}
}

0 comments on commit 66f98e1

Please sign in to comment.