Skip to content

Commit

Permalink
Merge pull request #42 from clue-labs/backlog
Browse files Browse the repository at this point in the history
Update writeBufferRequestBacklog() parameters and add writeBufferRequestBacklogAll()
  • Loading branch information
clue authored Jun 4, 2018
2 parents 1ed5ca5 + a4af181 commit 6fa93ac
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 6 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ $client->writeClientLogin($user, $password);
$client->writeHeartBeatRequest($time);
$client->writeHeartBeatReply($time);

$client->writeBufferRequestBacklog($bufferId, $maxAmount);
$client->writeBufferRequestBacklog($bufferId, $messageIdFirst, $messageIdLast, $maxAmount, $additional);
$client->writeBufferRequestBacklogAll($messageIdFirst, $messageIdLast, $maxAmount, $additional);
$client->writeBufferInput($bufferInfo, $input);

// many more…
Expand Down
95 changes: 95 additions & 0 deletions examples/05-backlog.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

use Clue\React\Quassel\Factory;
use Clue\React\Quassel\Client;
use Clue\React\Quassel\Io\Protocol;
use Clue\React\Quassel\Models\BufferInfo;
use Clue\React\Quassel\Models\Message;

require __DIR__ . '/../vendor/autoload.php';

$host = '127.0.0.1';
if (isset($argv[1])) { $host = $argv[1]; }

echo 'Server: ' . $host . PHP_EOL;

echo 'User name: ';
$user = trim(fgets(STDIN));

echo 'Password: ';
$pass = trim(fgets(STDIN));

echo 'Channel to export (empty=all): ';
$channel = trim(fgets(STDIN));

$loop = \React\EventLoop\Factory::create();
$factory = new Factory($loop);

$uri = rawurlencode($user) . ':' . rawurlencode($pass) . '@' . $host;

$factory->createClient($uri)->then(function (Client $client) use ($channel) {
$client->on('data', function ($message) use ($client, $channel) {
if (isset($message['MsgType']) && $message['MsgType'] === 'SessionInit') {
// session initialized => search channel ID for given channel name
$id = null;
foreach ($message['SessionState']['BufferInfos'] as $buffer) {
assert($buffer instanceof BufferInfo);
$combined = $buffer->getNetworkId() . '/' . $buffer->getName();
if (($channel !== '' && $channel === $buffer->getName()) || $channel === (string)$buffer->getId() || $channel === $combined) {
$id = $buffer->getId();
}
}

// list all channels if channel could not be found
if ($id === null && $channel !== '') {
echo 'Error: Could not find the given channel, see full list: ' . PHP_EOL;
var_dump($message['SessionState']['BufferInfos']) . PHP_EOL;
return $client->close();
}

// otherwise request backlog of last N messages
if ($id === null) {
$client->writeBufferRequestBacklogAll(-1, -1, 100, 0);
} else {
$client->writeBufferRequestBacklog($id, -1, -1, 100, 0);
}
return;
}

// print backlog and exit
if (isset($message[0]) && $message[0] === Protocol::REQUEST_SYNC && $message[1] === 'BacklogManager') {
// message for one buffer will be at index 9, for all buffers at index 8
$messages = isset($message[9]) ? $message[9] : $message[8];

foreach (array_reverse($messages) as $in) {
assert($in instanceof Message);

echo json_encode(
array(
'id' => $in->getId(),
'date' => date(\DATE_ATOM, $in->getTimestamp()),
'channel' => $in->getBufferInfo()->getName(),
'sender' => explode('!', $in->getSender())[0],
'contents' => $in->getContents()
),
JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE
) . PHP_EOL;
}

echo 'DONE (' . count($messages) . ' messages in backlog)' . PHP_EOL;
$client->end();
return;
}

echo 'received unexpected: ' . json_encode($message, JSON_PRETTY_PRINT) . PHP_EOL;
});

$client->on('error', 'printf');
$client->on('close', function () {
echo 'Connection closed' . PHP_EOL;
});
})->then(null, function ($e) {
echo $e;
});

$loop->run();
77 changes: 72 additions & 5 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,85 @@ public function writeBufferInput(BufferInfo $bufferInfo, $contents)
));
}

