Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Spanner): Inline Begin Transaction #6708

Merged
merged 34 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
720dcf1
feat(Spanner): Inline Begin Transaction
ajupazhamayil Oct 13, 2023
da0f0ed
tests with errors
ajupazhamayil Oct 15, 2023
17990d8
commit with error test
ajupazhamayil Oct 16, 2023
cf4e485
commit with error test
ajupazhamayil Oct 16, 2023
30011f8
Replace transactionId reference with Transaction object
ajupazhamayil Oct 31, 2023
7618765
Merge branch 'main' into spanner-ilb
ajupazhamayil Nov 2, 2023
00caa9c
Create buildUpdateOptions function
ajupazhamayil Nov 14, 2023
d1a5c38
Resolve snippet test
ajupazhamayil Nov 14, 2023
2663bfe
Resolve PR comments
ajupazhamayil Nov 26, 2023
25358ed
Merge branch 'main' into spanner-ilb
bshaffer Nov 27, 2023
a8170bc
Implement Alternative 2
ajupazhamayil Nov 28, 2023
c731efb
Resolve unit tests issues
ajupazhamayil Nov 28, 2023
b8f5684
Label TransactionalReadInterface as internal
ajupazhamayil Nov 27, 2023
fd7f63c
Add unit tests, part1
ajupazhamayil Nov 27, 2023
d7a73a6
Set transactionId in executeUpdate function
ajupazhamayil Nov 28, 2023
3b18e8b
Add unit tests, part2
ajupazhamayil Nov 28, 2023
b095213
Add unit tests, part3
ajupazhamayil Nov 28, 2023
6b86b87
Change Transaction type back from TransactionalReadInterface in Result
ajupazhamayil Nov 28, 2023
ca06a15
Revert system test changes
ajupazhamayil Nov 28, 2023
de9a81d
Add unit tests, part3
ajupazhamayil Nov 28, 2023
67df9e7
Add unit tests, part4
ajupazhamayil Nov 28, 2023
0ee448a
Resolve PR comments
ajupazhamayil Nov 29, 2023
5f1fd0b
Add last unit test
ajupazhamayil Nov 29, 2023
4ebf68a
Resolve PR comments
ajupazhamayil Nov 29, 2023
279b1b4
Add validity check to the created Generator in Result class
ajupazhamayil Nov 29, 2023
89d4d17
Resolve PR comments
ajupazhamayil Dec 1, 2023
bfba374
Resolve PR comments
ajupazhamayil Dec 3, 2023
8eae267
make TransactionalReadInterface::setId accepts null
ajupazhamayil Dec 3, 2023
fa04efc
Resolve PR comments
ajupazhamayil Dec 4, 2023
83dc7f8
Resolve style check errors
ajupazhamayil Dec 4, 2023
7d0b85d
Resolve doc block issues, and add integration tests
ajupazhamayil Dec 30, 2023
02c2470
resolve style issue
ajupazhamayil Jan 2, 2024
1b9be6c
rename the integration test
ajupazhamayil Jan 2, 2024
91acd09
Merge branch 'main' into spanner-ilb
vishwarajanand Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,9 @@ public function runTransaction(callable $operation, array $options = [])
$startTransactionFn = function ($session, $options) use (&$attempt) {
if ($attempt > 0) {
$options['isRetry'] = true;
} elseif (!isset($options['transactionOptions']['partitionedDml'])) {
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
// Make the begin options
$options['begin'] = $options['transactionOptions'];
}

$transaction = $this->operation->transaction($session, $options);
Expand Down Expand Up @@ -1784,7 +1787,7 @@ public function executePartitionedUpdate($statement, array $options = [])
try {
return $this->operation->executeUpdate($session, $transaction, $statement, [
'statsItem' => 'rowCountLowerBound'
] + $options);
] + $options, $transaction->getIdReference());
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
} finally {
$session->setExpiration();
}
Expand Down
102 changes: 86 additions & 16 deletions Spanner/src/Operation.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Operation
const OP_INSERT_OR_UPDATE = 'insertOrUpdate';
const OP_REPLACE = 'replace';
const OP_DELETE = 'delete';
const DEFAULT_RETRIES = 3;
bshaffer marked this conversation as resolved.
Show resolved Hide resolved

