diff --git a/README.md b/README.md index 4bd6309..876e4a2 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,7 @@ Injects a fake request into an HTTP server. - `signal` - An `AbortSignal` that may be used to abort an ongoing request. Requires Node v16+. - `Request` - Optional type from which the `request` object should inherit instead of `stream.Readable` + - `payloadAsStream` - if set to `true`, the response will be streamed and not accumulated; in this case `res.payload`, `res.rawPayload` will be undefined. - `callback` - the callback function using the signature `function (err, res)` where: - `err` - error object - `res` - a response object where: diff --git a/lib/request.js b/lib/request.js index 99c0c13..49ad577 100644 --- a/lib/request.js +++ b/lib/request.js @@ -182,7 +182,9 @@ function Request (options) { this._lightMyRequest = { payload, isDone: false, - simulate: options.simulate || {} + simulate: options.simulate || {}, + payloadAsStream: options.payloadAsStream, + signal: options.signal } const signal = options.signal diff --git a/lib/response.js b/lib/response.js index 322bc22..9b80eae 100644 --- a/lib/response.js +++ b/lib/response.js @@ -1,7 +1,7 @@ 'use strict' const http = require('node:http') -const { Writable, Readable } = require('node:stream') +const { Writable, Readable, addAbortSignal } = require('node:stream') const util = require('node:util') const setCookie = require('set-cookie-parser') @@ -9,7 +9,16 @@ const setCookie = require('set-cookie-parser') function Response (req, onEnd, reject) { http.ServerResponse.call(this, req) - this._lightMyRequest = { headers: null, trailers: {}, payloadChunks: [] } + if (req._lightMyRequest?.payloadAsStream) { + this._lightMyRequest = { headers: null, trailers: {}, stream: new Readable({ read () {} }) } + const signal = req._lightMyRequest.signal + + if (signal) { + addAbortSignal(signal, this._lightMyRequest.stream) + } + } else { + this._lightMyRequest = { headers: null, trailers: {}, payloadChunks: [] } + } // This forces node@8 to always render the headers this.setHeader('foo', 'bar'); this.removeHeader('foo') @@ -26,21 +35,37 @@ function Response (req, onEnd, reject) { } process.nextTick(() => onEnd(null, payload)) } + this._lightMyRequest.onEndSuccess = onEndSuccess const onEndFailure = (err) => { if (called) return called = true - if (this._promiseCallback) { - return process.nextTick(() => reject(err)) + if (this._lightMyRequest.stream) { + const res = generatePayload(this) + res.raw.req = req + this._lightMyRequest.stream._read = function () { + this.destroy(err || new Error('premature close')) + } + onEndSuccess(res) + } else { + if (this._promiseCallback) { + return process.nextTick(() => reject(err)) + } + process.nextTick(() => onEnd(err, null)) } - process.nextTick(() => onEnd(err, null)) } - this.once('finish', () => { - const res = generatePayload(this) - res.raw.req = req - onEndSuccess(res) - }) + if (this._lightMyRequest.stream) { + this.once('finish', () => { + this._lightMyRequest.stream.push(null) + }) + } else { + this.once('finish', () => { + const res = generatePayload(this) + res.raw.req = req + onEndSuccess(res) + }) + } this.connection.once('error', onEndFailure) @@ -64,6 +89,10 @@ Response.prototype.writeHead = function () { copyHeaders(this) + if (this._lightMyRequest.stream) { + this._lightMyRequest.onEndSuccess(generatePayload(this)) + } + return result } @@ -72,7 +101,11 @@ Response.prototype.write = function (data, encoding, callback) { clearTimeout(this.timeoutHandle) } http.ServerResponse.prototype.write.call(this, data, encoding, callback) - this._lightMyRequest.payloadChunks.push(Buffer.from(data, encoding)) + if (this._lightMyRequest.stream) { + this._lightMyRequest.stream.push(Buffer.from(data, encoding)) + } else { + this._lightMyRequest.payloadChunks.push(Buffer.from(data, encoding)) + } return true } @@ -129,22 +162,32 @@ function generatePayload (response) { } } - // Prepare payload and trailers - const rawBuffer = Buffer.concat(response._lightMyRequest.payloadChunks) - res.rawPayload = rawBuffer - - // we keep both of them for compatibility reasons - res.payload = rawBuffer.toString() - res.body = res.payload res.trailers = response._lightMyRequest.trailers - // Prepare payload parsers - res.json = function parseJsonPayload () { - return JSON.parse(res.payload) + if (response._lightMyRequest.payloadChunks) { + // Prepare payload and trailers + const rawBuffer = Buffer.concat(response._lightMyRequest.payloadChunks) + res.rawPayload = rawBuffer + + // we keep both of them for compatibility reasons + res.payload = rawBuffer.toString() + res.body = res.payload + + // Prepare payload parsers + res.json = function parseJsonPayload () { + return JSON.parse(res.payload) + } + } else { + res.json = function () { + throw new Error('Response payload is not available with payloadAsStream: true') + } } // Provide stream Readable for advanced user res.stream = function streamPayload () { + if (response._lightMyRequest.stream) { + return response._lightMyRequest.stream + } return Readable.from(response._lightMyRequest.payloadChunks) } @@ -179,7 +222,7 @@ function copyHeaders (response) { // Add raw headers ;['Date', 'Connection', 'Transfer-Encoding'].forEach((name) => { const regex = new RegExp('\\r\\n' + name + ': ([^\\r]*)\\r\\n') - const field = response._header.match(regex) + const field = response._header?.match(regex) if (field) { response._lightMyRequest.headers[name.toLowerCase()] = field[1] } diff --git a/test/stream.test.js b/test/stream.test.js new file mode 100644 index 0000000..ffdca65 --- /dev/null +++ b/test/stream.test.js @@ -0,0 +1,317 @@ +'use strict' + +const t = require('tap') +const fs = require('node:fs') +const test = t.test +const zlib = require('node:zlib') +const express = require('express') + +const inject = require('../index') + +function accumulate (stream, cb) { + const chunks = [] + stream.on('error', cb) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + cb(null, Buffer.concat(chunks)) + }) +} + +test('stream mode - non-chunked payload', (t) => { + t.plan(9) + const output = 'example.com:8080|/hello' + + const dispatch = function (req, res) { + res.statusMessage = 'Super' + res.setHeader('x-extra', 'hello') + res.writeHead(200, { 'Content-Type': 'text/plain', 'Content-Length': output.length }) + res.end(req.headers.host + '|' + req.url) + } + + inject(dispatch, { + url: 'http://example.com:8080/hello', + payloadAsStream: true + }, (err, res) => { + t.error(err) + t.equal(res.statusCode, 200) + t.equal(res.statusMessage, 'Super') + t.ok(res.headers.date) + t.strictSame(res.headers, { + date: res.headers.date, + connection: 'keep-alive', + 'x-extra': 'hello', + 'content-type': 'text/plain', + 'content-length': output.length.toString() + }) + t.equal(res.payload, undefined) + t.equal(res.rawPayload, undefined) + + accumulate(res.stream(), (err, payload) => { + t.error(err) + t.equal(payload.toString(), 'example.com:8080|/hello') + }) + }) +}) + +test('stream mode - passes headers', (t) => { + t.plan(3) + const dispatch = function (req, res) { + res.writeHead(200, { 'Content-Type': 'text/plain' }) + res.end(req.headers.super) + } + + inject(dispatch, { + method: 'GET', + url: 'http://example.com:8080/hello', + headers: { Super: 'duper' }, + payloadAsStream: true + }, (err, res) => { + t.error(err) + accumulate(res.stream(), (err, payload) => { + t.error(err) + t.equal(payload.toString(), 'duper') + }) + }) +}) + +test('stream mode - returns chunked payload', (t) => { + t.plan(6) + const dispatch = function (req, res) { + res.writeHead(200, 'OK') + res.write('a') + res.write('b') + res.end() + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + t.ok(res.headers.date) + t.ok(res.headers.connection) + t.equal(res.headers['transfer-encoding'], 'chunked') + accumulate(res.stream(), (err, payload) => { + t.error(err) + t.equal(payload.toString(), 'ab') + }) + }) +}) + +test('stream mode - sets trailers in response object', (t) => { + t.plan(4) + const dispatch = function (req, res) { + res.setHeader('Trailer', 'Test') + res.addTrailers({ Test: 123 }) + res.end() + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + t.equal(res.headers.trailer, 'Test') + t.equal(res.headers.test, undefined) + t.equal(res.trailers.test, '123') + }) +}) + +test('stream mode - parses zipped payload', (t) => { + t.plan(5) + const dispatch = function (req, res) { + res.writeHead(200, 'OK') + const stream = fs.createReadStream('./package.json') + stream.pipe(zlib.createGzip()).pipe(res) + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + fs.readFile('./package.json', { encoding: 'utf-8' }, (err, file) => { + t.error(err) + + accumulate(res.stream(), (err, payload) => { + t.error(err) + + zlib.unzip(payload, (err, unzipped) => { + t.error(err) + t.equal(unzipped.toString('utf-8'), file) + }) + }) + }) + }) +}) + +test('stream mode - returns multi buffer payload', (t) => { + t.plan(3) + const dispatch = function (req, res) { + res.writeHead(200) + res.write('a') + res.write(Buffer.from('b')) + res.end() + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + + const chunks = [] + const stream = res.stream() + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + + stream.on('end', () => { + t.equal(chunks.length, 2) + t.equal(Buffer.concat(chunks).toString(), 'ab') + }) + }) +}) + +test('stream mode - returns null payload', (t) => { + t.plan(4) + const dispatch = function (req, res) { + res.writeHead(200, { 'Content-Length': 0 }) + res.end() + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + t.equal(res.payload, undefined) + accumulate(res.stream(), (err, payload) => { + t.error(err) + t.equal(payload.toString(), '') + }) + }) +}) + +test('stream mode - simulates error', (t) => { + t.plan(3) + const dispatch = function (req, res) { + req.on('readable', () => { + }) + + req.on('error', () => { + res.writeHead(200, { 'Content-Length': 0 }) + res.end('error') + }) + } + + const body = 'something special just for you' + inject(dispatch, { method: 'GET', url: '/', payload: body, simulate: { error: true }, payloadAsStream: true }, (err, res) => { + t.error(err) + accumulate(res.stream(), (err, payload) => { + t.error(err) + t.equal(payload.toString(), 'error') + }) + }) +}) + +test('stream mode - promises support', (t) => { + t.plan(1) + const dispatch = function (req, res) { + res.writeHead(200, { 'Content-Type': 'text/plain' }) + res.end('hello') + } + + inject(dispatch, { method: 'GET', url: 'http://example.com:8080/hello', payloadAsStream: true }) + .then((res) => { + return new Promise((resolve, reject) => { + accumulate(res.stream(), (err, payload) => { + if (err) { + return reject(err) + } + resolve(payload) + }) + }) + }) + .then(payload => t.equal(payload.toString(), 'hello')) + .catch(t.fail) +}) + +test('stream mode - Response.json() should throw', (t) => { + t.plan(2) + + const jsonData = { + a: 1, + b: '2' + } + + const dispatch = function (req, res) { + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify(jsonData)) + } + + inject(dispatch, { method: 'GET', path: 'http://example.com:8080/hello', payloadAsStream: true }, (err, res) => { + t.error(err) + const { json } = res + t.throws(json) + }) +}) + +test('stream mode - error for response destroy', (t) => { + t.plan(2) + + const dispatch = function (req, res) { + res.destroy() + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + accumulate(res.stream(), (err) => { + t.ok(err) + }) + }) +}) + +test('stream mode - request destory with error', (t) => { + t.plan(2) + + const fakeError = new Error('some-err') + + const dispatch = function (req, res) { + req.destroy(fakeError) + } + + inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => { + t.error(err) + accumulate(res.stream(), (err, res) => { + t.equal(err, fakeError) + }) + }) +}) + +test('stream mode - Can abort a request using AbortController/AbortSignal', async (t) => { + const dispatch = function (req, res) { + res.writeHead(200) + } + + const controller = new AbortController() + const res = await inject(dispatch, { + method: 'GET', + url: 'http://example.com:8080/hello', + signal: controller.signal, + payloadAsStream: true + }) + controller.abort() + + await t.rejects(async () => { + for await (const c of res.stream()) { + t.fail(`should not loop, got ${c.toString()}`) + } + }) +}, { skip: globalThis.AbortController == null }) + +test("stream mode - passes payload when using express' send", (t) => { + t.plan(4) + + const app = express() + + app.get('/hello', (req, res) => { + res.send('some text') + }) + + inject(app, { method: 'GET', url: 'http://example.com:8080/hello', payloadAsStream: true }, (err, res) => { + t.error(err) + t.equal(res.headers['content-length'], '9') + accumulate(res.stream(), function (err, payload) { + t.error(err) + t.equal(payload.toString(), 'some text') + }) + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 273869b..285fa03 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -59,6 +59,7 @@ declare namespace inject { cookies?: { [k: string]: string }, signal?: AbortSignal, Request?: object, + payloadAsStream?: boolean } /** diff --git a/types/index.test-d.ts b/types/index.test-d.ts index 581446a..bba26b7 100644 --- a/types/index.test-d.ts +++ b/types/index.test-d.ts @@ -142,3 +142,8 @@ inject(httpDispatch, { method: 'get', url: '/' }, (err, res) => { expectType(err) expectResponse(res) }) + +inject(httpDispatch, { method: 'get', url: '/', payloadAsStream: true }, (err, res) => { + expectType(err) + expectResponse(res) +})