Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Spanner): Batch Write Feature #7420

Merged
merged 19 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Spanner/src/Connection/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,9 @@ public function partitionQuery(array $args);
* @param array $args
*/
public function partitionRead(array $args);

/**
* @param array $args
*/
public function batchWrite(array $args);
}
126 changes: 85 additions & 41 deletions Spanner/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
use Google\Cloud\Spanner\Admin\Instance\V1\InstanceConfig;
use Google\Cloud\Spanner\Admin\Instance\V1\UpdateInstanceConfigMetadata;
use Google\Cloud\Spanner\Admin\Instance\V1\UpdateInstanceMetadata;
use Google\Cloud\Spanner\MutationGroup;
use Google\Cloud\Spanner\Operation;
use Google\Cloud\Spanner\SpannerClient as ManualSpannerClient;
use Google\Cloud\Spanner\RequestHeaderTrait;
use Google\Cloud\Spanner\V1\BatchWriteRequest\MutationGroup as MutationGroupProto;
use Google\Cloud\Spanner\V1\CreateSessionRequest;
use Google\Cloud\Spanner\V1\DeleteSessionRequest;
use Google\Cloud\Spanner\V1\DirectedReadOptions;
Expand Down Expand Up @@ -1124,47 +1126,7 @@ public function commit(array $args)
{
$inputMutations = $this->pluck('mutations', $args);

$mutations = [];
if (is_array($inputMutations)) {
foreach ($inputMutations as $mutation) {
$type = array_keys($mutation)[0];
$data = $mutation[$type];

switch ($type) {
case Operation::OP_DELETE:
if (isset($data['keySet'])) {
$data['keySet'] = $this->formatKeySet($data['keySet']);
}

$operation = $this->serializer->decodeMessage(
new Delete,
$data
);
break;
default:
$operation = new Write;
$operation->setTable($data['table']);
$operation->setColumns($data['columns']);

$modifiedData = [];
foreach ($data['values'] as $key => $param) {
$modifiedData[$key] = $this->fieldValue($param);
}

$list = new ListValue;
$list->setValues($modifiedData);
$values = [$list];
$operation->setValues($values);

break;
}

$setterName = $this->mutationSetters[$type];
$mutation = new Mutation;
$mutation->$setterName($operation);
$mutations[] = $mutation;
}
}
$mutations = $this->parseMutations($inputMutations);

if (isset($args['singleUseTransaction'])) {
$readWrite = $this->serializer->decodeMessage(
Expand Down Expand Up @@ -1194,6 +1156,40 @@ public function commit(array $args)
]);
}

/**
* @param array $args
* @return \Generator
*/
public function batchWrite(array $args)
bshaffer marked this conversation as resolved.
Show resolved Hide resolved
{
$databaseName = $this->pluck('database', $args);
$mutationGroups = $this->pluck('mutationGroups', $args);
$requestOptions = $this->pluck('requestOptions', $args, false) ?: [];

array_walk(
$mutationGroups,
fn(&$x) => $x['mutations'] = $this->parseMutations($x['mutations'])
);
bshaffer marked this conversation as resolved.
Show resolved Hide resolved

$mutationGroups = array_map(
fn($x) => $this->serializer->decodeMessage(new MutationGroupProto(), $x),
$mutationGroups
);

if ($requestOptions) {
$args['requestOptions'] = $this->serializer->decodeMessage(
new RequestOptions,
$requestOptions
);
}

return $this->send([$this->spannerClient, 'batchWrite'], [
$this->pluck('session', $args),
$mutationGroups,
$this->addResourcePrefixHeader($args, $databaseName)
]);
}

/**
* @param array $args
*/
Expand Down Expand Up @@ -1721,4 +1717,52 @@ private function getFieldDataFromRepeatedFields(?RepeatedField $fields): array

return $fieldsData;
}

private function parseMutations($rawMutations)
{
if (!is_array($rawMutations)) {
return [];
}

$mutations = [];
foreach ($rawMutations as $mutation) {
$type = array_keys($mutation)[0];
$data = $mutation[$type];

switch ($type) {
case Operation::OP_DELETE:
if (isset($data['keySet'])) {
$data['keySet'] = $this->formatKeySet($data['keySet']);
}

$operation = $this->serializer->decodeMessage(
new Delete,
$data
);
break;
default:
$operation = new Write;
$operation->setTable($data['table']);
$operation->setColumns($data['columns']);

$modifiedData = [];
foreach ($data['values'] as $key => $param) {
$modifiedData[$key] = $this->fieldValue($param);
}

$list = new ListValue;
$list->setValues($modifiedData);
$values = [$list];
$operation->setValues($values);

break;
}

$setterName = $this->mutationSetters[$type];
$mutation = new Mutation;
$mutation->$setterName($operation);
$mutations[] = $mutation;
}
return $mutations;
}
}
90 changes: 90 additions & 0 deletions Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner;

