Skip to content

Commit

Permalink
Merge pull request #39 from clue-labs/ping
Browse files Browse the repository at this point in the history
 Automatically send heartbeat requests (ping) unless given "?ping=0"
  • Loading branch information
clue authored May 30, 2018
2 parents cf30a3d + e8deecf commit f1b2120
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 52 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ optional `?pong=0` parameter like this:
$factory->createClient('quassel://localhost?pong=0');
```

This automatic "pong" mechanism allows the Quassel core to detect the connection
to the client is still active. However, it does not allow the client to detect
if the connection to the Quassel core is still active. Because of this, this
project will automatically send a "ping" (heartbeat request) message to the
Quassel core if it did not receive any messages for 60s by default. If no
message has been received after waiting for another period, the connection is
assumed to be dead and will be closed. You can pass the `?ping=120.0` parameter
to change this default interval. The Quassel core uses a configurable ping
interval of 30s by default and also sends all IRC network state changes to the
client, so this mechanism should only really kick in if the connection looks
dead. If you do not want this and want to handle outgoing heartbeat request
messages yourself, you may pass the optional `?ping=0` parameter like this:

```php
$factory->createClient('quassel://localhost?ping=0');
```

> This method uses Quassel IRC's probing mechanism for the correct protocol to
use (newer "datastream" protocol or original "legacy" protocol).
Protocol handling will be abstracted away for you, so you don't have to
Expand Down
14 changes: 2 additions & 12 deletions examples/02-chatbot.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,14 @@

$uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host;

$factory->createClient($uri)->then(function (Client $client) use ($loop, $keyword) {
$factory->createClient($uri)->then(function (Client $client) use ($keyword) {
var_dump('CONNECTED');

$client->on('data', function ($message) use ($client, $keyword, $loop) {
$client->on('data', function ($message) use ($client, $keyword) {
// session initialized
if (isset($message['MsgType']) && $message['MsgType']=== 'SessionInit') {
var_dump('session initialized, now waiting for incoming messages');

// send heartbeat message every 30s to check dropped connection
$timer = $loop->addPeriodicTimer(30.0, function () use ($client) {
$client->writeHeartBeatRequest();
});

// stop heartbeat timer once connection closes
$client->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

return;
}

Expand Down
14 changes: 2 additions & 12 deletions examples/03-pingbot.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

$uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host;

$factory->createClient($uri)->then(function (Client $client) use ($loop) {
$factory->createClient($uri)->then(function (Client $client) {
var_dump('CONNECTED');

$nicks = array();

$client->on('data', function ($message) use ($client, &$nicks, $loop) {
$client->on('data', function ($message) use ($client, &$nicks) {
// session initialized => initialize all networks
if (isset($message['MsgType']) && $message['MsgType'] === 'SessionInit') {
var_dump('session initialized, now waiting for incoming messages');
Expand All @@ -37,16 +37,6 @@
$client->writeInitRequest("Network", $nid);
}

// send heartbeat message every 30s to check dropped connection
$timer = $loop->addPeriodicTimer(30.0, function () use ($client) {
$client->writeHeartBeatRequest();
});

// stop heartbeat timer once connection closes
$client->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

return;
}

Expand Down
14 changes: 2 additions & 12 deletions examples/04-connect.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

$uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host;

$factory->createClient($uri)->then(function (Client $client) use ($loop) {
$factory->createClient($uri)->then(function (Client $client) {
var_dump('CONNECTED');

$client->on('data', function ($message) use ($client, $loop) {
$client->on('data', function ($message) use ($client) {
// session initialized => initialize all networks and buffers
if (isset($message['MsgType']) && $message['MsgType'] === 'SessionInit') {
var_dump('session initialized');
Expand All @@ -42,16 +42,6 @@
}
}

// send heartbeat message every 30s to check dropped connection
$timer = $loop->addPeriodicTimer(30.0, function () use ($client) {
$client->writeHeartBeatRequest();
});

// stop heartbeat timer once connection closes
$client->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

var_dump('initialization completed, now waiting for incoming messages (assuming core receives any)');

return;
Expand Down
14 changes: 2 additions & 12 deletions examples/11-debug.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
$factory = new Factory($loop);

echo '[1/5] Connecting' . PHP_EOL;
$factory->createClient($host)->then(function (Client $client) use ($loop, $user) {
$factory->createClient($host)->then(function (Client $client) use ($user) {
echo '[2/5] Connected, now initializing' . PHP_EOL;
$client->writeClientInit();

$client->on('data', function ($message) use ($client, $user, $loop) {
$client->on('data', function ($message) use ($client, $user) {
if (isset($message[3]['IrcUsersAndChannels'])) {
// print network information except for huge users/channels list
$debug = $message;
Expand Down Expand Up @@ -96,16 +96,6 @@
}
}

// send heartbeat message every 30s to check dropped connection
$timer = $loop->addPeriodicTimer(30.0, function () use ($client) {
$client->writeHeartBeatRequest();
});

// stop heartbeat timer once connection closes
$client->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

return;
}
});
Expand Down
43 changes: 39 additions & 4 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,48 @@ function (DuplexStreamInterface $stream) use (&$probe) {
});
}

// automatically send ping requests and await pong replies unless "?ping=0" is given
// automatically reply to incoming ping requests with a pong unless "?pong=0" is given
if (!isset($args['pong']) || $args['pong']) {
$promise = $promise->then(function (Client $client) {
$client->on('data', function ($message) use ($client) {
if (isset($message[0]) && $message[0] === Protocol::REQUEST_HEARTBEAT) {
$ping = (!isset($args['ping'])) ? 60 : (float)$args['ping'];
$pong = (!isset($args['pong']) || $args['pong']) ? true : false;
if ($ping !== 0.0 || $pong) {
$loop = $this->loop;
$await = false;
$promise = $promise->then(function (Client $client) use ($loop, $ping, $pong) {
$timer = null;
if ($ping !== 0.0) {
// send heartbeat message every X seconds to check dropped connection
$timer = $loop->addPeriodicTimer($ping, function () use ($client, &$await) {
if ($await) {
$client->emit('error', array(
new \RuntimeException('Connection to Quassel core timed out')
));
$client->close();
} else {
$client->writeHeartBeatRequest();
$await = true;
}
});

// stop heartbeat timer once connection closes
$client->on('close', function () use ($loop, &$timer) {
$loop->cancelTimer($timer);
$timer = null;
});
}

$client->on('data', function ($message) use ($client, $pong, &$timer, &$await, $loop) {
// reply to incoming ping messages with pong
if (isset($message[0]) && $message[0] === Protocol::REQUEST_HEARTBEAT && $pong) {
$client->writeHeartBeatReply($message[1]);
}

// restart heartbeat timer once data comes in
if ($timer !== null) {
$loop->cancelTimer($timer);
$timer = $loop->addPeriodicTimer($timer->getInterval(), $timer->getCallback());
$await = false;
}
});

return $client;
Expand Down
113 changes: 113 additions & 0 deletions tests/FactoryIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,119 @@ public function testCreateClientDoesNotRespondWithHeartBeatResponseIfPongIsDisab
$client->close();
}

public function testCreateClientSendsHeartBeatRequestAtInterval()
{
$loop = LoopFactory::create();
$server = new Server(0, $loop);

// expect heartbeat response packet
$data = $this->expectCallableOnceWith($this->callback(function ($packet) {
$protocol = Protocol::createFromProbe(0x02);
$data = $protocol->parseVariantPacket(substr($packet, 4));

return (isset($data[0]) && $data[0] === Protocol::REQUEST_HEARTBEAT);
}));

$server->on('connection', function (ConnectionInterface $conn) use ($data) {
$conn->once('data', function () use ($conn, $data) {
$conn->write("\x00\x00\x00\x02");

// expect heartbeat request next
$conn->on('data', $data);
});
});

$uri = str_replace('tcp://', '', $server->getAddress());
$factory = new Factory($loop);
$promise = $factory->createClient($uri . '?ping=0.05');

Block\sleep(0.1, $loop);

$client = Block\await($promise, $loop);
$client->close();
}

public function testCreateClientSendsNoHeartBeatRequestIfServerKeepsSendingMessages()
{
$loop = LoopFactory::create();
$server = new Server(0, $loop);

// expect heartbeat response packet
$data = $this->expectCallableNever();

$server->on('connection', function (ConnectionInterface $conn) use ($data, $loop) {
$conn->once('data', function () use ($conn, $data, $loop) {
$conn->write("\x00\x00\x00\x02");

// expect no heartbeat request
$conn->on('data', $data);

// periodically send some dummy messages
$loop->addPeriodicTimer(0.01, function() use ($conn) {
$conn->write(FactoryIntegrationTest::encode(array(0)));
});
});
});

$uri = str_replace('tcp://', '', $server->getAddress());
$factory = new Factory($loop);
$promise = $factory->createClient($uri . '?ping=0.05&pong=0');

Block\sleep(0.1, $loop);

$client = Block\await($promise, $loop);
$client->close();
}

public function testCreateClientClosesWithErrorIfServerDoesNotRespondToHeartBeatRequests()
{
$loop = LoopFactory::create();
$server = new Server(0, $loop);

$server->on('connection', function (ConnectionInterface $conn) {
$conn->once('data', function () use ($conn) {
$conn->write("\x00\x00\x00\x02");
});
});

$uri = str_replace('tcp://', '', $server->getAddress());
$factory = new Factory($loop);
$promise = $factory->createClient($uri . '?ping=0.03');

$client = Block\await($promise, $loop, 0.1);

$client->on('error', $this->expectCallableOnce());
$client->on('close', $this->expectCallableOnce());
Block\sleep(0.1, $loop);
}

public function testCreateClientSendsNoHeartBeatRequestIfPingIsDisabled()
{
$loop = LoopFactory::create();
$server = new Server(0, $loop);

// expect heartbeat response packet
$data = $this->expectCallableNever();

$server->on('connection', function (ConnectionInterface $conn) use ($data) {
$conn->once('data', function () use ($conn, $data) {
$conn->write("\x00\x00\x00\x02");

// expect no heartbeat request
$conn->on('data', $data);
});
});

$uri = str_replace('tcp://', '', $server->getAddress());
$factory = new Factory($loop);
$promise = $factory->createClient($uri . '?ping=0');

Block\sleep(0.1, $loop);

$client = Block\await($promise, $loop);
$client->close();
}

public static function encode($data)
{
$protocol = Protocol::createFromProbe(0x02);
Expand Down

0 comments on commit f1b2120

Please sign in to comment.