Skip to content

Commit

Permalink
http2: reinject data received before http2 is attached
Browse files Browse the repository at this point in the history
Reinject the data already received from the TLS
socket when the HTTP2 client is attached with a
delay

Fixes: nodejs#35475
  • Loading branch information
mmomtchev committed Oct 16, 2020
1 parent c205a80 commit 8ad82f0
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 2 deletions.
15 changes: 13 additions & 2 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ function finishSessionClose(session, error) {
if (socket && !socket.destroyed) {
// Always wait for writable side to finish.
socket.end((err) => {
debugSessionObj(session, 'finishSessionClose socket end', err);
debugSessionObj(session, 'finishSessionClose socket end', err, error);
// Due to the way the underlying stream is handled in Http2Session we
// won't get graceful Readable end from the other side even if it was sent
// as the stream is already considered closed and will neither be read
Expand All @@ -1055,7 +1055,7 @@ function finishSessionClose(session, error) {
}

function closeSession(session, code, error) {
debugSessionObj(session, 'start closing/destroying');
debugSessionObj(session, 'start closing/destroying', error);

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
Expand Down Expand Up @@ -3128,6 +3128,17 @@ function connect(authority, options, listener) {

if (typeof listener === 'function')
session.once('connect', listener);

debug('Http2Session connect', options.createConnection);
// Socket already has some buffered data - emulate receiving it
// https://github.com/nodejs/node/issues/35475
if (typeof options.createConnection === 'function') {
let buf;
while ((buf = socket.read()) !== null) {
debug(`Http2Session connect: injecting ${buf.length} already in buffer`);
session[kHandle].receive(buf);
}
}
return session;
}

Expand Down
28 changes: 28 additions & 0 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,33 @@ void Http2Session::Consume(Local<Object> stream_obj) {
Debug(this, "i/o stream consumed");
}

// Allow injecting of data from JS
// This is used when the socket has already some data received
// before our listener was attached
// https://github.com/nodejs/node/issues/35475
void Http2Session::Receive(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
CHECK(args[0]->IsObject());

ArrayBufferViewContents<char> buffer(args[0]);
const char* data = buffer.data();
size_t len = buffer.length();
Debug(session, "Receiving %zu bytes injected from JS", len);

// Copy given buffer
while (len > 0) {
uv_buf_t buf = session->OnStreamAlloc(len);
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
session->OnStreamRead(copy, buf);

data += copy;
len -= copy;
}
}

Http2Stream* Http2Stream::New(Http2Session* session,
int32_t id,
nghttp2_headers_category category,
Expand Down Expand Up @@ -3054,6 +3081,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(session, "altsvc", Http2Session::AltSvc);
env->SetProtoMethod(session, "ping", Http2Session::Ping);
env->SetProtoMethod(session, "consume", Http2Session::Consume);
env->SetProtoMethod(session, "receive", Http2Session::Receive);
env->SetProtoMethod(session, "destroy", Http2Session::Destroy);
env->SetProtoMethod(session, "goaway", Http2Session::Goaway);
env->SetProtoMethod(session, "settings", Http2Session::Settings);
Expand Down
1 change: 1 addition & 0 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ class Http2Session : public AsyncWrap,
// The JavaScript API
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Consume(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Receive(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Destroy(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Settings(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Request(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
64 changes: 64 additions & 0 deletions test/parallel/test-http2-connect-tls-with-delay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Flags: --expose-internals
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

if (!common.hasMultiLocalhost())
common.skip('platform-specific test.');

const http2 = require('http2');
const assert = require('assert');
const tls = require('tls');
const fixtures = require('../common/fixtures');

const serverOptions = {
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
};
const server = http2.createSecureServer(serverOptions, (req, res) => {
console.log(`Connect from: ${req.connection.remoteAddress}`);
assert.strictEqual(req.connection.remoteAddress, '127.0.0.2');

req.on('end', common.mustCall(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end(`You are from: ${req.connection.remoteAddress}`);
}));
req.resume();
});

server.listen(0, '127.0.0.1', common.mustCall(() => {
const options = {
ALPNProtocols: ['h2'],
host: '127.0.0.1',
servername: 'localhost',
localAddress: '127.0.0.2',
port: server.address().port,
rejectUnauthorized: false
};

console.log('Server ready', server.address().port);

const socket = tls.connect(options, async () => {

console.log('TLS Connected!');

setTimeout(() => {

const client = http2.connect(
'https://localhost:' + server.address().port,
{ ...options, createConnection: () => socket }
);
const req = client.request({
':path': '/'
});
req.on('data', () => req.resume());
req.on('end', common.mustCall(function() {
client.close();
req.close();
server.close();
}));
req.end();
}, 1000);
});
}));

0 comments on commit 8ad82f0

Please sign in to comment.