use Google\ApiCore\ApiException;
use Google\ApiCore\ValidationException;
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Core\Exception\NotFoundException;
Expand All @@ -36,6 +37,7 @@
use Google\Cloud\Spanner\Session\Session;
use Google\Cloud\Spanner\Session\SessionPoolInterface;
use Google\Cloud\Spanner\Transaction;
use Google\Cloud\Spanner\V1\BatchWriteResponse;
use Google\Cloud\Spanner\V1\SpannerClient as GapicSpannerClient;
use Google\Cloud\Spanner\V1\TypeCode;
use Google\Rpc\Code;
Expand Down Expand Up @@ -185,6 +187,11 @@ class Database
*/
private $directedReadOptions;

/**
* @var bool
*/
private $returnInt64AsObject;

/**
* Create an object representing a Database.
*
Expand Down Expand Up @@ -231,6 +238,7 @@ public function __construct(
$this->setLroProperties($lroConnection, $lroCallables, $this->name);
$this->databaseRole = $databaseRole;
$this->directedReadOptions = $instance->directedReadOptions();
$this->returnInt64AsObject = $returnInt64AsObject;
}

/**
Expand Down Expand Up @@ -1687,6 +1695,88 @@ public function execute($sql, array $options = [])
}
}

/**
* Create a new {@see \Google\Cloud\Spanner\MutationGroup} object.
*
* @return MutationGroup
*/
public function mutationGroup()
{
return new MutationGroup($this->returnInt64AsObject);
}

/**
* Batches the supplied mutation groups in a collection of efficient
* transactions. All mutations in a group are committed atomically. However,
* mutations across groups can be committed non-atomically in an unspecified
* order and thus, they must be independent of each other. Partial failure is
* possible, i.e., some groups may have been committed successfully, while
* some may have failed. The results of individual batches are streamed into
* the response as the batches are applied.
*
* BatchWrite requests are not replay protected, meaning that each mutation
* group may be applied more than once. Replays of non-idempotent mutations
* may have undesirable effects. For example, replays of an insert mutation
* may produce an already exists error or if you use generated or commit
* timestamp-based keys, it may result in additional rows being added to the
* mutation's table. We recommend structuring your mutation groups to be
* idempotent to avoid this issue.
*
* Sample code:
* ```
* ```
*
* @param array<MutationGroup> $mutationGroups Required. The groups of mutations to be applied.
* @param array $options {
* Optional.
*
* @type array $requestOptions
* Common options for this request.
* @type bool $excludeTxnFromChangeStreams
* Optional. When `exclude_txn_from_change_streams` is set to `true`:
* * Mutations from all transactions in this batch write operation will not
* be recorded in change streams with DDL option `allow_txn_exclusion=true`
* that are tracking columns modified by these transactions.
* * Mutations from all transactions in this batch write operation will be
* recorded in change streams with DDL option `allow_txn_exclusion=false or
* not set` that are tracking columns modified by these transactions.
*
* When `exclude_txn_from_change_streams` is set to `false` or not set,
* mutations from all transactions in this batch write operation will be
* recorded in all change streams that are tracking columns modified by these
* transactions.
* }
*
* @retur \Generator {@see \Google\Cloud\Spanner\V1\BatchWriteResponse}
*
* @throws ApiException if the remote call fails
*/
public function batchWrite(array $mutationGroups, array $options = [])
{
if ($this->isRunningTransaction) {
throw new \BadMethodCallException('Nested transactions are not supported by this client.');
}
// Prevent nested transactions.
$this->isRunningTransaction = true;
$session = $this->selectSession(
SessionPoolInterface::CONTEXT_READWRITE,
$this->pluck('sessionOptions', $options, false) ?: []
);

$mutationGroups = array_map(fn ($x) => $x->toArray(), $mutationGroups);

try {
return $this->connection->batchWrite([
'database' => $this->name(),
'session' => $session->name(),
'mutationGroups' => $mutationGroups
] + $options);
} finally {
$this->isRunningTransaction = false;
$session->setExpiration();
}
}

/**
* Execute a partitioned DML update.
*
Expand Down
46 changes: 46 additions & 0 deletions Spanner/src/MutationGroup.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php
/**
* Copyright 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Google\Cloud\Spanner;

/**
* This should not be used directly. It should be accessed via
* Google\Cloud\Spanner\Database::mutationGroup().
*
* @internal
*/
class MutationGroup
{
use MutationTrait;

private ValueMapper $mapper;

/**
* @param bool $returnInt64AsObject [optional If true, 64 bit integers will
* be returned as a {@see \Google\Cloud\Core\Int64} object for 32 bit
* platform compatibility. **Defaults to** false.
*/
public function __construct($returnInt64AsObject)
{
$this->mapper = new ValueMapper($returnInt64AsObject);
}

public function toArray(): array
{
return ['mutations' => $this->mutationData];
}
}
Loading
Loading