Skip to content

Commit

Permalink
Replace transactionId reference with Transaction object
Browse files Browse the repository at this point in the history
  • Loading branch information
ajupazhamayil committed Oct 31, 2023
1 parent cf4e485 commit 4fb21a8
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 76 deletions.
5 changes: 3 additions & 2 deletions Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -1786,8 +1786,9 @@ public function executePartitionedUpdate($statement, array $options = [])

try {
return $this->operation->executeUpdate($session, $transaction, $statement, [
'statsItem' => 'rowCountLowerBound'
] + $options, $transaction->getIdReference());
'statsItem' => 'rowCountLowerBound',
'transactionHandle' => $transaction
] + $options);
} finally {
$session->setExpiration();
}
Expand Down
87 changes: 52 additions & 35 deletions Spanner/src/Operation.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ public function commitWithResponse(Session $session, array $mutations, array $op
*/
public function rollback(Session $session, $transactionId, array $options = [])
{
// TODO: Check with team on this and decide
if (is_array($transactionId) and isset($transactionId['begin'])) {
// @TODO: Check with team on this if check and decide
if (is_null($transactionId)) {
return;
// @TODO: Check with team and decide whether to throw exception or just return.
// throw new \RuntimeException('Streaming calls must invoke rows() function');
}
$this->connection->rollback([
Expand All @@ -213,10 +214,11 @@ public function rollback(Session $session, $transactionId, array $options = [])
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type Transaction transactionHandle Transaction to be used for this operation.
* }
* @return Result
*/
public function execute(Session $session, $sql, array $options = [], &$transactionId = null)
public function execute(Session $session, $sql, array $options = [])
{
$options += [
'parameters' => [],
Expand All @@ -233,9 +235,12 @@ public function execute(Session $session, $sql, array $options = [], &$transacti
// Initially with begin, transactionId will be null.
// Once generated it will get populated from Result->rows()
// Incase of stream failure, the transactionId is preserved here to reuse.
$call = function ($resumeToken = null) use ($session, $sql, $options, &$transactionId) {
if ( !is_null($transactionId) and !is_array($transactionId) ) {
$options['transaction'] = ['id' => $transactionId];
$call = function ($resumeToken = null) use ($session, $sql, $options) {
if (
isset($options['transactionHandle']) &&
!is_null($options['transactionHandle']->id())
) {
$options['transaction'] = ['id' => $options['transactionHandle']->id()];
}
if ($resumeToken) {
$options['resumeToken'] = $resumeToken;
Expand All @@ -255,7 +260,7 @@ public function execute(Session $session, $sql, array $options = [], &$transacti
$context,
$this->mapper,
self::DEFAULT_RETRIES,
$transactionId
$options['transactionHandle'] ?? null
);
}

Expand All @@ -273,6 +278,7 @@ public function execute(Session $session, $sql, array $options = [], &$transacti
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type Transaction transactionHandle Transaction to be used for this operation.
* }
* @return int
* @throws \InvalidArgumentException If the SQL string isn't an update operation.
Expand All @@ -281,13 +287,15 @@ public function executeUpdate(
Session $session,
Transaction $transaction,
$sql,
array $options = [],
&$transactionId = null
array $options = []
) {
if ( !is_null($transactionId) and !is_array($transactionId) ) {
$options += ['transactionId' => $transactionId];
if (
isset($options['transactionHandle']) &&
!is_null($options['transactionHandle']->id())
) {
$options += ['transactionId' => $options['transactionHandle']->id()];
}
$res = $this->execute($session, $sql, $options, $transactionId);
$res = $this->execute($session, $sql, $options);

// Iterate through the result to ensure we have query statistics available.
iterator_to_array($res->rows());
Expand Down Expand Up @@ -338,6 +346,7 @@ public function executeUpdate(
* [the upstream documentation](https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions).
* Please note, if using the `priority` setting you may utilize the constants available
* on {@see \Google\Cloud\Spanner\V1\RequestOptions\Priority} to set a value.
* @type Transaction transactionHandle Transaction to be used for this operation.
* }
* @return BatchDmlResult
* @throws \InvalidArgumentException If any statement is missing the `sql` key.
Expand All @@ -346,8 +355,7 @@ public function executeUpdateBatch(
Session $session,
Transaction $transaction,
array $statements,
array $options = [],
&$transactionId = null
array $options = []
) {
$stmts = [];
foreach ($statements as $statement) {
Expand All @@ -362,8 +370,11 @@ public function executeUpdateBatch(
] + $this->mapper->formatParamsForExecuteSql($parameters, $types);
}

if ( !is_null($transactionId) and !is_array($transactionId) ) {
$options += ['transactionId' => $transactionId];
if (
isset($options['transactionHandle']) &&
!is_null($options['transactionHandle']->id())
) {
$options += ['transactionId' => $options['transactionHandle']->id()];
}

$res = $this->connection->executeBatchDml([
Expand All @@ -373,10 +384,14 @@ public function executeUpdateBatch(
] + $options);

if (
isset($res['resultSets'][0]['metadata']['transaction']['id']) and
$res['resultSets'][0]['metadata']['transaction']['id']
isset($res['resultSets'][0]['metadata']['transaction']['id']) &&
$res['resultSets'][0]['metadata']['transaction']['id'] &&
isset($options['transactionHandle']) &&
is_null($options['transactionHandle']->id())
) {
$transactionId = $res['resultSets'][0]['metadata']['transaction']['id'];
$options['transactionHandle']->setId(
$res['resultSets'][0]['metadata']['transaction']['id']
);
}

$errorStatement = null;
Expand Down Expand Up @@ -414,8 +429,7 @@ public function read(
$table,
KeySet $keySet,
array $columns,
array $options = [],
&$transactionId = null
array $options = []
) {
$options += [
'index' => null,
Expand All @@ -427,6 +441,12 @@ public function read(
$context = $this->pluck('transactionContext', $options);

$call = function ($resumeToken = null) use ($table, $session, $columns, $keySet, $options) {
if (
isset($options['transactionHandle']) &&
!is_null($options['transactionHandle']->id())
) {
$options['transaction'] = ['id' => $options['transactionHandle']->id()];
}
if ($resumeToken) {
$options['resumeToken'] = $resumeToken;
}
Expand All @@ -447,7 +467,7 @@ public function read(
$context,
$this->mapper,
self::DEFAULT_RETRIES,
$transactionId
$options['transactionHandle'] ?? null
);
}

Expand All @@ -467,6 +487,8 @@ public function read(
* @type bool $isRetry If true, the resulting transaction will indicate
* that it is the result of a retry operation. **Defaults to**
* `false`.
* @type array $begin The begin Transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @return Transaction
*/
Expand All @@ -482,23 +504,15 @@ public function transaction(Session $session, array $options = [])
$options['requestOptions']['transactionTag'] = $transactionTag;
}

if (!$options['singleUse']) {
if (
isset($options['begin']) and
!isset($options['transactionOptions']['partitionedDml'])
) {
$res = ['id' => ['begin' => $options['begin']]];
} else {
$res = $this->beginTransaction($session, $options);
}
if (!$options['singleUse'] && (
!isset($options['begin']) ||
isset($options['transactionOptions']['partitionedDml'])
)) {
$res = $this->beginTransaction($session, $options);
} else {
$res = [];
}

if (isset($options['fromTransaction'])) {
return $res;
}

return $this->createTransaction(
$session,
$res,
Expand All @@ -512,6 +526,8 @@ public function transaction(Session $session, array $options = [])
* @param Session $session The session the transaction belongs to.
* @param array $res [optional] The createTransaction response.
* @param array $options [optional] Options for the transaction object.
* @type array $begin The begin Transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* @return Transaction
*/
public function createTransaction(Session $session, array $res = [], array $options = [])
Expand Down Expand Up @@ -811,6 +827,7 @@ private function partitionOptions(array $options)
*
* @param Session $session The session to start the snapshot in.
* @param array $options [optional] Configuration options.
*
* @return array
*/
private function beginTransaction(Session $session, array $options = [])
Expand Down
13 changes: 8 additions & 5 deletions Spanner/src/Result.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class Result implements \IteratorAggregate
private $call;

/**
* @var string|array
* @var Transaction|null
*/
private $transactionId;
private $transactionHandle;

/**
* @param Operation $operation Runs operations against Google Cloud Spanner.
Expand All @@ -134,6 +134,7 @@ class Result implements \IteratorAggregate
* @param ValueMapper $mapper Maps values.
* @param int $retries Number of attempts to resume a broken stream, assuming
* a resume token is present. **Defaults to** 3.
* @param Transaction $transactionHandle The transaction of the Result.
*/
public function __construct(
Operation $operation,
Expand All @@ -142,15 +143,15 @@ public function __construct(
$transactionContext,
ValueMapper $mapper,
$retries = 3,
&$transactionId = null
$transactionHandle = null
) {
$this->operation = $operation;
$this->session = $session;
$this->call = $call;
$this->transactionContext = $transactionContext;
$this->mapper = $mapper;
$this->retries = $retries;
$this->transactionId = &$transactionId;
$this->transactionHandle = $transactionHandle;
}

/**
Expand Down Expand Up @@ -474,7 +475,9 @@ private function setResultData(array $result, $format)
}

if (isset($result['metadata']['transaction']['id']) && $result['metadata']['transaction']['id']) {
$this->transactionId = $result['metadata']['transaction']['id'];
if (!is_null($this->transactionHandle) && is_null($this->transactionHandle->id())) {
$this->transactionHandle->setId($result['metadata']['transaction']['id']);
}
if ($this->transactionContext === SessionPoolInterface::CONTEXT_READ) {
$this->snapshot = $this->operation->createSnapshot(
$this->session,
Expand Down
43 changes: 20 additions & 23 deletions Spanner/src/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ class Transaction implements TransactionalReadInterface
*/
private $isRetry = false;

/**
* @var array
*/
private $options = [];

/**
* @param Operation $operation The Operation instance.
* @param Session $session The session to use for spanner interactions.
Expand All @@ -94,6 +89,12 @@ class Transaction implements TransactionalReadInterface
* @param bool $isRetry Whether the transaction will automatically retry or not.
* @param string $tag A transaction tag. Requests made using this transaction will
* use this as the transaction tag.
* @param array $options [optional] {
* Configuration Options.
*
* @type array $begin The begin Transaction options.
* [Refer](https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#transactionoptions)
* }
* @throws \InvalidArgumentException if a tag is specified on a single-use transaction.
*/
public function __construct(
Expand All @@ -102,14 +103,14 @@ public function __construct(
$transactionId = null,
$isRetry = false,
$tag = null,
$options = null
$options = []
) {
$this->operation = $operation;
$this->session = $session;
$this->transactionId = $transactionId;
$this->isRetry = $isRetry;

$this->type = $transactionId
$this->type = ($transactionId || isset($options['begin']))
? self::TYPE_PRE_ALLOCATED
: self::TYPE_SINGLE_USE;

Expand Down Expand Up @@ -448,17 +449,18 @@ public function executeUpdate($sql, array $options = [])
$this->seqno++;

$options['transactionType'] = $this->context;
if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
$options['begin'] = $this->transactionId['begin'];
if (is_null($this->transactionId) && isset($this->options['begin'])) {
$options['begin'] = $this->options['begin'];
} else {
$options['transactionId'] = $this->transactionId;
}
$selector = $this->transactionSelector($options);

$options['transaction'] = $selector[0];
$options['transactionHandle'] = $this;

return $this->operation
->executeUpdate($this->session, $this, $sql, $options, $this->transactionId);
->executeUpdate($this->session, $this, $sql, $options);
}

/**
Expand Down Expand Up @@ -557,22 +559,22 @@ public function executeUpdateBatch(array $statements, array $options = [])
$this->seqno++;

$options['transactionType'] = $this->context;
if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
$options['begin'] = $this->transactionId['begin'];
if (is_null($this->transactionId) && isset($this->options['begin'])) {
$options['begin'] = $this->options['begin'];
} else {
$options['transactionId'] = $this->transactionId;
}
$selector = $this->transactionSelector($options);

$options['transaction'] = $selector[0];
$options['transactionHandle'] = $this;

return $this->operation
->executeUpdateBatch(
$this->session,
$this,
$statements,
$options,
$this->transactionId
$options
);
}

Expand Down Expand Up @@ -648,16 +650,11 @@ public function commit(array $options = [])
throw new \BadMethodCallException('The transaction cannot be committed because it is not active');
}

if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
if (is_null($this->transactionId) && isset($this->options['begin'])) {
unset($this->options['begin']);
$res = $this->operation->transaction($this->session, $this->options+[
'fromTransaction' => true
]);
if (is_array($res) and isset($res['id'])) {
$this->transactionId = $res['id'];
} else {
// singleUse transaction
$this->transactionId = null;
$transaction = $this->operation->transaction($this->session, $this->options);
if (!is_null($transaction->id())) {
$this->setId($transaction->id());
}
}

Expand Down
Loading

0 comments on commit 4fb21a8

Please sign in to comment.