From 88028fd8b7b4973aaf332dd89f0a5cd491511a7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 1 May 2018 21:05:39 +0200 Subject: [PATCH 1/2] Automatically send heartbeat requests (ping) unless given "?ping=0" --- README.md | 16 ++++++ examples/02-chatbot.php | 14 +---- examples/03-pingbot.php | 14 +---- examples/04-connect.php | 14 +---- examples/11-debug.php | 14 +---- src/Factory.php | 33 ++++++++++-- tests/FactoryIntegrationTest.php | 91 ++++++++++++++++++++++++++++++++ 7 files changed, 144 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index e9971d3..5a6371d 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,22 @@ optional `?pong=0` parameter like this: $factory->createClient('quassel://localhost?pong=0'); ``` +This automatic "pong" mechanism allows the Quassel core to detect the connection +to the client is still active. However, it does not allow the client to detect +if the connection to the Quassel core is still active. Because of this, this +project will automatically send a "ping" (heartbeat request) message to the +Quassel core if it did not receive any messages for 60s by default. You can pass +the `?ping=120.0` parameter to change this default interval. The Quassel core +uses a configurable ping interval of 30s by default and also sends all IRC +network state changes to the client, so this mechanism should only really kick +in if the connection looks dead. If you do not want this and want to handle +outgoing heartbeat request messages yourself, you may pass the optional `?ping=0` +parameter like this: + +```php +$factory->createClient('quassel://localhost?ping=0'); +``` + > This method uses Quassel IRC's probing mechanism for the correct protocol to use (newer "datastream" protocol or original "legacy" protocol). Protocol handling will be abstracted away for you, so you don't have to diff --git a/examples/02-chatbot.php b/examples/02-chatbot.php index 873e6fa..39617a7 100644 --- a/examples/02-chatbot.php +++ b/examples/02-chatbot.php @@ -29,24 +29,14 @@ $uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host; -$factory->createClient($uri)->then(function (Client $client) use ($loop, $keyword) { +$factory->createClient($uri)->then(function (Client $client) use ($keyword) { var_dump('CONNECTED'); - $client->on('data', function ($message) use ($client, $keyword, $loop) { + $client->on('data', function ($message) use ($client, $keyword) { // session initialized if (isset($message['MsgType']) && $message['MsgType']=== 'SessionInit') { var_dump('session initialized, now waiting for incoming messages'); - // send heartbeat message every 30s to check dropped connection - $timer = $loop->addPeriodicTimer(30.0, function () use ($client) { - $client->writeHeartBeatRequest(); - }); - - // stop heartbeat timer once connection closes - $client->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - return; } diff --git a/examples/03-pingbot.php b/examples/03-pingbot.php index 6329363..97edbdb 100644 --- a/examples/03-pingbot.php +++ b/examples/03-pingbot.php @@ -22,12 +22,12 @@ $uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host; -$factory->createClient($uri)->then(function (Client $client) use ($loop) { +$factory->createClient($uri)->then(function (Client $client) { var_dump('CONNECTED'); $nicks = array(); - $client->on('data', function ($message) use ($client, &$nicks, $loop) { + $client->on('data', function ($message) use ($client, &$nicks) { // session initialized => initialize all networks if (isset($message['MsgType']) && $message['MsgType'] === 'SessionInit') { var_dump('session initialized, now waiting for incoming messages'); @@ -37,16 +37,6 @@ $client->writeInitRequest("Network", $nid); } - // send heartbeat message every 30s to check dropped connection - $timer = $loop->addPeriodicTimer(30.0, function () use ($client) { - $client->writeHeartBeatRequest(); - }); - - // stop heartbeat timer once connection closes - $client->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - return; } diff --git a/examples/04-connect.php b/examples/04-connect.php index beb6b08..5b0ed3d 100644 --- a/examples/04-connect.php +++ b/examples/04-connect.php @@ -22,10 +22,10 @@ $uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host; -$factory->createClient($uri)->then(function (Client $client) use ($loop) { +$factory->createClient($uri)->then(function (Client $client) { var_dump('CONNECTED'); - $client->on('data', function ($message) use ($client, $loop) { + $client->on('data', function ($message) use ($client) { // session initialized => initialize all networks and buffers if (isset($message['MsgType']) && $message['MsgType'] === 'SessionInit') { var_dump('session initialized'); @@ -42,16 +42,6 @@ } } - // send heartbeat message every 30s to check dropped connection - $timer = $loop->addPeriodicTimer(30.0, function () use ($client) { - $client->writeHeartBeatRequest(); - }); - - // stop heartbeat timer once connection closes - $client->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - var_dump('initialization completed, now waiting for incoming messages (assuming core receives any)'); return; diff --git a/examples/11-debug.php b/examples/11-debug.php index 4c1e791..ff58afe 100644 --- a/examples/11-debug.php +++ b/examples/11-debug.php @@ -22,11 +22,11 @@ $factory = new Factory($loop); echo '[1/5] Connecting' . PHP_EOL; -$factory->createClient($host)->then(function (Client $client) use ($loop, $user) { +$factory->createClient($host)->then(function (Client $client) use ($user) { echo '[2/5] Connected, now initializing' . PHP_EOL; $client->writeClientInit(); - $client->on('data', function ($message) use ($client, $user, $loop) { + $client->on('data', function ($message) use ($client, $user) { if (isset($message[3]['IrcUsersAndChannels'])) { // print network information except for huge users/channels list $debug = $message; @@ -96,16 +96,6 @@ } } - // send heartbeat message every 30s to check dropped connection - $timer = $loop->addPeriodicTimer(30.0, function () use ($client) { - $client->writeHeartBeatRequest(); - }); - - // stop heartbeat timer once connection closes - $client->on('close', function () use ($loop, $timer) { - $loop->cancelTimer($timer); - }); - return; } }); diff --git a/src/Factory.php b/src/Factory.php index c1e049f..0669fc4 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -90,13 +90,38 @@ function (DuplexStreamInterface $stream) use (&$probe) { }); } + // automatically send ping requests and await pong replies unless "?ping=0" is given // automatically reply to incoming ping requests with a pong unless "?pong=0" is given - if (!isset($args['pong']) || $args['pong']) { - $promise = $promise->then(function (Client $client) { - $client->on('data', function ($message) use ($client) { - if (isset($message[0]) && $message[0] === Protocol::REQUEST_HEARTBEAT) { + $ping = (!isset($args['ping'])) ? 60 : (float)$args['ping']; + $pong = (!isset($args['pong']) || $args['pong']) ? true : false; + if ($ping !== 0.0 || $pong) { + $loop = $this->loop; + $promise = $promise->then(function (Client $client) use ($loop, $ping, $pong) { + $timer = null; + if ($ping !== 0.0) { + // send heartbeat message every X seconds to check dropped connection + $timer = $loop->addPeriodicTimer($ping, function () use ($client) { + $client->writeHeartBeatRequest(); + }); + + // stop heartbeat timer once connection closes + $client->on('close', function () use ($loop, &$timer) { + $loop->cancelTimer($timer); + $timer = null; + }); + } + + $client->on('data', function ($message) use ($client, $pong, &$timer, $loop) { + // reply to incoming ping messages with pong + if (isset($message[0]) && $message[0] === Protocol::REQUEST_HEARTBEAT && $pong) { $client->writeHeartBeatReply($message[1]); } + + // restart heartbeat timer once data comes in + if ($timer !== null) { + $loop->cancelTimer($timer); + $timer = $loop->addPeriodicTimer($timer->getInterval(), $timer->getCallback()); + } }); return $client; diff --git a/tests/FactoryIntegrationTest.php b/tests/FactoryIntegrationTest.php index 716f200..f577bc3 100644 --- a/tests/FactoryIntegrationTest.php +++ b/tests/FactoryIntegrationTest.php @@ -375,6 +375,97 @@ public function testCreateClientDoesNotRespondWithHeartBeatResponseIfPongIsDisab $client->close(); } + public function testCreateClientSendsHeartBeatRequestAtInterval() + { + $loop = LoopFactory::create(); + $server = new Server(0, $loop); + + // expect heartbeat response packet + $data = $this->expectCallableOnceWith($this->callback(function ($packet) { + $protocol = Protocol::createFromProbe(0x02); + $data = $protocol->parseVariantPacket(substr($packet, 4)); + + return (isset($data[0]) && $data[0] === Protocol::REQUEST_HEARTBEAT); + })); + + $server->on('connection', function (ConnectionInterface $conn) use ($data) { + $conn->once('data', function () use ($conn, $data) { + $conn->write("\x00\x00\x00\x02"); + + // expect heartbeat request next + $conn->on('data', $data); + }); + }); + + $uri = str_replace('tcp://', '', $server->getAddress()); + $factory = new Factory($loop); + $promise = $factory->createClient($uri . '?ping=0.05'); + + Block\sleep(0.1, $loop); + + $client = Block\await($promise, $loop); + $client->close(); + } + + public function testCreateClientSendsNoHeartBeatRequestIfServerKeepsSendingMessages() + { + $loop = LoopFactory::create(); + $server = new Server(0, $loop); + + // expect heartbeat response packet + $data = $this->expectCallableNever(); + + $server->on('connection', function (ConnectionInterface $conn) use ($data, $loop) { + $conn->once('data', function () use ($conn, $data, $loop) { + $conn->write("\x00\x00\x00\x02"); + + // expect no heartbeat request + $conn->on('data', $data); + + // periodically send some dummy messages + $loop->addPeriodicTimer(0.01, function() use ($conn) { + $conn->write(FactoryIntegrationTest::encode(array(0))); + }); + }); + }); + + $uri = str_replace('tcp://', '', $server->getAddress()); + $factory = new Factory($loop); + $promise = $factory->createClient($uri . '?ping=0.05&pong=0'); + + Block\sleep(0.1, $loop); + + $client = Block\await($promise, $loop); + $client->close(); + } + + public function testCreateClientSendsNoHeartBeatRequestIfPingIsDisabled() + { + $loop = LoopFactory::create(); + $server = new Server(0, $loop); + + // expect heartbeat response packet + $data = $this->expectCallableNever(); + + $server->on('connection', function (ConnectionInterface $conn) use ($data) { + $conn->once('data', function () use ($conn, $data) { + $conn->write("\x00\x00\x00\x02"); + + // expect no heartbeat request + $conn->on('data', $data); + }); + }); + + $uri = str_replace('tcp://', '', $server->getAddress()); + $factory = new Factory($loop); + $promise = $factory->createClient($uri . '?ping=0'); + + Block\sleep(0.1, $loop); + + $client = Block\await($promise, $loop); + $client->close(); + } + public static function encode($data) { $protocol = Protocol::createFromProbe(0x02); From e8deecff2f4ed6ee597c20a9dff35cc39b6e56e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 30 May 2018 19:27:44 +0200 Subject: [PATCH 2/2] Automatically disconnect if Quassel core does not respond after ping --- README.md | 15 ++++++++------- src/Factory.php | 16 +++++++++++++--- tests/FactoryIntegrationTest.php | 22 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 5a6371d..5854988 100644 --- a/README.md +++ b/README.md @@ -126,13 +126,14 @@ This automatic "pong" mechanism allows the Quassel core to detect the connection to the client is still active. However, it does not allow the client to detect if the connection to the Quassel core is still active. Because of this, this project will automatically send a "ping" (heartbeat request) message to the -Quassel core if it did not receive any messages for 60s by default. You can pass -the `?ping=120.0` parameter to change this default interval. The Quassel core -uses a configurable ping interval of 30s by default and also sends all IRC -network state changes to the client, so this mechanism should only really kick -in if the connection looks dead. If you do not want this and want to handle -outgoing heartbeat request messages yourself, you may pass the optional `?ping=0` -parameter like this: +Quassel core if it did not receive any messages for 60s by default. If no +message has been received after waiting for another period, the connection is +assumed to be dead and will be closed. You can pass the `?ping=120.0` parameter +to change this default interval. The Quassel core uses a configurable ping +interval of 30s by default and also sends all IRC network state changes to the +client, so this mechanism should only really kick in if the connection looks +dead. If you do not want this and want to handle outgoing heartbeat request +messages yourself, you may pass the optional `?ping=0` parameter like this: ```php $factory->createClient('quassel://localhost?ping=0'); diff --git a/src/Factory.php b/src/Factory.php index 0669fc4..5d2d531 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -96,12 +96,21 @@ function (DuplexStreamInterface $stream) use (&$probe) { $pong = (!isset($args['pong']) || $args['pong']) ? true : false; if ($ping !== 0.0 || $pong) { $loop = $this->loop; + $await = false; $promise = $promise->then(function (Client $client) use ($loop, $ping, $pong) { $timer = null; if ($ping !== 0.0) { // send heartbeat message every X seconds to check dropped connection - $timer = $loop->addPeriodicTimer($ping, function () use ($client) { - $client->writeHeartBeatRequest(); + $timer = $loop->addPeriodicTimer($ping, function () use ($client, &$await) { + if ($await) { + $client->emit('error', array( + new \RuntimeException('Connection to Quassel core timed out') + )); + $client->close(); + } else { + $client->writeHeartBeatRequest(); + $await = true; + } }); // stop heartbeat timer once connection closes @@ -111,7 +120,7 @@ function (DuplexStreamInterface $stream) use (&$probe) { }); } - $client->on('data', function ($message) use ($client, $pong, &$timer, $loop) { + $client->on('data', function ($message) use ($client, $pong, &$timer, &$await, $loop) { // reply to incoming ping messages with pong if (isset($message[0]) && $message[0] === Protocol::REQUEST_HEARTBEAT && $pong) { $client->writeHeartBeatReply($message[1]); @@ -121,6 +130,7 @@ function (DuplexStreamInterface $stream) use (&$probe) { if ($timer !== null) { $loop->cancelTimer($timer); $timer = $loop->addPeriodicTimer($timer->getInterval(), $timer->getCallback()); + $await = false; } }); diff --git a/tests/FactoryIntegrationTest.php b/tests/FactoryIntegrationTest.php index f577bc3..2b4c39e 100644 --- a/tests/FactoryIntegrationTest.php +++ b/tests/FactoryIntegrationTest.php @@ -439,6 +439,28 @@ public function testCreateClientSendsNoHeartBeatRequestIfServerKeepsSendingMessa $client->close(); } + public function testCreateClientClosesWithErrorIfServerDoesNotRespondToHeartBeatRequests() + { + $loop = LoopFactory::create(); + $server = new Server(0, $loop); + + $server->on('connection', function (ConnectionInterface $conn) { + $conn->once('data', function () use ($conn) { + $conn->write("\x00\x00\x00\x02"); + }); + }); + + $uri = str_replace('tcp://', '', $server->getAddress()); + $factory = new Factory($loop); + $promise = $factory->createClient($uri . '?ping=0.03'); + + $client = Block\await($promise, $loop, 0.1); + + $client->on('error', $this->expectCallableOnce()); + $client->on('close', $this->expectCallableOnce()); + Block\sleep(0.1, $loop); + } + public function testCreateClientSendsNoHeartBeatRequestIfPingIsDisabled() { $loop = LoopFactory::create();