diff --git a/packages/core/src/tools/create-file-stasher.js b/packages/core/src/tools/create-file-stasher.js index e0f558ea8..56d06e336 100644 --- a/packages/core/src/tools/create-file-stasher.js +++ b/packages/core/src/tools/create-file-stasher.js @@ -60,7 +60,11 @@ const resolveRemoteStream = async (stream) => { try { await streamPipeline(stream, fs.createWriteStream(tmpFilePath)); } catch (error) { - fs.unlinkSync(tmpFilePath); + try { + fs.unlinkSync(tmpFilePath); + } catch (e) { + // File doesn't exist? Probably okay + } throw error; } @@ -69,7 +73,11 @@ const resolveRemoteStream = async (stream) => { readStream.on('end', () => { // Burn after reading - fs.unlinkSync(tmpFilePath); + try { + fs.unlinkSync(tmpFilePath); + } catch (e) { + // TODO: We probably want to log warning here + } }); return { diff --git a/packages/core/src/tools/fetch.js b/packages/core/src/tools/fetch.js index fee465984..68810fb10 100644 --- a/packages/core/src/tools/fetch.js +++ b/packages/core/src/tools/fetch.js @@ -1,5 +1,7 @@ 'use strict'; +const { Writable } = require('stream'); + const fetch = require('node-fetch'); // XXX: PatchedRequest is to get past node-fetch's check that forbids GET requests @@ -7,10 +9,10 @@ const fetch = require('node-fetch'); // https://github.com/node-fetch/node-fetch/blob/v2.6.0/src/request.js#L75-L78 class PatchedRequest extends fetch.Request { constructor(url, opts) { - const origMethod = (opts.method || 'GET').toUpperCase(); + const origMethod = ((opts && opts.method) || 'GET').toUpperCase(); const isGetWithBody = - (origMethod === 'GET' || origMethod === 'HEAD') && opts.body; + (origMethod === 'GET' || origMethod === 'HEAD') && opts && opts.body; let newOpts = opts; if (isGetWithBody) { // Temporary remove body to fool fetch.Request constructor @@ -50,9 +52,31 @@ class PatchedRequest extends fetch.Request { const newFetch = (url, opts) => { const request = new PatchedRequest(url, opts); + // fetch actually accepts a Request object as an argument. It'll clone the // request internally, that's why the PatchedRequest.body hack works. - return fetch(request); + const responsePromise = fetch(request); + + // node-fetch clones request.body and use the cloned body internally. We need + // to make sure to consume the original body stream so its internal buffer is + // not filled up, which causes it to pause. + // See https://github.com/node-fetch/node-fetch/issues/151 + // + // Exclude form-data object to be consistent with + // https://github.com/node-fetch/node-fetch/blob/v2.6.6/src/body.js#L403-L412 + if ( + request.body && + typeof request.body.pipe === 'function' && + typeof request.body.getBoundary !== 'function' + ) { + const nullStream = new Writable(); + nullStream._write = function (chunk, encoding, done) { + done(); + }; + request.body.pipe(nullStream); + } + + return responsePromise; }; newFetch.Promise = require('./promise'); diff --git a/packages/core/test/tools/fetch.js b/packages/core/test/tools/fetch.js new file mode 100644 index 000000000..2bec804ad --- /dev/null +++ b/packages/core/test/tools/fetch.js @@ -0,0 +1,44 @@ +const nock = require('nock'); +const should = require('should'); + +const fetch = require('../../src/tools/fetch'); +const FormData = require('form-data'); + +const { HTTPBIN_URL } = require('../constants'); + +describe('node-fetch patch', () => { + it('should not hang due to backpressure', async () => { + nock('https://fake.zapier.com') + .put('/upload') + .reply(200, (uri, responseBody) => { + return { + length: Buffer.from(responseBody, 'hex').length, + }; + }); + + const downloadResponse = await fetch(`${HTTPBIN_URL}/stream-bytes/35000`); + const uploadResponse = await fetch('https://fake.zapier.com/upload', { + method: 'PUT', + body: downloadResponse.body, + }); + + const result = await uploadResponse.json(); + should(result).eql({ length: 35000 }); + }); + + it('should upload form data', async () => { + const downloadResponse = await fetch(`${HTTPBIN_URL}/stream-bytes/100`); + + const form = new FormData(); + form.append('name', 'hello'); + form.append('data', downloadResponse.body); + + const uploadResponse = await fetch(`${HTTPBIN_URL}/post`, { + method: 'POST', + body: form, + }); + + const result = await uploadResponse.json(); + should(result.form.name).eql(['hello']); + }); +});