Skip to content

Commit

Permalink
fix(nodejs#2311): End stream after body sent (nodejs#2314)
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 authored and crysmags committed Feb 27, 2024
1 parent d2d2796 commit ee773b7
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 53 deletions.
6 changes: 5 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2096,13 +2096,17 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
throw socket[kError]
}

if (!h2stream.write(chunk)) {
const res = h2stream.write(chunk)
request.onBodySent(chunk)
if (!res) {
await waitForDrain()
}
}
} catch (err) {
h2stream.destroy(err)
} finally {
request.onRequestSent()
h2stream.end()
h2stream
.off('close', onDrain)
.off('drain', onDrain)
Expand Down
162 changes: 110 additions & 52 deletions test/fetch/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,29 @@ const { Client, fetch } = require('../..')

const nodeVersion = Number(process.version.split('v')[1].split('.')[0])

plan(5)
plan(6)

test('[Fetch] Simple GET with h2', async t => {
const server = createSecureServer(pem)
const expectedRequestBody = 'hello h2!'
test('[Fetch] Issue#2311', async t => {
const expectedBody = 'hello from client!'

server.on('stream', async (stream, headers) => {
stream.respond({
const server = createSecureServer(pem, async (req, res) => {
let body = ''

req.setEncoding('utf8')

res.writeHead(200, {
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
'x-method': headers[':method'],
':status': 200
'x-custom-h2': req.headers['x-my-header']
})

stream.end(expectedRequestBody)
for await (const chunk of req) {
body += chunk
}

res.end(body)
})

t.plan(3)
t.plan(1)

server.listen()
await once(server, 'listening')
Expand All @@ -46,12 +51,13 @@ test('[Fetch] Simple GET with h2', async t => {
`https://localhost:${server.address().port}/`,
// Needs to be passed to disable the reject unauthorized
{
method: 'GET',
method: 'POST',
dispatcher: client,
headers: {
'x-my-header': 'foo',
'content-type': 'text-plain'
}
},
body: expectedBody
}
)

Expand All @@ -60,30 +66,25 @@ test('[Fetch] Simple GET with h2', async t => {
t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

t.equal(responseBody, expectedRequestBody)
t.equal(response.headers.get('x-method'), 'GET')
t.equal(response.headers.get('x-custom-h2'), 'foo')
t.equal(responseBody, expectedBody)
})

test('[Fetch] Should handle h2 request with body (string or buffer)', async t => {
test('[Fetch] Simple GET with h2', async t => {
const server = createSecureServer(pem)
const expectedBody = 'hello from client!'
const expectedRequestBody = 'hello h2!'
const requestBody = []

server.on('stream', async (stream, headers) => {
stream.on('data', chunk => requestBody.push(chunk))

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
'x-method': headers[':method'],
':status': 200
})

stream.end(expectedRequestBody)
})

t.plan(2)
t.plan(3)

server.listen()
await once(server, 'listening')
Expand All @@ -99,13 +100,12 @@ test('[Fetch] Should handle h2 request with body (string or buffer)', async t =>
`https://localhost:${server.address().port}/`,
// Needs to be passed to disable the reject unauthorized
{
method: 'POST',
method: 'GET',
dispatcher: client,
headers: {
'x-my-header': 'foo',
'content-type': 'text-plain'
},
body: expectedBody
}
}
)

Expand All @@ -114,38 +114,32 @@ test('[Fetch] Should handle h2 request with body (string or buffer)', async t =>
t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

t.equal(Buffer.concat(requestBody).toString('utf-8'), expectedBody)
t.equal(responseBody, expectedRequestBody)
t.equal(response.headers.get('x-method'), 'GET')
t.equal(response.headers.get('x-custom-h2'), 'foo')
})

// Skipping for now, there is something odd in the way the body is handled
test('[Fetch] Should handle h2 request with body (stream)', { skip: nodeVersion === 16 }, async t => {
test('[Fetch] Should handle h2 request with body (string or buffer)', async t => {
const server = createSecureServer(pem)
const expectedBody = readFileSync(__filename, 'utf-8')
const stream = createReadStream(__filename)
const requestChunks = []
const expectedBody = 'hello from client!'
const expectedRequestBody = 'hello h2!'
const requestBody = []

server.on('stream', async (stream, headers) => {
t.equal(headers[':method'], 'PUT')
t.equal(headers[':path'], '/')
t.equal(headers[':scheme'], 'https')
stream.on('data', chunk => requestBody.push(chunk))

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
':status': 200
})

stream.end('hello h2!')

for await (const chunk of stream) {
requestChunks.push(chunk)
}
stream.end(expectedRequestBody)
})

t.plan(8)
t.plan(2)

server.listen(0)
server.listen()
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
Expand All @@ -155,32 +149,96 @@ test('[Fetch] Should handle h2 request with body (stream)', { skip: nodeVersion
allowH2: true
})

t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

const response = await fetch(
`https://localhost:${server.address().port}/`,
// Needs to be passed to disable the reject unauthorized
{
method: 'PUT',
method: 'POST',
dispatcher: client,
headers: {
'x-my-header': 'foo',
'content-type': 'text-plain'
},
body: Readable.toWeb(stream),
duplex: 'half'
body: expectedBody
}
)

const responseBody = await response.text()

t.equal(response.status, 200)
t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8')
t.equal(response.headers.get('x-custom-h2'), 'foo')
t.equal(responseBody, 'hello h2!')
t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody)
t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

t.equal(Buffer.concat(requestBody).toString('utf-8'), expectedBody)
t.equal(responseBody, expectedRequestBody)
})

// Skipping for now, there is something odd in the way the body is handled
test(
'[Fetch] Should handle h2 request with body (stream)',
{ skip: nodeVersion === 16 },
async t => {
const server = createSecureServer(pem)
const expectedBody = readFileSync(__filename, 'utf-8')
const stream = createReadStream(__filename)
const requestChunks = []

server.on('stream', async (stream, headers) => {
t.equal(headers[':method'], 'PUT')
t.equal(headers[':path'], '/')
t.equal(headers[':scheme'], 'https')

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': headers['x-my-header'],
':status': 200
})

stream.end('hello h2!')

for await (const chunk of stream) {
requestChunks.push(chunk)
}
})

t.plan(8)

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})

t.teardown(server.close.bind(server))
t.teardown(client.close.bind(client))

const response = await fetch(
`https://localhost:${server.address().port}/`,
// Needs to be passed to disable the reject unauthorized
{
method: 'PUT',
dispatcher: client,
headers: {
'x-my-header': 'foo',
'content-type': 'text-plain'
},
body: Readable.toWeb(stream),
duplex: 'half'
}
)

const responseBody = await response.text()

t.equal(response.status, 200)
t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8')
t.equal(response.headers.get('x-custom-h2'), 'foo')
t.equal(responseBody, 'hello h2!')
t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody)
}
)
test('Should handle h2 request with body (Blob)', { skip: !Blob }, async t => {
const server = createSecureServer(pem)
const expectedBody = 'asd'
Expand Down

0 comments on commit ee773b7

Please sign in to comment.