Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BC Break] Remove paging from Subscription::pull #375

Merged
merged 4 commits into from
Mar 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 19 additions & 28 deletions src/PubSub/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -331,42 +331,36 @@ public function reload(array $options = [])
* @param array $options [optional] {
* Configuration Options
*
* @type bool $returnImmediately If set, the system will respond
* @type bool $returnImmediately If true, the system will respond
* immediately, even if no messages are available. Otherwise,
* wait until new messages are available.
* @type int $maxMessages Limit the amount of messages pulled.
* wait until new messages are available. **Defaults to**
* `false`.
* @type int $maxMessages Limit the amount of messages pulled.
* **Defaults to** `1000`.
* }
* @codingStandardsIgnoreStart
* @return \Generator<Message>
* @codingStandardsIgnoreEnd
* @return Message[]
*/
public function pull(array $options = [])
{
$options['pageToken'] = null;
$messages = [];
$options['returnImmediately'] = isset($options['returnImmediately'])
? $options['returnImmediately']
: false;

$options['maxMessages'] = isset($options['maxMessages'])
? $options['maxMessages']
: self::MAX_MESSAGES;

do {
$response = $this->connection->pull($options + [
'subscription' => $this->name
]);
$response = $this->connection->pull($options + [
'subscription' => $this->name
]);

if (isset($response['receivedMessages'])) {
foreach ($response['receivedMessages'] as $message) {
yield $this->messageFactory($message, $this->connection, $this->projectId, $this->encode);
}
if (isset($response['receivedMessages'])) {
foreach ($response['receivedMessages'] as $message) {
$messages[] = $this->messageFactory($message, $this->connection, $this->projectId, $this->encode);
}
}

// If there's a page token, we'll request the next page.
$options['pageToken'] = isset($response['nextPageToken'])
? $response['nextPageToken']
: null;
} while ($options['pageToken']);
return $messages;
}

/**
Expand All @@ -378,9 +372,8 @@ public function pull(array $options = [])
* Example:
* ```
* $messages = $subscription->pull();
* $messagesArray = iterator_to_array($messages);
*
* $subscription->acknowledge($messagesArray[0]);
* $subscription->acknowledge($messages[0]);
* ```
*
* @codingStandardsIgnoreStart
Expand All @@ -405,9 +398,8 @@ public function acknowledge(Message $message, array $options = [])
* Example:
* ```
* $messages = $subscription->pull();
* $messagesArray = iterator_to_array($messages);
*
* $subscription->acknowledgeBatch($messagesArray);
* $subscription->acknowledgeBatch($messages);
* ```
*
* @codingStandardsIgnoreStart
Expand Down Expand Up @@ -477,16 +469,15 @@ public function modifyAckDeadline(Message $message, $seconds, array $options = [
* Example:
* ```
* $messages = $subscription->pull();
* $messagesArray = iterator_to_array($messages);
*
* // Set the ack deadline to three seconds from now for every message
* $subscription->modifyAckDeadlineBatch($messagesArray, 3);
* $subscription->modifyAckDeadlineBatch($messages, 3);
*
* // Delay execution, or make a sandwich or something.
* sleep(2);
*
* // Now we'll acknowledge
* $subscription->acknowledgeBatch($messagesArray);
* $subscription->acknowledgeBatch($messages);
* ```
*
* @codingStandardsIgnoreStart
Expand Down
2 changes: 1 addition & 1 deletion tests/snippets/PubSub/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public function testClass()
);

$res = $snippet->invoke('messages');
$this->assertInstanceOf(\Generator::class, $res->returnVal());
$this->assertContainsOnlyInstancesOf(Message::class, $res->returnVal());
$this->assertEquals('hello world', $res->output());
}

Expand Down
3 changes: 2 additions & 1 deletion tests/snippets/PubSub/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Google\Cloud\Dev\Snippet\SnippetTestCase;
use Google\Cloud\Iam\Iam;
use Google\Cloud\PubSub\Connection\ConnectionInterface;
use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Subscription;
use Prophecy\Argument;
Expand Down Expand Up @@ -169,7 +170,7 @@ public function testPull()
$this->subscription->setConnection($this->connection->reveal());

$res = $snippet->invoke('messages');
$this->assertInstanceOf(\Generator::class, $res->returnVal());
$this->assertContainsOnlyInstancesOf(Message::class, $res->returnVal());

This comment was marked as spam.

$this->assertEquals('hello world', $res->output());
}

Expand Down
4 changes: 2 additions & 2 deletions tests/system/PubSub/PublishAndPullTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public function testPublishMessageAndPull($client)
];
$topic->publish($message);

$messages = iterator_to_array($sub->pull());
$messages = $sub->pull();
$sub->modifyAckDeadline($messages[0], 15);
$sub->acknowledge($messages[0]);

Expand Down Expand Up @@ -79,7 +79,7 @@ public function testPublishMessagesAndPull($client)

$topic->publishBatch($messages);

$actualMessages = iterator_to_array($sub->pull());
$actualMessages = $sub->pull();
$sub->modifyAckDeadlineBatch($actualMessages, 15);
$sub->acknowledgeBatch($actualMessages);

Expand Down
61 changes: 6 additions & 55 deletions tests/unit/PubSub/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,9 @@ public function testPull()
'foo' => 'bar'
]);

$this->assertInstanceOf(Generator::class, $result);

$arr = iterator_to_array($result);
$this->assertInstanceOf(Message::class, $arr[0]);
$this->assertInstanceOf(Message::class, $arr[1]);
$this->assertContainsOnlyInstancesOf(Message::class, $result);
$this->assertInstanceOf(Message::class, $result[0]);
$this->assertInstanceOf(Message::class, $result[1]);
}

public function testPullWithCustomArgs()
Expand Down Expand Up @@ -235,56 +233,9 @@ public function testPullWithCustomArgs()
'maxMessages' => 2
]);

$this->assertInstanceOf(Generator::class, $result);

$arr = iterator_to_array($result);
$this->assertInstanceOf(Message::class, $arr[0]);
$this->assertInstanceOf(Message::class, $arr[1]);
}

public function testPullPaged()
{
$messages = [
'receivedMessages' => [
[
'message' => []
], [
'message' => []
]
],
'nextPageToken' => 'foo'
];

$this->connection->pull(Argument::that(function ($args) {
if ($args['foo'] !== 'bar') return false;
if ($args['returnImmediately'] !== true) return false;
if ($args['maxMessages'] !== 2) return false;
if (!in_array($args['pageToken'], [null, 'foo'])) return false;

return true;
}))->willReturn($messages)
->shouldBeCalledTimes(3);

$this->subscription->setConnection($this->connection->reveal());

$result = $this->subscription->pull([
'foo' => 'bar',
'returnImmediately' => true,
'maxMessages' => 2
]);

$this->assertInstanceOf(Generator::class, $result);

// enumerate the iterator and kill after it loops twice.
$arr = [];
$i = 0;
foreach ($result as $message) {
$i++;
$arr[] = $message;
if ($i == 6) break;
}

$this->assertEquals(6, count($arr));
$this->assertContainsOnlyInstancesOf(Message::class, $result);
$this->assertInstanceOf(Message::class, $result[0]);
$this->assertInstanceOf(Message::class, $result[1]);
}

public function testAcknowledge()
Expand Down