public function writeBufferRequestBacklog($bufferId, $maxAmount, $messageIdFirst = -1, $messageIdLast = -1)
/**
* Sends a backlog request for the given buffer/channel
*
* If you want to fetch the newest 20 messages for a channel, you can simply
* pass the correct buffer ID, a $maxAmount of 20 and leave the other
* parameters unset. This will respond with a message that contains the last
* 20 messages (if any) where the newest message is the first element in the
* array of messages.
*
* ```php
* $client->writeBufferRequestBacklog($id, -1, -1, 20, 0);
* ```
*
* If you want to fetch the next 20 older messages for this channel, you
* can simply pick the message ID of the oldested (and thus last) message
* in this array and pass this to this method as `$messageIdLast`.
*
* ```php
* $oldest = end($messages)->getId();
* $client->writeBufferRequestBacklog($id, -1, $oldest, 20, 0);
* ```
*
* If you want to poll the channel for new messages, you can simply pick the
* message ID of the newest (and thus first) message in the previous array
* and pass this ID to this method as `$messageIdFirst`. This will return
* the last 20 messages (if any) and will include the given message ID as
* the last element in the array of messages if no more than 20 new messages
* arrived in the meantime. If no new messages are available, this array
* will contain the given message ID as the only entry.
*
* ```php
* $newest = reset($messages)->getId();
* $client->writeBufferRequestBacklog($id, $newest, -1, 20, 0);
* ```
*
* @param int $bufferId buffer/channel to fetch backlog from
* @param int $messageIdFirst optional, only fetch messages newer than this ID, -1=no limit
* @param int $messageIdLast optional, only fetch messages older than this ID, -1=no limit
* @param int $maxAmount maximum number of messages to fetch at once, -1=no limit
* @param int $additional number of additional messages to fetch, 0=none, -1=no limit
* @return bool
* @see self::writeBufferRequestBacklogAll()
*/
public function writeBufferRequestBacklog($bufferId, $messageIdFirst, $messageIdLast, $maxAmount, $additional)
{
return $this->write(array(
Protocol::REQUEST_SYNC,
"BacklogManager",
"",
"requestBacklog",
new QVariant($bufferId, 'BufferId'),
new QVariant($messageIdFirst, 'MsgId'),
new QVariant($messageIdLast, 'MsgId'),
new QVariant((int)$bufferId, 'BufferId'),
new QVariant((int)$messageIdFirst, 'MsgId'),
new QVariant((int)$messageIdLast, 'MsgId'),
(int)$maxAmount,
(int)$additional
));
}

/**
* Sends a backlog request for all messages in all channels
*
* @param int $messageIdFirst
* @param int $messageIdLast
* @param int $maxAmount
* @param int $additional
* @return bool
* @see self::writeBufferRequestBacklog() for parameter description
*/
public function writeBufferRequestBacklogAll($messageIdFirst, $messageIdLast, $maxAmount, $additional)
{
return $this->write(array(
Protocol::REQUEST_SYNC,
"BacklogManager",
"",
"requestBacklogAll",
new QVariant((int)$messageIdFirst, 'MsgId'),
new QVariant((int)$messageIdLast, 'MsgId'),
(int)$maxAmount,
0
(int)$additional
));
}

Expand Down
66 changes: 66 additions & 0 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Clue\React\Quassel\Client;
use React\Promise\Promise;
use Clue\React\Quassel\Io\Protocol;
use Clue\React\Quassel\Models\BufferInfo;
use Clue\React\Quassel\Models\Message;

class FunctionalTest extends TestCase
{
Expand Down Expand Up @@ -194,6 +196,70 @@ public function testCreateClientWithAuthUrlReceivesSessionInit()
$client->close();
}

public function testRequestBacklogReceivesBacklog()
{
$factory = new Factory(self::$loop);

$url = rawurlencode(self::$username) . ':' . rawurlencode(self::$password) . '@' . self::$host;
$promise = $factory->createClient($url);
$client = Block\await($promise, self::$loop, 10.0);
/* @var $client Client */

$message = $this->awaitMessage($client);
$this->assertEquals('SessionInit', $message['MsgType']);

// try to pick first buffer
$buffer = reset($message['SessionState']['BufferInfos']);
if ($buffer === false) {
$client->close();
$this->markTestSkipped('Empty quassel core with no buffers?');
}

// fetch newest messages for this buffer
$this->assertTrue($buffer instanceof BufferInfo);
$client->writeBufferRequestBacklog($buffer->getId(), -1, -1, $maximum = 2, 0);

$received = $this->awaitMessage($client);
$this->assertTrue(isset($received[0]));
$this->assertSame(1, $received[0]);
$this->assertSame('BacklogManager', $received[1]);
$this->assertSame('receiveBacklog', $received[3]);
$this->assertSame($maximum, $received[7]);
$this->assertTrue(is_array($received[9]));
$this->assertLessThanOrEqual($maximum, count($received[9]));

// try to pick newest message
$newest = reset($received[9]);
if ($newest === false) {
$client->close();
$this->markTestSkipped('No messages in first buffer?');
}

// poll for newer messages in all channels
$this->assertTrue($newest instanceof Message);
$client->writeBufferRequestBacklogAll($newest->getId(), -1, $maximum, 0);

$received = $this->awaitMessage($client);
$this->assertTrue(isset($received[0]));
$this->assertSame(1, $received[0]);
$this->assertSame('BacklogManager', $received[1]);
$this->assertSame('receiveBacklogAll', $received[3]);
$this->assertSame($maximum, $received[6]);
$this->assertTrue(is_array($received[8]));
$this->assertLessThanOrEqual($maximum, count($received[8]));

// try to pick newest message
$newest = reset($received[8]);
if ($newest === false) {
$client->close();
$this->markTestSkipped('No newer messages found?');
}

$this->assertTrue($newest instanceof Message);

$client->close();
}

/**
* @expectedException RuntimeException
*/
Expand Down

0 comments on commit 6fa93ac

Please sign in to comment.