From 86e39425ae099d4f61ba9f9f9ccda0fc963768a8 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Tue, 15 Oct 2024 13:00:57 -0700 Subject: [PATCH 01/10] feat: support excludeTxnFromChangeStreams option --- Spanner/src/Connection/Grpc.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Spanner/src/Connection/Grpc.php b/Spanner/src/Connection/Grpc.php index f6be4a6b4c59..68db35150a29 100644 --- a/Spanner/src/Connection/Grpc.php +++ b/Spanner/src/Connection/Grpc.php @@ -1103,6 +1103,11 @@ public function beginTransaction(array $args) $args = $this->addLarHeader($args, $this->larEnabled); } + // NOTE: if set for read-only actions, will throw exception + if (isset($transactionOptions['excludeTxnFromChangeStreams'])) { + $options->setExcludeTxnFromChangeStreams($args['excludeTxnFromChangeStreams']); + } + $requestOptions = $this->pluck('requestOptions', $args, false) ?: []; if ($requestOptions) { $args['requestOptions'] = $this->serializer->decodeMessage( From 69a9f93651fb081b09224e9071054178fdec9eeb Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Wed, 16 Oct 2024 10:50:38 -0700 Subject: [PATCH 02/10] add test for runTransaction --- Spanner/src/Database.php | 2 +- Spanner/src/Operation.php | 1 + Spanner/src/TransactionConfigurationTrait.php | 19 ++++--- Spanner/tests/Unit/DatabaseTest.php | 55 +++++++++++++++++++ 4 files changed, 69 insertions(+), 8 deletions(-) diff --git a/Spanner/src/Database.php b/Spanner/src/Database.php index de55a773418f..8bd0274b3776 100644 --- a/Spanner/src/Database.php +++ b/Spanner/src/Database.php @@ -917,7 +917,7 @@ public function runTransaction(callable $operation, array $options = []) ]; // There isn't anything configurable here. - $options['transactionOptions'] = $this->configureTransactionOptions(); + $options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []); $session = $this->selectSession( SessionPoolInterface::CONTEXT_READWRITE, diff --git a/Spanner/src/Operation.php b/Spanner/src/Operation.php index 6c8aeecace91..6036489b2662 100644 --- a/Spanner/src/Operation.php +++ b/Spanner/src/Operation.php @@ -270,6 +270,7 @@ public function executeUpdate( iterator_to_array($res->rows()); $stats = $res->stats(); + if (!$stats) { throw new InvalidArgumentException( 'Partitioned DML response missing stats, possible due to non-DML statement as input.' diff --git a/Spanner/src/TransactionConfigurationTrait.php b/Spanner/src/TransactionConfigurationTrait.php index f3307152d478..49610d2667c5 100644 --- a/Spanner/src/TransactionConfigurationTrait.php +++ b/Spanner/src/TransactionConfigurationTrait.php @@ -44,13 +44,12 @@ private function transactionSelector(array &$options, array $previous = []) 'transactionType' => SessionPoolInterface::CONTEXT_READ, ]; - $res = $this->transactionOptions($options, $previous); + [$transactionOptions, $type, $context] = $this->transactionOptions($options, $previous); // TransactionSelector uses a different key name for singleUseTransaction // and transactionId than transactionOptions, so we'll rewrite those here // so transactionOptions works as expected for commitRequest. - $type = $res[1]; if ($type === 'singleUseTransaction') { $type = 'singleUse'; } elseif ($type === 'transactionId') { @@ -58,8 +57,8 @@ private function transactionSelector(array &$options, array $previous = []) } return [ - [$type => $res[0]], - $res[2] + [$type => $transactionOptions], + $context ]; } @@ -130,7 +129,7 @@ private function transactionOptions(array &$options, array $previous = []) } elseif ($context === SessionPoolInterface::CONTEXT_READ) { $transactionOptions = $this->configureSnapshotOptions($options, $previous); } elseif ($context === SessionPoolInterface::CONTEXT_READWRITE) { - $transactionOptions = $this->configureTransactionOptions(); + $transactionOptions = $this->configureTransactionOptions($type == 'begin' ? $begin : []); } else { throw new \BadMethodCallException(sprintf( 'Invalid transaction context %s', @@ -141,11 +140,17 @@ private function transactionOptions(array &$options, array $previous = []) return [$transactionOptions, $type, $context]; } - private function configureTransactionOptions() + private function configureTransactionOptions(array $options = []) { - return [ + $transactionOptions = [ 'readWrite' => [] ]; + + if (isset($options['excludeTxnFromChangeStreams'])) { + $transactionOptions['excludeTxnFromChangeStreams'] = $options['excludeTxnFromChangeStreams']; + } + + return $transactionOptions; } /** diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index a20e1090ff6d..3c82af1a8763 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ServerStream; use Google\Cloud\Core\Exception\AbortedException; use Google\Cloud\Core\Exception\NotFoundException; use Google\Cloud\Core\Exception\ServerException; @@ -29,6 +30,7 @@ use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient; use Google\Cloud\Spanner\Admin\Database\V1\DatabaseDialect; use Google\Cloud\Spanner\Connection\ConnectionInterface; +use Google\Cloud\Spanner\Connection\Grpc; use Google\Cloud\Spanner\Database; use Google\Cloud\Spanner\Duration; use Google\Cloud\Spanner\Instance; @@ -43,6 +45,9 @@ use Google\Cloud\Spanner\Tests\StubCreationTrait; use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; +use Google\Cloud\Spanner\V1\ResultSet; +use Google\Cloud\Spanner\V1\ResultSetStats; +use Google\Cloud\Spanner\V1\Session as SessionProto; use Google\Cloud\Spanner\V1\SpannerClient; use Google\Rpc\Code; use PHPUnit\Framework\TestCase; @@ -2002,6 +2007,56 @@ public function testRunTransactionWithRollback() }, ['tag' => self::TRANSACTION_TAG]); } + public function testRunTransactionWithExcludeTxnFromChangeStreams() + { + $sql = 'SELECT example FROM sql_query'; + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]); + + $stream = $this->prophesize(ServerStream::class); + $stream->readAll() + ->shouldBeCalledOnce() + ->willReturn([$resultSet]); + $gapic = $this->prophesize(SpannerClient::class); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + $gapic->executeStreamingSql( + $sessName, + $sql, + Argument::that(function (array $options) { + $this->assertArrayHasKey('transaction', $options); + $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); + $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); + return true; + }) + ) + ->shouldBeCalledOnce() + ->willReturn($stream->reveal()); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->runTransaction( + function (Transaction $t) use ($sql) { + // Run a fake query + $t->executeUpdate($sql); + + // Simulate calling Transaction::commmit() + $prop = new \ReflectionProperty($t, 'state'); + $prop->setAccessible(true); + $prop->setValue($t, Transaction::STATE_COMMITTED); + }, + ['transactionOptions' => ['excludeTxnFromChangeStreams' => true]] + ); + } + private function createStreamingAPIArgs() { $row = ['id' => 1]; From b62f43e39c896535fca141e4db1395e9afe18a36 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Wed, 16 Oct 2024 14:24:16 -0700 Subject: [PATCH 03/10] ensure begin is an array --- Spanner/src/TransactionConfigurationTrait.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Spanner/src/TransactionConfigurationTrait.php b/Spanner/src/TransactionConfigurationTrait.php index 49610d2667c5..642279a39b2d 100644 --- a/Spanner/src/TransactionConfigurationTrait.php +++ b/Spanner/src/TransactionConfigurationTrait.php @@ -129,7 +129,9 @@ private function transactionOptions(array &$options, array $previous = []) } elseif ($context === SessionPoolInterface::CONTEXT_READ) { $transactionOptions = $this->configureSnapshotOptions($options, $previous); } elseif ($context === SessionPoolInterface::CONTEXT_READWRITE) { - $transactionOptions = $this->configureTransactionOptions($type == 'begin' ? $begin : []); + $transactionOptions = $this->configureTransactionOptions( + $type == 'begin' && is_array($begin) ? $begin : [] + ); } else { throw new \BadMethodCallException(sprintf( 'Invalid transaction context %s', From c3f94371c23a304c2be90827f46cacf9c3a7d035 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Wed, 16 Oct 2024 15:32:17 -0700 Subject: [PATCH 04/10] add test for Operation::transaction --- Spanner/src/Connection/Grpc.php | 4 +++- Spanner/src/Operation.php | 1 - Spanner/tests/Unit/DatabaseTest.php | 16 ++++++--------- Spanner/tests/Unit/OperationTest.php | 30 ++++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/Spanner/src/Connection/Grpc.php b/Spanner/src/Connection/Grpc.php index 68db35150a29..9f13c46e54da 100644 --- a/Spanner/src/Connection/Grpc.php +++ b/Spanner/src/Connection/Grpc.php @@ -1105,7 +1105,9 @@ public function beginTransaction(array $args) // NOTE: if set for read-only actions, will throw exception if (isset($transactionOptions['excludeTxnFromChangeStreams'])) { - $options->setExcludeTxnFromChangeStreams($args['excludeTxnFromChangeStreams']); + $options->setExcludeTxnFromChangeStreams( + $transactionOptions['excludeTxnFromChangeStreams'] + ); } $requestOptions = $this->pluck('requestOptions', $args, false) ?: []; diff --git a/Spanner/src/Operation.php b/Spanner/src/Operation.php index 6036489b2662..6c8aeecace91 100644 --- a/Spanner/src/Operation.php +++ b/Spanner/src/Operation.php @@ -270,7 +270,6 @@ public function executeUpdate( iterator_to_array($res->rows()); $stats = $res->stats(); - if (!$stats) { throw new InvalidArgumentException( 'Partitioned DML response missing stats, possible due to non-DML statement as input.' diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index 3c82af1a8763..509143eff879 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -2021,16 +2021,12 @@ public function testRunTransactionWithExcludeTxnFromChangeStreams() $gapic = $this->prophesize(SpannerClient::class); $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); - $gapic->executeStreamingSql( - $sessName, - $sql, - Argument::that(function (array $options) { - $this->assertArrayHasKey('transaction', $options); - $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); - $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); - return true; - }) - ) + $gapic->executeStreamingSql($sessName, $sql, Argument::that(function (array $options) { + $this->assertArrayHasKey('transaction', $options); + $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); + $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); + return true; + })) ->shouldBeCalledOnce() ->willReturn($stream->reveal()); diff --git a/Spanner/tests/Unit/OperationTest.php b/Spanner/tests/Unit/OperationTest.php index 0be80e576140..ddb382ba2160 100644 --- a/Spanner/tests/Unit/OperationTest.php +++ b/Spanner/tests/Unit/OperationTest.php @@ -22,6 +22,7 @@ use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient; use Google\Cloud\Spanner\Batch\QueryPartition; use Google\Cloud\Spanner\Batch\ReadPartition; +use Google\Cloud\Spanner\Connection\Grpc; use Google\Cloud\Spanner\Database; use Google\Cloud\Spanner\Duration; use Google\Cloud\Spanner\KeyRange; @@ -35,6 +36,9 @@ use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; use Google\Cloud\Spanner\V1\CommitResponse; +use Google\Cloud\Spanner\V1\SpannerClient; +use Google\Cloud\Spanner\V1\Transaction as TransactionProto; +use Google\Cloud\Spanner\V1\TransactionOptions; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; @@ -354,6 +358,32 @@ public function testTransactionNoTag() $this->assertEquals(self::TRANSACTION, $t->id()); } + public function testTransactionWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + $gapic->beginTransaction( + self::SESSION, + Argument::that(function (TransactionOptions $options) { + $this->assertTrue($options->getExcludeTxnFromChangeStreams()); + return true; + }), + Argument::type('array') + ) + ->shouldBeCalled() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $operation = new Operation( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + true + ); + + $transaction = $operation->transaction($this->session, [ + 'transactionOptions' => ['excludeTxnFromChangeStreams' => true] + ]); + + $this->assertEquals('foo', $transaction->id()); + } + public function testSnapshot() { $this->connection->beginTransaction(Argument::allOf( From 14da68f352b387fe7d0f93e6e76579527882f3d9 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Thu, 17 Oct 2024 11:20:14 -0700 Subject: [PATCH 05/10] add execute and executeUpdate tests --- Spanner/tests/Unit/OperationTest.php | 37 ++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Spanner/tests/Unit/OperationTest.php b/Spanner/tests/Unit/OperationTest.php index ddb382ba2160..bfe2c23dfd4c 100644 --- a/Spanner/tests/Unit/OperationTest.php +++ b/Spanner/tests/Unit/OperationTest.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ServerStream; use Google\Cloud\Core\Testing\GrpcTestTrait; use Google\Cloud\Core\Testing\TestHelpers; use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient; @@ -36,6 +37,8 @@ use Google\Cloud\Spanner\Timestamp; use Google\Cloud\Spanner\Transaction; use Google\Cloud\Spanner\V1\CommitResponse; +use Google\Cloud\Spanner\V1\ResultSet; +use Google\Cloud\Spanner\V1\ResultSetStats; use Google\Cloud\Spanner\V1\SpannerClient; use Google\Cloud\Spanner\V1\Transaction as TransactionProto; use Google\Cloud\Spanner\V1\TransactionOptions; @@ -384,6 +387,40 @@ public function testTransactionWithExcludeTxnFromChangeStreams() $this->assertEquals('foo', $transaction->id()); } + public function testExecuteAndExecuteUpdateWithExcludeTxnFromChangeStreams() + { + $sql = 'SELECT example FROM sql_query'; + + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]); + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledTimes(2)->willReturn([$resultSet]); + + $gapic = $this->prophesize(SpannerClient::class); + $gapic->executeStreamingSql(self::SESSION, $sql, Argument::that(function (array $options) { + $this->assertArrayHasKey('transaction', $options); + $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); + $this->assertTrue($transactionOptions->getExcludeTxnFromChangeStreams()); + return true; + })) + ->shouldBeCalledTimes(2) + ->willReturn($stream->reveal()); + + $operation = new Operation( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + true + ); + + $operation->execute($this->session, $sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + + $transaction = $this->prophesize(Transaction::class)->reveal(); + + $operation->executeUpdate($this->session, $transaction, $sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + } + public function testSnapshot() { $this->connection->beginTransaction(Argument::allOf( From 3d40936f3e7b368142f6cb8398d1d2aeb9856dd2 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Thu, 17 Oct 2024 15:51:45 -0700 Subject: [PATCH 06/10] add test for Database::executePartitionedUpdate --- Spanner/src/Database.php | 24 +++++---- Spanner/tests/Unit/DatabaseTest.php | 76 ++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 23 deletions(-) diff --git a/Spanner/src/Database.php b/Spanner/src/Database.php index 8bd0274b3776..f4aa17bb1622 100644 --- a/Spanner/src/Database.php +++ b/Spanner/src/Database.php @@ -1556,7 +1556,7 @@ public function delete($table, KeySet $keySet, array $options = []) * use Google\Cloud\Spanner\Session\SessionPoolInterface; * * $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [ - * 'parameters' => [ + * 'parameters' => [ * 'postId' => 1337 * ], * 'begin' => true, @@ -1573,7 +1573,7 @@ public function delete($table, KeySet $keySet, array $options = []) * use Google\Cloud\Spanner\Session\SessionPoolInterface; * * $result = $database->execute('SELECT * FROM Posts WHERE ID = @postId', [ - * 'parameters' => [ + * 'parameters' => [ * 'postId' => 1337 * ], * 'begin' => true, @@ -1593,11 +1593,10 @@ public function delete($table, KeySet $keySet, array $options = []) * @param string $sql The query string to execute. * @param array $options [optional] { * Configuration Options. - * See [TransactionOptions](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions) - * for detailed description of available transaction options. Please - * note that only one of `$strong`, `$minReadTimestamp`, - * `$maxStaleness`, `$readTimestamp` or `$exactStaleness` may be set in - * a request. + * See {@see V1\TransactionOptions\ReadOnly} for detailed description of + * available transaction options. Please note that only one of + * `$strong`, `$minReadTimestamp`, `$maxStaleness`, `$readTimestamp` or + * `$exactStaleness` may be set in a request. * * @type array $parameters A key/value array of Query Parameters, where * the key is represented in the query string prefixed by a `@` @@ -1899,11 +1898,16 @@ public function executePartitionedUpdate($statement, array $options = []) unset($options['requestOptions']['transactionTag']); $session = $this->selectSession(SessionPoolInterface::CONTEXT_READWRITE); - $transaction = $this->operation->transaction($session, [ + $beginTransactionOptions = [ 'transactionOptions' => [ - 'partitionedDml' => [] + 'partitionedDml' => [], ] - ]); + ]; + if (isset($options['transactionOptions']['excludeTxnFromChangeStreams'])) { + $beginTransactionOptions['transactionOptions']['excludeTxnFromChangeStreams'] = + $options['transactionOptions']['excludeTxnFromChangeStreams']; + } + $transaction = $this->operation->transaction($session, $beginTransactionOptions); $options = $this->addLarHeader($options); diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index 509143eff879..f0bbbbb827f4 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -47,8 +47,11 @@ use Google\Cloud\Spanner\Transaction; use Google\Cloud\Spanner\V1\ResultSet; use Google\Cloud\Spanner\V1\ResultSetStats; +use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType; use Google\Cloud\Spanner\V1\Session as SessionProto; use Google\Cloud\Spanner\V1\SpannerClient; +use Google\Cloud\Spanner\V1\Transaction as TransactionProto; +use Google\Cloud\Spanner\V1\TransactionOptions; use Google\Rpc\Code; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -108,19 +111,24 @@ public function setUp(): void ]); $this->directedReadOptionsIncludeReplicas = [ 'includeReplicas' => [ + 'autoFailoverDisabled' => false, 'replicaSelections' => [ - 'location' => 'us-central1', - 'type' => 'READ_WRITE', - 'autoFailoverDisabled' => false + [ + 'location' => 'us-central1', + 'type' => ReplicaType::READ_WRITE, + + ] ] ] ]; $this->directedReadOptionsExcludeReplicas = [ 'excludeReplicas' => [ + 'autoFailoverDisabled' => false, 'replicaSelections' => [ - 'location' => 'us-central1', - 'type' => 'READ_WRITE', - 'autoFailoverDisabled' => false + [ + 'location' => 'us-central1', + 'type' => ReplicaType::READ_WRITE, + ] ] ] ]; @@ -2009,18 +2017,17 @@ public function testRunTransactionWithRollback() public function testRunTransactionWithExcludeTxnFromChangeStreams() { - $sql = 'SELECT example FROM sql_query'; + $gapic = $this->prophesize(SpannerClient::class); + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); $session = new SessionProto(['name' => $sessName]); $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]); - - $stream = $this->prophesize(ServerStream::class); - $stream->readAll() - ->shouldBeCalledOnce() - ->willReturn([$resultSet]); - $gapic = $this->prophesize(SpannerClient::class); $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $sql = 'SELECT example FROM sql_query'; + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]); $gapic->executeStreamingSql($sessName, $sql, Argument::that(function (array $options) { $this->assertArrayHasKey('transaction', $options); $this->assertNotNull($transactionOptions = $options['transaction']->getBegin()); @@ -2053,6 +2060,49 @@ function (Transaction $t) use ($sql) { ); } + public function testExecutePartitionedUpdateWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $sql = 'SELECT example FROM sql_query'; + $resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_lower_bound' => 0])]); + $stream = $this->prophesize(ServerStream::class); + $stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]); + $gapic->executeStreamingSql($sessName, $sql, Argument::type('array')) + ->shouldBeCalledOnce() + ->willReturn($stream->reveal()); + + $gapic->beginTransaction( + $sessName, + Argument::that(function (TransactionOptions $options) { + $this->assertTrue($options->getExcludeTxnFromChangeStreams()); + return true; + }), + Argument::type('array') + ) + ->shouldBeCalledOnce() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->executePartitionedUpdate( + $sql, + ['transactionOptions' => ['excludeTxnFromChangeStreams' => true]] + ); + } + private function createStreamingAPIArgs() { $row = ['id' => 1]; From 2c0faf611090e8a116e11492ad02590d5991eebd Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Thu, 17 Oct 2024 16:06:00 -0700 Subject: [PATCH 07/10] add test for Database::batchWrite --- Spanner/tests/Unit/DatabaseTest.php | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/Spanner/tests/Unit/DatabaseTest.php b/Spanner/tests/Unit/DatabaseTest.php index f0bbbbb827f4..a63c361fc6a8 100644 --- a/Spanner/tests/Unit/DatabaseTest.php +++ b/Spanner/tests/Unit/DatabaseTest.php @@ -2103,6 +2103,42 @@ public function testExecutePartitionedUpdateWithExcludeTxnFromChangeStreams() ); } + public function testBatchWriteWithExcludeTxnFromChangeStreams() + { + $gapic = $this->prophesize(SpannerClient::class); + + $sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION); + $session = new SessionProto(['name' => $sessName]); + $gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session); + $gapic->deleteSession(Argument::cetera())->shouldBeCalled(); + + $mutationGroups = []; + $gapic->batchWrite( + $sessName, + $mutationGroups, + Argument::that(function ($options) { + $this->assertArrayHasKey('excludeTxnFromChangeStreams', $options); + $this->assertTrue($options['excludeTxnFromChangeStreams']); + return true; + }) + ) + ->shouldBeCalledOnce() + ->willReturn(new TransactionProto(['id' => 'foo'])); + + $database = new Database( + new Grpc(['gapicSpannerClient' => $gapic->reveal()]), + $this->instance, + $this->lro->reveal(), + $this->lroCallables, + self::PROJECT, + self::DATABASE + ); + + $database->batchWrite($mutationGroups, [ + 'excludeTxnFromChangeStreams' => true + ]); + } + private function createStreamingAPIArgs() { $row = ['id' => 1]; From b794f3f00f3d27976abff926d6d517c4e998a834 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Mon, 21 Oct 2024 14:57:20 -0700 Subject: [PATCH 08/10] add change stream exception for individual DML requests --- Spanner/src/Transaction.php | 7 +++++++ Spanner/tests/Unit/TransactionTest.php | 12 ++++++++++++ 2 files changed, 19 insertions(+) diff --git a/Spanner/src/Transaction.php b/Spanner/src/Transaction.php index c88e8972d47e..0db1e79cef97 100644 --- a/Spanner/src/Transaction.php +++ b/Spanner/src/Transaction.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner; +use Google\ApiCore\ValidationException; use Google\Cloud\Core\Exception\AbortedException; use Google\Cloud\Spanner\Session\Session; use Google\Cloud\Spanner\Session\SessionPoolInterface; @@ -239,6 +240,12 @@ public function getCommitStats() */ public function executeUpdate($sql, array $options = []) { + if (isset($options['transaction']['begin']['excludeTxnFromChangeStreams'])) { + throw new ValidationException( + 'The excludeTxnFromChangeStreams option cannot be set for individual DML requests.' + . ' This option should be set at the transaction level.' + ); + } $options = $this->buildUpdateOptions($options); return $this->operation ->executeUpdate($this->session, $this, $sql, $options); diff --git a/Spanner/tests/Unit/TransactionTest.php b/Spanner/tests/Unit/TransactionTest.php index 71bc8a02475a..01e80b9a4abd 100644 --- a/Spanner/tests/Unit/TransactionTest.php +++ b/Spanner/tests/Unit/TransactionTest.php @@ -17,6 +17,7 @@ namespace Google\Cloud\Spanner\Tests\Unit; +use Google\ApiCore\ValidationException; use Google\Cloud\Core\Testing\GrpcTestTrait; use Google\Cloud\Core\Testing\TestHelpers; use Google\Cloud\Core\TimeTrait; @@ -252,6 +253,17 @@ public function testExecuteUpdate() $this->assertEquals(1, $res); } + public function testExecuteUpdateWithExcludeTxnFromChangeStreamsThrowsException() + { + $this->expectException(ValidationException::class); + $this->expectExceptionMessage('The excludeTxnFromChangeStreams option cannot be set for individual DML requests'); + + $sql = 'UPDATE foo SET bar = @bar'; + $this->transaction->executeUpdate($sql, [ + 'transaction' => ['begin' => ['excludeTxnFromChangeStreams' => true]] + ]); + } + public function testDmlSeqno() { $sql = 'UPDATE foo SET bar = @bar'; From 4cec64ffdb47becd5d614896a968a776b9779589 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Tue, 22 Oct 2024 09:19:01 -0700 Subject: [PATCH 09/10] fix cs --- Spanner/tests/Unit/TransactionTest.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Spanner/tests/Unit/TransactionTest.php b/Spanner/tests/Unit/TransactionTest.php index 01e80b9a4abd..1563a9c9c9b6 100644 --- a/Spanner/tests/Unit/TransactionTest.php +++ b/Spanner/tests/Unit/TransactionTest.php @@ -256,7 +256,9 @@ public function testExecuteUpdate() public function testExecuteUpdateWithExcludeTxnFromChangeStreamsThrowsException() { $this->expectException(ValidationException::class); - $this->expectExceptionMessage('The excludeTxnFromChangeStreams option cannot be set for individual DML requests'); + $this->expectExceptionMessage( + 'The excludeTxnFromChangeStreams option cannot be set for individual DML requests' + ); $sql = 'UPDATE foo SET bar = @bar'; $this->transaction->executeUpdate($sql, [ From 746c39bddcba05b69a9e944beb0f6cd1340498f5 Mon Sep 17 00:00:00 2001 From: Brent Shaffer Date: Thu, 31 Oct 2024 09:53:31 -0700 Subject: [PATCH 10/10] fix reference --- Spanner/src/Database.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Spanner/src/Database.php b/Spanner/src/Database.php index f4aa17bb1622..23e4317e0462 100644 --- a/Spanner/src/Database.php +++ b/Spanner/src/Database.php @@ -1593,7 +1593,7 @@ public function delete($table, KeySet $keySet, array $options = []) * @param string $sql The query string to execute. * @param array $options [optional] { * Configuration Options. - * See {@see V1\TransactionOptions\ReadOnly} for detailed description of + * See {@see V1\TransactionOptions\PBReadOnly} for detailed description of * available transaction options. Please note that only one of * `$strong`, `$minReadTimestamp`, `$maxStaleness`, `$readTimestamp` or * `$exactStaleness` may be set in a request.