Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Google Drive Node): Fix file upload for streams #11698

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as upload from '../../../../v2/actions/file/upload.operation';
import * as transport from '../../../../v2/transport';
import * as utils from '../../../../v2/helpers/utils';

import { createMockExecuteFunction, driveNode } from '../helpers';
import { createMockExecuteFunction, createTestStream, driveNode } from '../helpers';

jest.mock('../../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../../v2/transport');
Expand All @@ -30,7 +30,7 @@ jest.mock('../../../../v2/helpers/utils', () => {
getItemBinaryData: jest.fn(async function () {
return {
contentLength: '123',
fileContent: 'Hello Drive!',
fileContent: Buffer.from('Hello Drive!'),
originalFilename: 'original.txt',
mimeType: 'text/plain',
};
Expand All @@ -43,13 +43,17 @@ describe('test GoogleDriveV2: file upload', () => {
nock.disableNetConnect();
});

beforeEach(() => {
jest.clearAllMocks();
});

afterAll(() => {
nock.restore();
jest.unmock('../../../../v2/transport');
jest.unmock('../../../../v2/helpers/utils');
});

it('should be called with', async () => {
it('should upload buffers', async () => {
const nodeParameters = {
name: 'newFile.txt',
folderId: {
Expand All @@ -73,10 +77,10 @@ describe('test GoogleDriveV2: file upload', () => {
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
expect.any(Buffer),
{ uploadType: 'media' },
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
{ headers: { 'Content-Length': '123', 'Content-Type': 'text/plain' } },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
Expand All @@ -94,4 +98,60 @@ describe('test GoogleDriveV2: file upload', () => {
expect(utils.getItemBinaryData).toBeCalledTimes(1);
expect(utils.getItemBinaryData).toHaveBeenCalled();
});

it('should stream large files in 2MB chunks', async () => {
const nodeParameters = {
name: 'newFile.jpg',
folderId: {
__rl: true,
value: 'folderIDxxxxxx',
mode: 'list',
cachedResultName: 'testFolder 3',
cachedResultUrl: 'https://drive.google.com/drive/folders/folderIDxxxxxx',
},
options: {
simplifyOutput: true,
},
};

const fakeExecuteFunction = createMockExecuteFunction(nodeParameters, driveNode);
const httpRequestSpy = jest.spyOn(fakeExecuteFunction.helpers, 'httpRequest');

const fileSize = 7 * 1024 * 1024; // 7MB
jest.mocked(utils.getItemBinaryData).mockResolvedValue({
mimeType: 'image/jpg',
originalFilename: 'test.jpg',
contentLength: fileSize,
fileContent: createTestStream(fileSize),
});

await upload.execute.call(fakeExecuteFunction, 0);

// 4 chunks: 7MB = 3x2MB + 1x1MB
expect(httpRequestSpy).toHaveBeenCalledTimes(4);
expect(httpRequestSpy).toHaveBeenCalledWith(
expect.objectContaining({ body: expect.any(Buffer) }),
);
expect(transport.googleApiRequest).toBeCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
'/drive/v3/files/undefined',
{ mimeType: 'image/jpg', name: 'newFile.jpg', originalFilename: 'test.jpg' },
{
addParents: 'folderIDxxxxxx',
supportsAllDrives: true,
corpora: 'allDrives',
includeItemsFromAllDrives: true,
spaces: 'appDataFolder, drive',
},
);
});
});
23 changes: 23 additions & 0 deletions packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode }

import { get } from 'lodash';
import { constructExecutionMetaData, returnJsonArray } from 'n8n-core';
import { Readable } from 'stream';

export const driveNode: INode = {
id: '11',
Expand Down Expand Up @@ -40,3 +41,25 @@ export const createMockExecuteFunction = (
} as unknown as IExecuteFunctions;
return fakeExecuteFunction;
};

export function createTestStream(byteSize: number) {
let bytesSent = 0;
const CHUNK_SIZE = 64 * 1024; // 64kB chunks (default NodeJS highWaterMark)

return new Readable({
read() {
const remainingBytes = byteSize - bytesSent;

if (remainingBytes <= 0) {
this.push(null);
return;
}

const chunkSize = Math.min(CHUNK_SIZE, remainingBytes);
const chunk = Buffer.alloc(chunkSize, 'A'); // Test data just a string of "A"

bytesSent += chunkSize;
this.push(chunk);
},
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
setFileProperties,
setUpdateCommonParams,
setParentFolder,
processInChunks,
} from '../../helpers/utils';
import { updateDisplayOptions } from '@utils/utilities';

Expand Down Expand Up @@ -129,25 +130,25 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode

const uploadUrl = resumableUpload.headers.location;

let offset = 0;
for await (const chunk of fileContent) {
const nextOffset = offset + Number(chunk.length);
// 2MB chunks, needs to be a multiple of 256kB for Google Drive API
const chunkSizeBytes = 2048 * 1024;

await processInChunks(fileContent, chunkSizeBytes, async (chunk, offset) => {
try {
const response = await this.helpers.httpRequest({
method: 'PUT',
url: uploadUrl,
headers: {
'Content-Length': chunk.length,
'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`,
'Content-Range': `bytes ${offset}-${offset + chunk.byteLength - 1}/${contentLength}`,
},
body: chunk,
});
uploadId = response?.id;
} catch (error) {
if (error.response?.status !== 308) throw error;
}
offset = nextOffset;
}
});
}

const options = this.getNodeParameter('options', i, {});
Expand Down
26 changes: 26 additions & 0 deletions packages/nodes-base/nodes/Google/Drive/v2/helpers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,29 @@ export function setParentFolder(
return 'root';
}
}

export async function processInChunks(
stream: Readable,
chunkSize: number,
process: (chunk: Buffer, offset: number) => void | Promise<void>,
) {
let buffer = Buffer.alloc(0);
let offset = 0;

for await (const chunk of stream) {
buffer = Buffer.concat([buffer, chunk]);

while (buffer.length >= chunkSize) {
const chunkToProcess = buffer.subarray(0, chunkSize);
await process(chunkToProcess, offset);

buffer = buffer.subarray(chunkSize);
offset += chunkSize;
}
}

// Process last chunk, could be smaller than chunkSize
if (buffer.length > 0) {
await process(buffer, offset);
}
}
Loading