Skip to content

Commit

Permalink
feat: support excludeTxnFromChangeStreams option (#7749)
Browse files Browse the repository at this point in the history
  • Loading branch information
bshaffer authored Nov 6, 2024
1 parent 7d368c2 commit b369460
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 24 deletions.
7 changes: 7 additions & 0 deletions Spanner/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,13 @@ public function beginTransaction(array $args)
$args = $this->addLarHeader($args, $this->larEnabled);
}

// NOTE: if set for read-only actions, will throw exception
if (isset($transactionOptions['excludeTxnFromChangeStreams'])) {
$options->setExcludeTxnFromChangeStreams(
$transactionOptions['excludeTxnFromChangeStreams']
);
}

$requestOptions = $this->pluck('requestOptions', $args, false) ?: [];
if ($requestOptions) {
$args['requestOptions'] = $this->serializer->decodeMessage(
Expand Down
26 changes: 15 additions & 11 deletions Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ public function runTransaction(callable $operation, array $options = [])
];

// There isn't anything configurable here.
$options['transactionOptions'] = $this->configureTransactionOptions();
$options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []);

$session = $this->selectSession(
SessionPoolInterface::CONTEXT_READWRITE,
Expand Down Expand Up @@ -1556,7 +1556,7 @@ public function delete($table, KeySet $keySet, array $options = [])
* use Google\Cloud\Spanner\Session\SessionPoolInterface;
*
* $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [
* 'parameters' => [
* 'parameters' => [
* 'postId' => 1337
* ],
* 'begin' => true,
Expand All @@ -1573,7 +1573,7 @@ public function delete($table, KeySet $keySet, array $options = [])
* use Google\Cloud\Spanner\Session\SessionPoolInterface;
*
* $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [
* 'parameters' => [
* 'parameters' => [
* 'postId' => 1337
* ],
* 'begin' => true,
Expand All @@ -1593,11 +1593,10 @@ public function delete($table, KeySet $keySet, array $options = [])
* @param string $sql The query string to execute.
* @param array $options [optional] {
* Configuration Options.
* See [TransactionOptions](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions)
* for detailed description of available transaction options. Please
* note that only one of `$strong`, `$minReadTimestamp`,
* `$maxStaleness`, `$readTimestamp` or `$exactStaleness` may be set in
* a request.
* See {@see V1\TransactionOptions\PBReadOnly} for detailed description of
* available transaction options. Please note that only one of
* `$strong`, `$minReadTimestamp`, `$maxStaleness`, `$readTimestamp` or
* `$exactStaleness` may be set in a request.
*
* @type array $parameters A key/value array of Query Parameters, where
* the key is represented in the query string prefixed by a `@`
Expand Down Expand Up @@ -1899,11 +1898,16 @@ public function executePartitionedUpdate($statement, array $options = [])
unset($options['requestOptions']['transactionTag']);
$session = $this->selectSession(SessionPoolInterface::CONTEXT_READWRITE);

$transaction = $this->operation->transaction($session, [
$beginTransactionOptions = [
'transactionOptions' => [
'partitionedDml' => []
'partitionedDml' => [],
]
]);
];
if (isset($options['transactionOptions']['excludeTxnFromChangeStreams'])) {
$beginTransactionOptions['transactionOptions']['excludeTxnFromChangeStreams'] =
$options['transactionOptions']['excludeTxnFromChangeStreams'];
}
$transaction = $this->operation->transaction($session, $beginTransactionOptions);

$options = $this->addLarHeader($options);

Expand Down
7 changes: 7 additions & 0 deletions Spanner/src/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner;

use Google\ApiCore\ValidationException;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Spanner\Session\Session;
use Google\Cloud\Spanner\Session\SessionPoolInterface;
Expand Down Expand Up @@ -239,6 +240,12 @@ public function getCommitStats()
*/
public function executeUpdate($sql, array $options = [])
{
if (isset($options['transaction']['begin']['excludeTxnFromChangeStreams'])) {
throw new ValidationException(
'The excludeTxnFromChangeStreams option cannot be set for individual DML requests.'
. ' This option should be set at the transaction level.'
);
}
$options = $this->buildUpdateOptions($options);
return $this->operation
->executeUpdate($this->session, $this, $sql, $options);
Expand Down
21 changes: 14 additions & 7 deletions Spanner/src/TransactionConfigurationTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,21 @@ private function transactionSelector(array &$options, array $previous = [])
'transactionType' => SessionPoolInterface::CONTEXT_READ,
];

$res = $this->transactionOptions($options, $previous);
[$transactionOptions, $type, $context] = $this->transactionOptions($options, $previous);

// TransactionSelector uses a different key name for singleUseTransaction
// and transactionId than transactionOptions, so we'll rewrite those here
// so transactionOptions works as expected for commitRequest.

$type = $res[1];
if ($type === 'singleUseTransaction') {
$type = 'singleUse';
} elseif ($type === 'transactionId') {
$type = 'id';
}

return [
[$type => $res[0]],
$res[2]
[$type => $transactionOptions],
$context
];
}

Expand Down Expand Up @@ -130,7 +129,9 @@ private function transactionOptions(array &$options, array $previous = [])
} elseif ($context === SessionPoolInterface::CONTEXT_READ) {
$transactionOptions = $this->configureSnapshotOptions($options, $previous);
} elseif ($context === SessionPoolInterface::CONTEXT_READWRITE) {
$transactionOptions = $this->configureTransactionOptions();
$transactionOptions = $this->configureTransactionOptions(
$type == 'begin' && is_array($begin) ? $begin : []
);
} else {
throw new \BadMethodCallException(sprintf(
'Invalid transaction context %s',
Expand All @@ -141,11 +142,17 @@ private function transactionOptions(array &$options, array $previous = [])
return [$transactionOptions, $type, $context];
}

private function configureTransactionOptions()
private function configureTransactionOptions(array $options = [])
{
return [
$transactionOptions = [
'readWrite' => []
];

if (isset($options['excludeTxnFromChangeStreams'])) {
$transactionOptions['excludeTxnFromChangeStreams'] = $options['excludeTxnFromChangeStreams'];
}

return $transactionOptions;
}

/**
Expand Down
149 changes: 143 additions & 6 deletions Spanner/tests/Unit/DatabaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner\Tests\Unit;

use Google\ApiCore\ServerStream;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\Exception\ServerException;
Expand All @@ -29,6 +30,7 @@
use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient;
use Google\Cloud\Spanner\Admin\Database\V1\DatabaseDialect;
use Google\Cloud\Spanner\Connection\ConnectionInterface;
use Google\Cloud\Spanner\Connection\Grpc;
use Google\Cloud\Spanner\Database;
use Google\Cloud\Spanner\Duration;
use Google\Cloud\Spanner\Instance;
Expand All @@ -43,7 +45,13 @@
use Google\Cloud\Spanner\Tests\StubCreationTrait;
use Google\Cloud\Spanner\Timestamp;
use Google\Cloud\Spanner\Transaction;
use Google\Cloud\Spanner\V1\ResultSet;
use Google\Cloud\Spanner\V1\ResultSetStats;
use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType;
use Google\Cloud\Spanner\V1\Session as SessionProto;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\Cloud\Spanner\V1\Transaction as TransactionProto;
use Google\Cloud\Spanner\V1\TransactionOptions;
use Google\Rpc\Code;
use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
Expand Down Expand Up @@ -103,19 +111,24 @@ public function setUp(): void
]);
$this->directedReadOptionsIncludeReplicas = [
'includeReplicas' => [
'autoFailoverDisabled' => false,
'replicaSelections' => [
'location' => 'us-central1',
'type' => 'READ_WRITE',
'autoFailoverDisabled' => false
[
'location' => 'us-central1',
'type' => ReplicaType::READ_WRITE,

]
]
]
];
$this->directedReadOptionsExcludeReplicas = [
'excludeReplicas' => [
'autoFailoverDisabled' => false,
'replicaSelections' => [
'location' => 'us-central1',
'type' => 'READ_WRITE',
'autoFailoverDisabled' => false
[
'location' => 'us-central1',
'type' => ReplicaType::READ_WRITE,
]
]
]
];
Expand Down Expand Up @@ -2002,6 +2015,130 @@ public function testRunTransactionWithRollback()
}, ['tag' => self::TRANSACTION_TAG]);
}

public function testRunTransactionWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$sql = 'SELECT example FROM sql_query';
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]);
$gapic->executeStreamingSql($sessName, $sql, Argument::that(function (array $options) {
$this->assertArrayHasKey('transaction', $options);
$this->assertNotNull($transactionOptions = $options['transaction']->getBegin());
$this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams());
return true;
}))
->shouldBeCalledOnce()
->willReturn($stream->reveal());

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->runTransaction(
function (Transaction $t) use ($sql) {
// Run a fake query
$t->executeUpdate($sql);

// Simulate calling Transaction::commmit()
$prop = new \ReflectionProperty($t, 'state');
$prop->setAccessible(true);
$prop->setValue($t, Transaction::STATE_COMMITTED);
},
['transactionOptions' => ['excludeTxnFromChangeStreams' => true]]
);
}

public function testExecutePartitionedUpdateWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$sql = 'SELECT example FROM sql_query';
$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_lower_bound' => 0])]);
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]);
$gapic->executeStreamingSql($sessName, $sql, Argument::type('array'))
->shouldBeCalledOnce()
->willReturn($stream->reveal());

$gapic->beginTransaction(
$sessName,
Argument::that(function (TransactionOptions $options) {
$this->assertTrue($options->getExcludeTxnFromChangeStreams());
return true;
}),
Argument::type('array')
)
->shouldBeCalledOnce()
->willReturn(new TransactionProto(['id' => 'foo']));

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->executePartitionedUpdate(
$sql,
['transactionOptions' => ['excludeTxnFromChangeStreams' => true]]
);
}

public function testBatchWriteWithExcludeTxnFromChangeStreams()
{
$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$mutationGroups = [];
$gapic->batchWrite(
$sessName,
$mutationGroups,
Argument::that(function ($options) {
$this->assertArrayHasKey('excludeTxnFromChangeStreams', $options);
$this->assertTrue($options['excludeTxnFromChangeStreams']);
return true;
})
)
->shouldBeCalledOnce()
->willReturn(new TransactionProto(['id' => 'foo']));

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

$database->batchWrite($mutationGroups, [
'excludeTxnFromChangeStreams' => true
]);
}

private function createStreamingAPIArgs()
{
$row = ['id' => 1];
Expand Down
Loading

0 comments on commit b369460

Please sign in to comment.