Skip to content

Commit

Permalink
Streaming input (#317)
Browse files Browse the repository at this point in the history
* Support streaming input

Signed-off-by: Matteo Collina <[email protected]>

* better error handling

Signed-off-by: Matteo Collina <[email protected]>

* code cov

Signed-off-by: Matteo Collina <[email protected]>

* delete transfer-encoding

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Dec 19, 2024
1 parent e91662f commit ef771a9
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 73 deletions.
31 changes: 29 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,43 @@ function inject (dispatchFunc, options, callback) {
}
}

function supportStream1 (req, next) {
const payload = req._lightMyRequest.payload
if (!payload || payload._readableState || typeof payload.resume !== 'function') { // does quack like a modern stream
return next()
}

// This is a non-compliant stream
const chunks = []

// We are accumulating because Readable.wrap() does not really work as expected
// in this case.
payload.on('data', (chunk) => chunks.push(Buffer.from(chunk)))

payload.on('end', () => {
const payload = Buffer.concat(chunks)
req.headers['content-length'] = req.headers['content-length'] || ('' + payload.length)
delete req.headers['transfer-encoding']
req._lightMyRequest.payload = payload
return next()
})

// Force to resume the stream. Needed for Stream 1
payload.resume()
}

function makeRequest (dispatchFunc, server, req, res) {
req.once('error', function (err) {
if (this.destroyed) res.destroy(err)
})

req.once('close', function () {
if (this.destroyed && !this._error) res.destroy()
if (this.destroyed && !this._error) {
res.destroy()
}
})

return req.prepare(() => dispatchFunc.call(server, req, res))
return supportStream1(req, () => dispatchFunc.call(server, req, res))
}

