Skip to content

Commit

Permalink
feat: [PubSub] add Google Pub/Sub Schema Revision support
Browse files Browse the repository at this point in the history
  • Loading branch information
mrbiya committed Sep 19, 2023
1 parent a0ac411 commit f921d44
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/Connection/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ public function deleteSchema(array $args);
*/
public function validateSchema(array $args);

/**
* @param array $args
*/
public function listRevisions(array $args);

/**
* @param array $args
*/
public function commitSchema(array $args);

/**
* @param array $args
*/
public function deleteRevision(array $args);

/**
* @param array $args
*/
Expand Down
44 changes: 44 additions & 0 deletions src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -761,4 +761,48 @@ protected function getSchemaClient()
$this->schemaClient = $this->constructGapic(SchemaServiceClient::class, $this->clientConfig);
return $this->schemaClient;
}

/**
* Retrieve schema revisions
*
* @param array $args
* @return array
*/
public function listRevisions(array $args)
{
return $this->send([$this->getSchemaClient(), 'listSchemaRevisions'], [
$this->pluck('name', $args),
$args,
]);
}

/**
* Create schema revisions
*
* @param array $args
* @return array
*/
public function commitSchema(array $args)
{
return $this->send([$this->getSchemaClient(), 'commitSchema'], [
$this->pluck('name', $args),
new Schema($this->pluck('schema', $args)),
$args,
]);
}

/**
* Delete schema revision
*
* @param array $args
* @return array
*/
public function deleteRevision(array $args)
{
return $this->send([$this->getSchemaClient(), 'deleteSchemaRevision'], [
$this->pluck('name', $args),
null,
$args,
]);
}
}
24 changes: 24 additions & 0 deletions src/Connection/Rest.php
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,30 @@ public function validateSchema(array $args)
return $this->send('schemas', 'validate', $args);
}

/**
* @param array $args
*/
public function listRevisions(array $args)
{
return $this->send('schemas', 'listRevisions', $args);
}

/**
* @param array $args
*/
public function commitSchema(array $args)
{
return $this->send('schemas', 'commit', $args);
}

/**
* @param array $args
*/
public function deleteRevision(array $args)
{
return $this->send('schemas', 'deleteRevision', $args);
}

/**
* @param array $args
*/
Expand Down
75 changes: 75 additions & 0 deletions src/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\PubSub\Connection\ConnectionInterface;
use Google\Cloud\PubSub\V1\Schema\Type;

/**
* Represents a Pub/Sub Schema resource.
Expand Down Expand Up @@ -104,6 +105,80 @@ public function delete(array $options = [])
] + $options);
}

/**
* Get list schema revisions.
*
* Example:
* ```
* $revisions = $schema->listRevisions();
* foreach ($revisions['schemas'] as $revision) {
* echo $revisions['definition'];
* }
* ```
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/listRevisions List revisions
* @param array $options [optional] Configuration Options
* @return array
*/
public function listRevisions(array $options = [])
{
return $this->connection->listRevisions([
'name' => $this->name,
] + $options);
}

/**
* Commit schema revision.
*
* Example:
* ```
* $definition = file_get_contents('my-schema.txt');
* $revision = $schema->commit($definition, 'AVRO);
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/commit Commit Schema revision.
* ```
*
* @param string $definition The definition of the schema. This should
* contain a string representing the full definition of the schema that
* is a valid schema definition of the type specified in `type`. See
* [Schema](https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas#Schema)
* for details.
* @param string $type The schema type. Allowed values are `AVRO` and `PROTOCOL_BUFFER`.
* @param array $options [optional] Configuration options
* @return array revision created
*/
public function commit($definition, $type, array $options = [])
{
return $this->connection->commitSchema([
'schema' => [
'definition' => $definition,
'type' => $type,
],
'name' => $this->name
] + $options);
}


/**
* Commit schema revision.
*
* Example:
* ```
* $definition = file_get_contents('my-schema.txt');
* $revision = $schema->commit($definition, 'AVRO);
*
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.schemas/deleteRevision Delete Schema revision.
* ```
*
* @param string $revisionId The revisionId
* @return array deleted revision
*/
public function deleteRevision($revisionId)
{
return $this->connection->deleteRevision([
'name' => $this->name .'@' . $revisionId
]);
}

/**
* Get schema information.
*
Expand Down
3 changes: 3 additions & 0 deletions tests/Unit/Connection/RestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public function methodProvider()
['deleteSchema'],
['validateSchema'],
['validateMessage'],
['listRevisions'],
['commitSchema'],
['deleteRevision'],
];
}
}
44 changes: 44 additions & 0 deletions tests/Unit/SchemaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,48 @@ public function testExistsReturnsFalse()

$this->assertFalse($this->schema->exists());
}

public function testlistRevisions()
{
$this->connection
->listRevisions(['name' => self::NAME,])
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->listRevisions()['foo']);
}

public function testCommit()
{
$this->connection
->commitSchema(
[
'name' => self::NAME,
'schema' => [
'definition' => 'test',
'type' => 'AVRO',
],
]
)
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->commit('test', 'AVRO')['foo']);
}

public function testDeleteRevision()
{
$this->connection
->deleteRevision(['name' => self::NAME . '@1234567'])
->shouldBeCalledOnce()
->willReturn(['foo' => 'bar']);

$this->schema->___setProperty('connection', $this->connection->reveal());

$this->assertEquals('bar', $this->schema->deleteRevision('1234567')['foo']);
}
}

0 comments on commit f921d44

Please sign in to comment.