/**
* @var ConnectionInterface
Expand Down Expand Up @@ -187,6 +188,11 @@ public function commitWithResponse(Session $session, array $mutations, array $op
*/
public function rollback(Session $session, $transactionId, array $options = [])
{
// TODO: Check with team on this and decide
if (is_array($transactionId) and isset($transactionId['begin'])) {
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
return;
// throw new \RuntimeException('Streaming calls must invoke rows() function');
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
}
$this->connection->rollback([
'transactionId' => $transactionId,
'session' => $session->name(),
Expand All @@ -210,7 +216,7 @@ public function rollback(Session $session, $transactionId, array $options = [])
* }
* @return Result
*/
public function execute(Session $session, $sql, array $options = [])
public function execute(Session $session, $sql, array $options = [], &$transactionId = null)
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
{
$options += [
'parameters' => [],
Expand All @@ -224,7 +230,13 @@ public function execute(Session $session, $sql, array $options = [])

$context = $this->pluck('transactionContext', $options);

$call = function ($resumeToken = null) use ($session, $sql, $options) {
// Initially with begin, transactionId will be null.
// Once generated it will get populated from Result->rows()
// Incase of stream failure, the transactionId is preserved here to reuse.
$call = function ($resumeToken = null) use ($session, $sql, $options, &$transactionId) {
if ( !is_null($transactionId) and !is_array($transactionId) ) {
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
$options['transaction'] = ['id' => $transactionId];
vishwarajanand marked this conversation as resolved.
Show resolved Hide resolved
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
}
if ($resumeToken) {
$options['resumeToken'] = $resumeToken;
}
Expand All @@ -236,7 +248,15 @@ public function execute(Session $session, $sql, array $options = [])
] + $options);
};

return new Result($this, $session, $call, $context, $this->mapper);
return new Result(
$this,
$session,
$call,
$context,
$this->mapper,
self::DEFAULT_RETRIES,
$transactionId
);
}

/**
Expand All @@ -261,11 +281,13 @@ public function executeUpdate(
Session $session,
Transaction $transaction,
$sql,
array $options = []
array $options = [],
&$transactionId = null
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
) {
$res = $this->execute($session, $sql, [
'transactionId' => $transaction->id()
] + $options);
if ( !is_null($transactionId) and !is_array($transactionId) ) {
$options += ['transactionId' => $transactionId];
}
$res = $this->execute($session, $sql, $options, $transactionId);

// Iterate through the result to ensure we have query statistics available.
iterator_to_array($res->rows());
Expand Down Expand Up @@ -324,7 +346,8 @@ public function executeUpdateBatch(
Session $session,
Transaction $transaction,
array $statements,
array $options = []
array $options = [],
&$transactionId = null
) {
$stmts = [];
foreach ($statements as $statement) {
Expand All @@ -339,13 +362,23 @@ public function executeUpdateBatch(
] + $this->mapper->formatParamsForExecuteSql($parameters, $types);
}

if ( !is_null($transactionId) and !is_array($transactionId) ) {
$options += ['transactionId' => $transactionId];
}

$res = $this->connection->executeBatchDml([
'session' => $session->name(),
'database' => $this->getDatabaseNameFromSession($session),
'transactionId' => $transaction->id(),
'statements' => $stmts
] + $options);

if (
isset($res['resultSets'][0]['metadata']['transaction']['id']) and
$res['resultSets'][0]['metadata']['transaction']['id']
) {
$transactionId = $res['resultSets'][0]['metadata']['transaction']['id'];
}

$errorStatement = null;
if (isset($res['status']) && $res['status']['code'] !== Code::OK) {
$errIndex = count($res['resultSets']);
Expand Down Expand Up @@ -376,8 +409,14 @@ public function executeUpdateBatch(
* }
* @return Result
*/
public function read(Session $session, $table, KeySet $keySet, array $columns, array $options = [])
{
public function read(
Session $session,
$table,
KeySet $keySet,
array $columns,
array $options = [],
&$transactionId = null
) {
$options += [
'index' => null,
'limit' => null,
Expand All @@ -401,7 +440,15 @@ public function read(Session $session, $table, KeySet $keySet, array $columns, a
] + $options);
};

return new Result($this, $session, $call, $context, $this->mapper);
return new Result(
$this,
$session,
$call,
$context,
$this->mapper,
self::DEFAULT_RETRIES,
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
$transactionId
);
}

/**
Expand Down Expand Up @@ -436,12 +483,27 @@ public function transaction(Session $session, array $options = [])
}

if (!$options['singleUse']) {
$res = $this->beginTransaction($session, $options);
if (
isset($options['begin']) and
!isset($options['transactionOptions']['partitionedDml'])
) {
$res = ['id' => ['begin' => $options['begin']]];
} else {
$res = $this->beginTransaction($session, $options);
}
} else {
$res = [];
}

return $this->createTransaction($session, $res, ['tag' => $transactionTag]);
if (isset($options['fromTransaction'])) {
return $res;
}

return $this->createTransaction(
$session,
$res,
['tag' => $transactionTag, 'isRetry' => $options['isRetry'], 'options' => $options]
ajupazhamayil marked this conversation as resolved.
Show resolved Hide resolved
);
}

/**
Expand All @@ -458,12 +520,20 @@ public function createTransaction(Session $session, array $res = [], array $opti
'id' => null
];
$options += [
'tag' => null
'tag' => null,
'options' => null
];

$options['isRetry'] = $options['isRetry'] ?? false;

return new Transaction($this, $session, $res['id'], $options['isRetry'], $options['tag']);
return new Transaction(
$this,
$session,
$res['id'],
$options['isRetry'],
$options['tag'],
$options['options']
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
);
}

/**
Expand Down
10 changes: 9 additions & 1 deletion Spanner/src/Result.php
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Result implements \IteratorAggregate
*/
private $call;

/**
* @var string|array
*/
private $transactionId;

/**
* @param Operation $operation Runs operations against Google Cloud Spanner.
* @param Session $session The session used for any operations executed.
Expand All @@ -136,14 +141,16 @@ public function __construct(
callable $call,
$transactionContext,
ValueMapper $mapper,
$retries = 3
$retries = 3,
&$transactionId = null
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
) {
$this->operation = $operation;
$this->session = $session;
$this->call = $call;
$this->transactionContext = $transactionContext;
$this->mapper = $mapper;
$this->retries = $retries;
$this->transactionId = &$transactionId;
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down Expand Up @@ -467,6 +474,7 @@ private function setResultData(array $result, $format)
}

if (isset($result['metadata']['transaction']['id']) && $result['metadata']['transaction']['id']) {
$this->transactionId = $result['metadata']['transaction']['id'];
if ($this->transactionContext === SessionPoolInterface::CONTEXT_READ) {
$this->snapshot = $this->operation->createSnapshot(
$this->session,
Expand Down
52 changes: 49 additions & 3 deletions Spanner/src/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ class Transaction implements TransactionalReadInterface
*/
private $isRetry = false;

/**
* @var array
*/
private $options = [];

bshaffer marked this conversation as resolved.
Show resolved Hide resolved
/**
* @param Operation $operation The Operation instance.
* @param Session $session The session to use for spanner interactions.
Expand All @@ -96,7 +101,8 @@ public function __construct(
Session $session,
$transactionId = null,
$isRetry = false,
$tag = null
$tag = null,
$options = null
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
) {
$this->operation = $operation;
$this->session = $session;
Expand All @@ -115,6 +121,7 @@ public function __construct(
$this->tag = $tag;

$this->context = SessionPoolInterface::CONTEXT_READWRITE;
$this->options = $options;
}

/**
Expand Down Expand Up @@ -440,8 +447,18 @@ public function executeUpdate($sql, array $options = [])
$options['seqno'] = $this->seqno;
$this->seqno++;

$options['transactionType'] = $this->context;
if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
$options['begin'] = $this->transactionId['begin'];
} else {
$options['transactionId'] = $this->transactionId;
}
$selector = $this->transactionSelector($options);

$options['transaction'] = $selector[0];

return $this->operation
->executeUpdate($this->session, $this, $sql, $options);
->executeUpdate($this->session, $this, $sql, $options, $this->transactionId);
}

/**
Expand Down Expand Up @@ -538,9 +555,25 @@ public function executeUpdateBatch(array $statements, array $options = [])
}
$options['seqno'] = $this->seqno;
$this->seqno++;

$options['transactionType'] = $this->context;
if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
$options['begin'] = $this->transactionId['begin'];
} else {
$options['transactionId'] = $this->transactionId;
}
$selector = $this->transactionSelector($options);

$options['transaction'] = $selector[0];
bshaffer marked this conversation as resolved.
Show resolved Hide resolved

return $this->operation
->executeUpdateBatch($this->session, $this, $statements, $options);
->executeUpdateBatch(
$this->session,
$this,
$statements,
$options,
$this->transactionId
);
}

/**
Expand Down Expand Up @@ -615,6 +648,19 @@ public function commit(array $options = [])
throw new \BadMethodCallException('The transaction cannot be committed because it is not active');
}

if (is_array($this->transactionId) and isset($this->transactionId['begin'])) {
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
unset($this->options['begin']);
vishwarajanand marked this conversation as resolved.
Show resolved Hide resolved
vishwarajanand marked this conversation as resolved.
Show resolved Hide resolved
$res = $this->operation->transaction($this->session, $this->options+[
'fromTransaction' => true
]);
if (is_array($res) and isset($res['id'])) {
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
$this->transactionId = $res['id'];
} else {
// singleUse transaction
$this->transactionId = null;
}
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
}
bshaffer marked this conversation as resolved.
Show resolved Hide resolved

if (!$this->singleUseState()) {
$this->state = self::STATE_COMMITTED;
}
Expand Down
Loading
Loading