Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Spanner): Inline Begin Transaction #6708

Merged
merged 34 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
720dcf1
feat(Spanner): Inline Begin Transaction
ajupazhamayil Oct 13, 2023
da0f0ed
tests with errors
ajupazhamayil Oct 15, 2023
17990d8
commit with error test
ajupazhamayil Oct 16, 2023
cf4e485
commit with error test
ajupazhamayil Oct 16, 2023
30011f8
Replace transactionId reference with Transaction object
ajupazhamayil Oct 31, 2023
7618765
Merge branch 'main' into spanner-ilb
ajupazhamayil Nov 2, 2023
00caa9c
Create buildUpdateOptions function
ajupazhamayil Nov 14, 2023
d1a5c38
Resolve snippet test
ajupazhamayil Nov 14, 2023
2663bfe
Resolve PR comments
ajupazhamayil Nov 26, 2023
25358ed
Merge branch 'main' into spanner-ilb
bshaffer Nov 27, 2023
a8170bc
Implement Alternative 2
ajupazhamayil Nov 28, 2023
c731efb
Resolve unit tests issues
ajupazhamayil Nov 28, 2023
b8f5684
Label TransactionalReadInterface as internal
ajupazhamayil Nov 27, 2023
fd7f63c
Add unit tests, part1
ajupazhamayil Nov 27, 2023
d7a73a6
Set transactionId in executeUpdate function
ajupazhamayil Nov 28, 2023
3b18e8b
Add unit tests, part2
ajupazhamayil Nov 28, 2023
b095213
Add unit tests, part3
ajupazhamayil Nov 28, 2023
6b86b87
Change Transaction type back from TransactionalReadInterface in Result
ajupazhamayil Nov 28, 2023
ca06a15
Revert system test changes
ajupazhamayil Nov 28, 2023
de9a81d
Add unit tests, part3
ajupazhamayil Nov 28, 2023
67df9e7
Add unit tests, part4
ajupazhamayil Nov 28, 2023
0ee448a
Resolve PR comments
ajupazhamayil Nov 29, 2023
5f1fd0b
Add last unit test
ajupazhamayil Nov 29, 2023
4ebf68a
Resolve PR comments
ajupazhamayil Nov 29, 2023
279b1b4
Add validity check to the created Generator in Result class
ajupazhamayil Nov 29, 2023
89d4d17
Resolve PR comments
ajupazhamayil Dec 1, 2023
bfba374
Resolve PR comments
ajupazhamayil Dec 3, 2023
8eae267
make TransactionalReadInterface::setId accepts null
ajupazhamayil Dec 3, 2023
fa04efc
Resolve PR comments
ajupazhamayil Dec 4, 2023
83dc7f8
Resolve style check errors
ajupazhamayil Dec 4, 2023
7d0b85d
Resolve doc block issues, and add integration tests
ajupazhamayil Dec 30, 2023
02c2470
resolve style issue
ajupazhamayil Jan 2, 2024
1b9be6c
rename the integration test
ajupazhamayil Jan 2, 2024
91acd09
Merge branch 'main' into spanner-ilb
vishwarajanand Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,14 @@ public function runTransaction(callable $operation, array $options = [])

