Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for ordering keys #2624

Merged
merged 3 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 60 additions & 6 deletions PubSub/src/BatchPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class BatchPublisher
*/
private $topicName;

/**
* @var PubSubClient
*/
private $client;

/**
* @param string $topicName The topic name.
* @param array $options [optional] Please see
Expand All @@ -68,7 +73,7 @@ public function __construct($topicName, array $options = [])
$this->topicName = $topicName;
$this->setCommonBatchProperties($options + [
'identifier' => sprintf(self::ID_TEMPLATE, $topicName),
'batchMethod' => 'publishBatch'
'batchMethod' => 'publishDeferred'
]);
}

Expand All @@ -82,11 +87,17 @@ public function __construct($topicName, array $options = [])
* ]);
* ```
*
* @param array $message [Message Format](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
* @param Message|array $message An instance of
* {@see Google\Cloud\PubSub\Message}, or an array in the correct
* [Message Format](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
* @return bool
*/
public function publish(array $message)
public function publish($message)
{
$message = $message instanceof Message
? $message->toArray()
: $message;

return $this->batchRunner->submitItem($this->identifier, $message);
}

Expand All @@ -98,11 +109,54 @@ public function publish(array $message)
*/
protected function getCallback()
{
return [$this, $this->batchMethod];
}

/**
* Publish a set of deferred messages, sorted into multiple calls by ordering key.
*
* Intended for internal use only by the batch publisher.
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/publish Publish Message
*
* @param array[] $messages A list of messages. Each message must be in the correct
* [Message Format](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
* @param array $options [optional] Configuration Options
* @return array A list of message IDs.
* @internal
* @access private
*/
public function publishDeferred(array $messages, array $options = [])
{
$calls = [];
foreach ($messages as $message) {
$key = isset($message['orderingKey'])
? $message['orderingKey']
: '';

if (!isset($calls[$key])) {
$calls[$key] = [];
}

$calls[$key][] = $message;
}

if (!array_key_exists($this->topicName, self::$topics)) {
$client = new PubSubClient($this->getUnwrappedClientConfig());
self::$topics[$this->topicName] = $client->topic($this->topicName);
if (!$this->client) {
//@codeCoverageIgnoreStart
$this->client = new PubSubClient($this->getUnwrappedClientConfig());
//@codeCoverageIgnoreEnd
}
self::$topics[$this->topicName] = $this->client->topic($this->topicName);
}

$topic = self::$topics[$this->topicName];

$res = [];
foreach ($calls as $call) {
$res = array_merge($res, $topic->publishBatch($call, $options));
}

return [self::$topics[$this->topicName], $this->batchMethod];
return $res;
}
}
2 changes: 2 additions & 0 deletions PubSub/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Grpc implements ConnectionInterface
*/
public function __construct(array $config = [])
{
//@codeCoverageIgnoreStart
$this->serializer = new Serializer([
'publish_time' => function ($v) {
return $this->formatTimestampFromApi($v);
Expand Down Expand Up @@ -98,6 +99,7 @@ public function __construct(array $config = [])
if ((bool) $config['emulatorHost']) {
$grpcConfig += $this->emulatorGapicConfig($config['emulatorHost']);
}
//@codeCoverageIgnoreEnd

$this->publisherClient = $this->constructGapic(PublisherClient::class, $grpcConfig);
$this->subscriberClient = $this->constructGapic(SubscriberClient::class, $grpcConfig);
Expand Down
43 changes: 41 additions & 2 deletions PubSub/src/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

namespace Google\Cloud\PubSub;

use Google\Cloud\Core\ArrayTrait;

/**
* Represents a PubSub Message.
*
Expand All @@ -35,6 +37,8 @@
*/
class Message
{
use ArrayTrait;

/**
* @var array
*/
Expand Down Expand Up @@ -64,8 +68,9 @@ class Message
* @type string $publishTime The time at which the message was
* published, populated by the server when it receives the publish
* call.
* @type string $orderingKey The message ordering key.
* }
* @param array $metadata {
* @param array $metadata [optional] {
* Message metadata
*
* @type string $ackId The message ackId. This is only set when messages
Expand All @@ -74,13 +79,14 @@ class Message
* obtained from. This is only set when messages are delivered by
* pushDelivery.
*/
public function __construct(array $message, array $metadata)
public function __construct(array $message, array $metadata = [])
{
$this->message = $message + [
'data' => null,
'messageId' => null,
'publishTime' => null,
'attributes' => [],
'orderingKey' => null
];

$metadata += [
Expand Down Expand Up @@ -158,6 +164,21 @@ public function id()
return $this->message['messageId'];
}

/**
* Get the message ordering key.
*
* Example:
* ```
* $orderingKey = $message->orderingKey();
* ```
*
* @return string|null
*/
public function orderingKey()
{
return $this->message['orderingKey'];
}

/**
* Get the message published time.
*
Expand Down Expand Up @@ -230,4 +251,22 @@ public function info()
'message' => $this->message
];
}

/**
* Get the message as an array.
*
* @return array
* @internal
*/
public function toArray()
{
$message = $this->arrayFilterRemoveNull($this->message);

// force json-encode to empty map instead of empty list.
if (isset($message['attributes']) && empty($message['attributes'])) {
$message['attributes'] = (object) [];
}

return $message;
}
}
182 changes: 182 additions & 0 deletions PubSub/src/MessageBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
<?php
/**
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Google\Cloud\PubSub;

use Google\Cloud\Core\ArrayTrait;

/**
* Builds a PubSub Message.
*
* This class may be used to build an API-compliant message for publication.
*
* This class is immutable and each build method returns a new instance with
* your changes applied.
*
* Note that messages are invalid unless they include a data field or at least
* one attribute. Both may be provided, but omission of both will result in an
* error.
*
* Example:
* ```
* use Google\Cloud\PubSub\MessageBuilder;
* use Google\Cloud\PubSub\PubSubClient;
*
* $client = new PubSubClient();
* $topic = $client->topic($topicId);
*
* $builder = new MessageBuilder();
* $builder = $builder->setData('hello friend!')
* ->addAttribute('from', 'Bob')
* ->addAttribute('to', 'Jane');
*
* $topic->publish($builder->build());
* ```
*/
class MessageBuilder
{
use ArrayTrait;

/**
* @var array
*/
private $message;

/**
* @param array $message The initial message data.
*/
public function __construct(array $message = [])
{
$this->message = $message;
}

/**
* Set the message data.
*
* Do not base64-encode the value; If it must be encoded, the client will
* do it on your behalf.
*
* Example:
* ```
* $builder = $builder->setData('Hello friend!');
* ```
*
* @param string $data The message data field.
* @return MessageBuilder
*/
public function setData($data)
{
return $this->newMessage([
'data' => $data
]);
}

/**
* Set optional attributes for this message.
*
* Example:
* ```
* $builder = $builder->setAttributes([
* 'from' => 'Bob'
* ]);
* ```
*
* @param array $attributes A set of key/value pairs, where both key and
* value are strings.
* @return MessageBuilder
*/
public function setAttributes(array $attributes)
{
return $this->newMessage([
'attributes' => $attributes
]);
}

/**
* Add a single attribute to the message.
*
* Example:
* ```
* $builder = $builder->addAttribute('to', 'Jane');
* ```
*
* @param string $key The attribute key.
* @param string $value The attribute value.
* @return MessageBuilder
*/
public function addAttribute($key, $value)
{
$attributes = [];
if (isset($this->message['attributes'])) {
$attributes = $this->message['attributes'];
}

$attributes[$key] = $value;

return $this->newMessage([
'attributes' => $attributes
]);
}

/**
* Set the message's ordering key.
*
* Example:
* ```
* $builder = $builder->setOrderingKey('order');
* ```
*
* @param string $orderingKey The ordering key.
* @return MessageBuilder
*/
public function setOrderingKey($orderingKey)
{
return $this->newMessage([
'orderingKey' => $orderingKey
]);
}

/**
* Build a message.
*
* Example:
* ```
* $message = $builder->build();
* ```
*
* @return Message
* @throws \BadMethodCallException If required data is missing.
*/
public function build()
{
$hasAttributes = isset($this->message['attributes']) && $this->message['attributes'];
if (!isset($this->message['data']) && !$hasAttributes) {
throw new \BadMethodCallException(
'Messages must contain either a non-empty data field or at least one attribute.'
);
}

return new Message($this->message);
}

private function newMessage(array $data)
{
$data = $this->arrayMergeRecursive($this->message, $data);

return new static($data);
}
}
Loading