function doInject (dispatchFunc, options, callback) {
Expand Down
53 changes: 34 additions & 19 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,34 +193,46 @@ function Request (options) {
addAbortSignal(signal, this)
}

return this
}
{
const payload = this._lightMyRequest.payload
if (payload && payload._readableState) { // does quack like a modern stream
this._read = readStream

util.inherits(Request, Readable)
util.inherits(CustomRequest, Request)
payload.on('error', (err) => {
this.destroy(err)
})

Request.prototype.prepare = function (next) {
const payload = this._lightMyRequest.payload
if (!payload || typeof payload.resume !== 'function') { // does not quack like a stream
return next()
payload.on('end', () => {
this.push(null)
})
} else {
// Stream v1 are handled in index.js asynchronously
this._read = readEverythingElse
}
}

const chunks = []
return this
}

payload.on('data', (chunk) => chunks.push(Buffer.from(chunk)))
function readStream (size) {
const payload = this._lightMyRequest.payload

payload.on('end', () => {
const payload = Buffer.concat(chunks)
this.headers['content-length'] = this.headers['content-length'] || ('' + payload.length)
this._lightMyRequest.payload = payload
return next()
})
let more = true
let pushed = false
let chunk
while (more && (chunk = payload.read())) {
pushed = true
more = this.push(chunk)
}

// Force to resume the stream. Needed for Stream 1
payload.resume()
// We set up a recursive 'readable' event only if we didn't read anything.
// Otheriwse, the stream machinery will call _read() for us.
if (more && !pushed) {
this._lightMyRequest.payload.once('readable', this._read.bind(this))
}
}

Request.prototype._read = function (size) {
function readEverythingElse (size) {
setImmediate(() => {
if (this._lightMyRequest.isDone) {
// 'end' defaults to true
Expand Down Expand Up @@ -257,6 +269,9 @@ Request.prototype._read = function (size) {
})
}

util.inherits(Request, Readable)
util.inherits(CustomRequest, Request)

Request.prototype.destroy = function (error) {
if (this.destroyed || this._lightMyRequest.isDone) return
this.destroyed = true
Expand Down
35 changes: 21 additions & 14 deletions lib/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function Response (req, onEnd, reject) {

let called = false
const onEndSuccess = (payload) => {
// no need to early-return if already called because this handler is bound `once`
if (called) return
called = true
if (this._promiseCallback) {
return process.nextTick(() => onEnd(payload))
Expand All @@ -37,26 +37,33 @@ function Response (req, onEnd, reject) {
}
this._lightMyRequest.onEndSuccess = onEndSuccess

let finished = false
const onEndFailure = (err) => {
if (called) return
called = true
if (this._lightMyRequest.stream) {
const res = generatePayload(this)
res.raw.req = req
this._lightMyRequest.stream._read = function () {
this.destroy(err || new Error('premature close'))
}
onEndSuccess(res)
} else {
if (this._promiseCallback) {
return process.nextTick(() => reject(err))
if (called) {
if (this._lightMyRequest.stream && !finished) {
if (!err) {
err = new Error('response destroyed before completion')
err.code = 'LIGHT_ECONNRESET'
}
this._lightMyRequest.stream.destroy(err)
this._lightMyRequest.stream.on('error', () => {})
}
process.nextTick(() => onEnd(err, null))
return
}
called = true
if (!err) {
err = new Error('response destroyed before completion')
err.code = 'LIGHT_ECONNRESET'
}
if (this._promiseCallback) {
return process.nextTick(() => reject(err))
}
process.nextTick(() => onEnd(err, null))
}

if (this._lightMyRequest.stream) {
this.once('finish', () => {
finished = true
this._lightMyRequest.stream.push(null)
})
} else {
Expand Down
71 changes: 43 additions & 28 deletions test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,25 @@ test('can handle a stream payload', (t, done) => {
})
})

test('can handle a stream payload that errors', (t, done) => {
t.plan(2)
const dispatch = function (req, res) {
req.resume()
}

const payload = new Readable({
read () {
this.destroy(new Error('kaboom'))
}
})

inject(dispatch, { method: 'POST', url: '/', payload }, (err, res) => {
t.assert.ok(err)
t.assert.equal(err.message, 'kaboom')
done()
})
})

test('can handle a stream payload of utf-8 strings', (t, done) => {
t.plan(2)
const dispatch = function (req, res) {
Expand Down Expand Up @@ -771,17 +790,6 @@ test('can override stream payload content-length header', (t, done) => {
})
})

test('can override stream payload content-length header without request content-length', (t, done) => {
t.plan(1)
const dispatch = function (req, res) {
res.writeHead(200, { 'Content-Type': 'text/plain' })
t.assert.strictEqual(req.headers['content-length'], '2')
done()
}

inject(dispatch, { method: 'POST', url: '/', payload: getTestStream() }, () => {})
})

test('writeHead returns single buffer payload', (t, done) => {
t.plan(4)
const reply = 'Hello World'
Expand Down Expand Up @@ -1095,9 +1103,10 @@ test('HTTP method is case insensitive', (t, done) => {
})

test('form-data should be handled correctly', (t, done) => {
t.plan(3)
t.plan(4)

const dispatch = function (req, res) {
t.assert.strictEqual(req.headers['transfer-encoding'], undefined)
let body = ''
req.on('data', d => {
body += d
Expand All @@ -1113,6 +1122,10 @@ test('form-data should be handled correctly', (t, done) => {
inject(dispatch, {
method: 'POST',
url: 'http://example.com:8080/hello',
headers: {
// Transfer-encoding is automatically deleted if Stream1 is used
'transfer-encoding': 'chunked'
},
payload: form
}, (err, res) => {
t.assert.ifError(err)
Expand Down Expand Up @@ -1822,15 +1835,16 @@ test('simulate invalid alter _lightMyRequest.isDone without end', (t, done) => {
})
})

test('no error for response destory', (t, done) => {
t.plan(1)
test('no error for response destroy', (t, done) => {
t.plan(2)

const dispatch = function (req, res) {
res.destroy()
}

inject(dispatch, { method: 'GET', url: '/' }, (err, res) => {
t.assert.ifError(err)
t.assert.equal(res, null)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
done()
})
})
Expand All @@ -1843,8 +1857,8 @@ test('request destory without.assert.ifError', (t, done) => {
}

inject(dispatch, { method: 'GET', url: '/' }, (err, res) => {
t.assert.ifError(err)
t.assert.strictEqual(res, null)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
t.assert.equal(res, null)
done()
})
})
Expand Down Expand Up @@ -1877,14 +1891,14 @@ test('compatible with stream.finished', (t, done) => {
}

inject(dispatch, { method: 'GET', url: '/' }, (err, res) => {
t.assert.ifError(err)
t.assert.strictEqual(res, null)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
t.assert.equal(res, null)
done()
})
})

test('compatible with eos', (t, done) => {
t.plan(3)
t.plan(4)

const dispatch = function (req, res) {
eos(res, (err) => {
Expand All @@ -1895,8 +1909,9 @@ test('compatible with eos', (t, done) => {
}

inject(dispatch, { method: 'GET', url: '/' }, (err, res) => {
t.assert.ifError(err)
t.assert.strictEqual(res, null)
t.assert.ok(err)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
t.assert.equal(res, null)
done()
})
})
Expand Down Expand Up @@ -1953,8 +1968,8 @@ test('multiple calls to req.destroy should not be called', (t, done) => {
}

inject(dispatch, { method: 'GET', url: '/' }, (err, res) => {
t.assert.ifError(err)
t.assert.strictEqual(res, null)
t.assert.equal(res, null)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
done()
})
})
Expand Down Expand Up @@ -2113,23 +2128,23 @@ test("passes payload when using express' send", (t, done) => {
})
})

test('request that is destroyed does not.assert.ifError', (t, done) => {
test('request that is destroyed errors', (t, done) => {
t.plan(2)
const dispatch = function (req, res) {
readStream(req, (buff) => {
req.destroy() // this should be a no-op
setImmediate(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' })
res.end(buff)
res.end('hi')
})
})
}

const payload = getTestStream()

inject(dispatch, { method: 'POST', url: '/', payload }, (err, res) => {
t.assert.ifError(err)
t.assert.strictEqual(res.payload, 'hi')
t.assert.equal(res, null)
t.assert.equal(err.code, 'LIGHT_ECONNRESET')
done()
})
})
Expand Down
5 changes: 3 additions & 2 deletions test/response.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ const { test } = require('node:test')
const Response = require('../lib/response')

test('multiple calls to res.destroy should not be called', (t, done) => {
t.plan(1)
t.plan(2)

const mockReq = {}
const res = new Response(mockReq, (err, response) => {
t.assert.ifError(err)
t.assert.ok(err)
t.assert.strictEqual(err.code, 'LIGHT_ECONNRESET')
done()
})

Expand Down
18 changes: 10 additions & 8 deletions test/stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ test('stream mode - error for response destroy', (t, done) => {
t.plan(2)

const dispatch = function (req, res) {
res.destroy()
res.writeHead(200)
setImmediate(() => {
res.destroy()
})
}

inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => {
Expand All @@ -270,8 +273,8 @@ test('stream mode - error for response destroy', (t, done) => {
})
})

test('stream mode - request destory with error', (t, done) => {
t.plan(2)
test('stream mode - request destroy with error', (t, done) => {
t.plan(3)

const fakeError = new Error('some-err')

Expand All @@ -280,11 +283,10 @@ test('stream mode - request destory with error', (t, done) => {
}

inject(dispatch, { method: 'GET', url: '/', payloadAsStream: true }, (err, res) => {
t.assert.ifError(err)
accumulate(res.stream(), (err, res) => {
t.assert.strictEqual(err, fakeError)
done()
})
t.assert.ok(err)
t.assert.strictEqual(err, fakeError)
t.assert.strictEqual(res, null)
done()
})
})

Expand Down

0 comments on commit ef771a9

Please sign in to comment.