$attempt = 0;
$startTransactionFn = function ($session, $options) use (&$attempt) {
if ($attempt > 0) {

// Initial attempt requires to set `begin` options (ILB).
if ($attempt === 0) {
// Partitioned DML does not support ILB.
if (!isset($options['transactionOptions']['partitionedDml'])) {
$options['begin'] = $options['transactionOptions'];
}
} else {
$options['isRetry'] = true;
}

Expand Down
110 changes: 94 additions & 16 deletions Spanner/src/Operation.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Google\Cloud\Spanner\Session\Session;
use Google\Cloud\Spanner\V1\SpannerClient as GapicSpannerClient;
use Google\Rpc\Code;
use InvalidArgumentException;

/**
* Common interface for running operations against Cloud Spanner. This class is
Expand Down Expand Up @@ -184,9 +185,13 @@ public function commitWithResponse(Session $session, array $mutations, array $op
* @param string $transactionId The transaction to roll back.
* @param array $options [optional] Configuration Options.
* @return void
* @throws InvalidArgumentException If the transaction is not yet initialized.
*/
public function rollback(Session $session, $transactionId, array $options = [])
{
if (empty($transactionId)) {
throw new InvalidArgumentException('Rollback failed: Transaction not initiated.');
}
$this->connection->rollback([
'transactionId' => $transactionId,
'session' => $session->name(),
Expand All @@ -207,6 +212,9 @@ public function rollback(Session $session, $transactionId, array $options = [])
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type array $transaction Transaction selector options.
* @type array $transaction.begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return Result
*/
Expand All @@ -224,7 +232,17 @@ public function execute(Session $session, $sql, array $options = [])

$context = $this->pluck('transactionContext', $options);

$call = function ($resumeToken = null) use ($session, $sql, $options) {
// Initially with begin, transactionId will be null.
// Once transaction is generated, even in the case of stream failure,
// transaction will be passed to this callable by the Result class.
$call = function ($resumeToken = null, $transaction = null) use (
$session,
$sql,
$options
) {
if ($transaction && !empty($transaction->id())) {
$options['transaction'] = ['id' => $transaction->id()];
}
if ($resumeToken) {
$options['resumeToken'] = $resumeToken;
}
Expand Down Expand Up @@ -253,26 +271,33 @@ public function execute(Session $session, $sql, array $options = [])
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type array $transaction Transaction selector options.
* @type array $transaction.begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return int
* @throws \InvalidArgumentException If the SQL string isn't an update operation.
* @throws InvalidArgumentException If the SQL string isn't an update operation.
*/
public function executeUpdate(
Session $session,
Transaction $transaction,
$sql,
array $options = []
) {
$res = $this->execute($session, $sql, [
'transactionId' => $transaction->id()
] + $options);
if (!isset($options['transaction']['begin'])) {
$options['transaction'] = ['id' => $transaction->id()];
}
$res = $this->execute($session, $sql, $options);
if (empty($transaction->id()) && $res->transaction()) {
$transaction->setId($res->transaction()->id());
}

// Iterate through the result to ensure we have query statistics available.
iterator_to_array($res->rows());

$stats = $res->stats();
if (!$stats) {
throw new \InvalidArgumentException(
throw new InvalidArgumentException(
'Partitioned DML response missing stats, possible due to non-DML statement as input.'
);
}
Expand Down Expand Up @@ -316,9 +341,12 @@ public function executeUpdate(
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type array $transaction Transaction selector options.
* @type array $transaction.begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return BatchDmlResult
* @throws \InvalidArgumentException If any statement is missing the `sql` key.
* @throws InvalidArgumentException If any statement is missing the `sql` key.
*/
public function executeUpdateBatch(
Session $session,
Expand All @@ -329,7 +357,7 @@ public function executeUpdateBatch(
$stmts = [];
foreach ($statements as $statement) {
if (!isset($statement['sql'])) {
throw new \InvalidArgumentException('Each statement must contain a SQL key.');
throw new InvalidArgumentException('Each statement must contain a SQL key.');
}

$parameters = $this->pluck('parameters', $statement, false) ?: [];
Expand All @@ -339,13 +367,23 @@ public function executeUpdateBatch(
] + $this->mapper->formatParamsForExecuteSql($parameters, $types);
}

if (!isset($options['transaction']['begin'])) {
$options['transaction'] = ['id' => $transaction->id()];
}

$res = $this->connection->executeBatchDml([
'session' => $session->name(),
'database' => $this->getDatabaseNameFromSession($session),
'transactionId' => $transaction->id(),
'statements' => $stmts
] + $options);

if (empty($transaction->id())) {
// Get the transaction from array of ResultSets.
// ResultSet contains transaction in the metadata.
// @see https://cloud.google.com/spanner/docs/reference/rest/v1/ResultSet
$transaction->setId($res['resultSets'][0]['metadata']['transaction']['id'] ?? null);
}

$errorStatement = null;
if (isset($res['status']) && $res['status']['code'] !== Code::OK) {
$errIndex = count($res['resultSets']);
Expand Down Expand Up @@ -373,11 +411,19 @@ public function executeUpdateBatch(
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type array $transaction Transaction selector options.
* @type array $transaction.begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return Result
*/
public function read(Session $session, $table, KeySet $keySet, array $columns, array $options = [])
{
public function read(
Session $session,
$table,
KeySet $keySet,
array $columns,
array $options = []
) {
$options += [
'index' => null,
'limit' => null,
Expand All @@ -387,7 +433,16 @@ public function read(Session $session, $table, KeySet $keySet, array $columns, a

$context = $this->pluck('transactionContext', $options);

$call = function ($resumeToken = null) use ($table, $session, $columns, $keySet, $options) {
$call = function ($resumeToken = null, $transaction = null) use (
$table,
$session,
$columns,
$keySet,
$options
) {
if ($transaction && !empty($transaction->id())) {
$options['transaction'] = ['id' => $transaction->id()];
}
if ($resumeToken) {
$options['resumeToken'] = $resumeToken;
}
Expand Down Expand Up @@ -420,6 +475,8 @@ public function read(Session $session, $table, KeySet $keySet, array $columns, a
* @type bool $isRetry If true, the resulting transaction will indicate
* that it is the result of a retry operation. **Defaults to**
* `false`.
* @type array $begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return Transaction
*/
Expand All @@ -435,13 +492,23 @@ public function transaction(Session $session, array $options = [])
$options['requestOptions']['transactionTag'] = $transactionTag;
}

if (!$options['singleUse']) {
if (!$options['singleUse'] && (!isset($options['begin']) ||
isset($options['transactionOptions']['partitionedDml']))
) {
$res = $this->beginTransaction($session, $options);
} else {
$res = [];
}

return $this->createTransaction($session, $res, ['tag' => $transactionTag]);
return $this->createTransaction(
$session,
$res,
[
'tag' => $transactionTag,
'isRetry' => $options['isRetry'],
'transactionOptions' => $options
]
);
}

/**
Expand All @@ -450,6 +517,8 @@ public function transaction(Session $session, array $options = [])
* @param Session $session The session the transaction belongs to.
* @param array $res [optional] The createTransaction response.
* @param array $options [optional] Options for the transaction object.
* @type array $begin The begin transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* @return Transaction
*/
public function createTransaction(Session $session, array $res = [], array $options = [])
Expand All @@ -458,12 +527,20 @@ public function createTransaction(Session $session, array $res = [], array $opti
'id' => null
];
$options += [
'tag' => null
'tag' => null,
'transactionOptions' => null
];

$options['isRetry'] = $options['isRetry'] ?? false;

return new Transaction($this, $session, $res['id'], $options['isRetry'], $options['tag']);
return new Transaction(
$this,
$session,
$res['id'],
$options['isRetry'],
$options['tag'],
$options['transactionOptions']
);
}

/**
Expand Down Expand Up @@ -741,6 +818,7 @@ private function partitionOptions(array $options)
*
* @param Session $session The session to start the snapshot in.
* @param array $options [optional] Configuration options.
*
* @return array
*/
private function beginTransaction(Session $session, array $options = [])
Expand Down
Loading
Loading