Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [PubSub] add Google Pub/Sub Schema Revision support #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/Connection/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ public function deleteSchema(array $args);
*/
public function validateSchema(array $args);

/**
* @param array $args
*/
public function listRevisions(array $args);

/**
* @param array $args
*/
public function commitSchema(array $args);

/**
* @param array $args
*/
public function deleteRevision(array $args);

/**
* @param array $args
*/
Expand Down
44 changes: 44 additions & 0 deletions src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -761,4 +761,48 @@ protected function getSchemaClient()
$this->schemaClient = $this->constructGapic(SchemaServiceClient::class, $this->clientConfig);
return $this->schemaClient;
}

/**
* Retrieve schema revisions
*
* @param array $args
* @return array
*/
public function listRevisions(array $args)
{
return $this->send([$this->getSchemaClient(), 'listSchemaRevisions'], [
$this->pluck('name', $args),
$args,
]);
}

/**
* Create schema revisions
*
* @param array $args
* @return array
*/
public function commitSchema(array $args)
{
return $this->send([$this->getSchemaClient(), 'commitSchema'], [
$this->pluck('name', $args),
new Schema($this->pluck('schema', $args)),
$args,
]);
}

/**
* Delete schema revision
*
* @param array $args
* @return array
*/
public function deleteRevision(array $args)
{
return $this->send([$this->getSchemaClient(), 'deleteSchemaRevision'], [
$this->pluck('name', $args),
null,
$args,
]);
}
}
24 changes: 24 additions & 0 deletions src/Connection/Rest.php
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,30 @@ public function validateSchema(array $args)
return $this->send('schemas', 'validate', $args);
}

/**
* @param array $args
*/
public function listRevisions(array $args)
{
return $this->send('schemas', 'listRevisions', $args);
}

/**
* @param array $args
*/
public function commitSchema(array $args)
{
return $this->send('schemas', 'commit', $args);
}

/**
* @param array $args
*/
public function deleteRevision(array $args)
{
return $this->send('schemas', 'deleteRevision', $args);
}

/**
* @param array $args
*/
Expand Down
75 changes: 75 additions & 0 deletions src/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\PubSub\Connection\ConnectionInterface;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
* Represents a Pub/Sub Schema resource.
Expand Down Expand Up @@ -104,6 +105,80 @@ public function delete(array $options = [])
] + $options);
}

/**
* Get list schema revisions.
*
* Example:
* ```
* $revisions = $schema->listRevisions();
* foreach ($revisions['schemas'] as $revision) {
* echo $revisions['definition'];
* }
* ```
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/listRevisions List revisions
* @param array $options [optional] Configuration Options
* @return array
*/
public function listRevisions(array $options = [])
{
return $this->connection->listRevisions([
'name' => $this->name,
] + $options);
}

/**
* Commit schema revision.
*
* Example:
* ```
* $definition = file_get_contents('my-schema.txt');
* $revision = $schema->commit($definition, 'AVRO);
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/commit Commit Schema revision.
* ```
*
* @param string $definition The definition of the schema. This should
* contain a string representing the full definition of the schema that
* is a valid schema definition of the type specified in `type`. See
* [Schema](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas#Schema)
* for details.
* @param string $type The schema type. Allowed values are `AVRO` and `PROTOCOL_BUFFER`.
* @param array $options [optional] Configuration options
* @return array revision created
*/
public function commit($definition, $type, array $options = [])
{
return $this->connection->commitSchema([
'schema' => [
'definition' => $definition,
'type' => $type,
],
'name' => $this->name
] + $options);
}


/**
* Commit schema revision.
*
* Example:
* ```
* $definition = file_get_contents('my-schema.txt');
* $revision = $schema->commit($definition, 'AVRO);
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/deleteRevision Delete Schema revision.
* ```
*
* @param string $revisionId The revisionId
* @return array deleted revision
*/
public function deleteRevision($revisionId)
{
return $this->connection->deleteRevision([
'name' => $this->name .'@' . $revisionId
]);
}

/**
* Get schema information.
*
Expand Down
3 changes: 3 additions & 0 deletions tests/Unit/Connection/RestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public function methodProvider()
['deleteSchema'],
['validateSchema'],
['validateMessage'],
['listRevisions'],
['commitSchema'],
['deleteRevision'],
];
}
}
44 changes: 44 additions & 0 deletions tests/Unit/SchemaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,48 @@ public function testExistsReturnsFalse()

$this->assertFalse($this->schema->exists());
}

public function testlistRevisions()
{
$this->connection
->listRevisions(['name' => self::NAME,])
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->listRevisions()['foo']);
}

public function testCommit()
{
$this->connection
->commitSchema(
[
'name' => self::NAME,
'schema' => [
'definition' => 'test',
'type' => 'AVRO',
],
]
)
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->commit('test', 'AVRO')['foo']);
}

public function testDeleteRevision()
{
$this->connection
->deleteRevision(['name' => self::NAME . '@1234567'])
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->deleteRevision('1234567')['foo']);
}
}