Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: refactor mutation model #1367

Merged
merged 5 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 94 additions & 75 deletions Bigtable/src/DataClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Google\Cloud\Bigtable\ReadModifyWriteRowRules;
use Google\Cloud\Bigtable\Mutations;
use Google\Cloud\Bigtable\V2\BigtableClient as TableClient;
use Google\Cloud\Bigtable\V2\MutateRowsRequest\Entry;
use Google\Cloud\Bigtable\V2\Row;
use Google\Cloud\Bigtable\V2\RowRange;
use Google\Cloud\Bigtable\V2\RowSet;
Expand Down Expand Up @@ -129,46 +130,32 @@ public function __construct($instanceId, $tableId, array $config = [])
* Example:
* ```
* use Google\Cloud\Bigtable\DataClient;
* use Google\Cloud\Bigtable\RowMutation;
* use Google\Cloud\Bigtable\Mutations;
*
* $rowMutation = new RowMutation('r1');
* $rowMutation->upsert('cf1','cq1','value1',1534183334215000);
* $mutations = (new Mutations)
* ->upsert('cf1', 'cq1', 'value1', 1534183334215000);
*
* $dataClient->mutateRows([$rowMutation]);
* $dataClient->mutateRows(['r1' => $mutations]);
* ```
* @param array $rowMutations array of RowMutation object.
* @param array $rowMutations Associative array of rowKey as key and value as
* {@see Google\Cloud\Bigtable\Mutations}.
* @param array $options [optional] Configuration options.
* @return void
* @throws ApiException|BigtableDataOperationException if the remote call fails or operation fails
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails
* @throws InvalidArgumentException If rowMutations is a list instead of associative array indexed by rowkey.
*/
public function mutateRows(array $rowMutations, array $options = [])
{
$entries = [];
foreach ($rowMutations as $rowMutation) {
$entries[] = $rowMutation->toProto();
}
$responseStream = $this->bigtableClient->mutateRows($this->tableName, $entries, $options + $this->options);
$rowMutationsFailedResponse = [];
$failureCode = Code::OK;
$message = 'partial failure';
foreach ($responseStream->readAll() as $mutateRowsResponse) {
$mutateRowsResponseEntries = $mutateRowsResponse->getEntries();
foreach ($mutateRowsResponseEntries as $mutateRowsResponseEntry) {
if ($mutateRowsResponseEntry->getStatus()->getCode() !== Code::OK) {
$failureCode = $mutateRowsResponseEntry->getStatus()->getCode();
$message = $mutateRowsResponseEntry->getStatus()->getMessage();
$rowMutationsFailedResponse[] = [
'rowKey' => $rowMutations[$mutateRowsResponseEntry->getIndex()]->getRowKey(),
'rowMutationIndex' => $mutateRowsResponseEntry->getIndex(),
'statusCode' => $failureCode,
'message' => $message
];
}
}
if (!$this->isAssoc($rowMutations)) {
throw new \InvalidArgumentException(
'Expected rowMutations to be of type associative array, instead got list.'
);
}
if (!empty($rowMutationsFailedResponse)) {
throw new BigtableDataOperationException($message, $failureCode, $rowMutationsFailedResponse);
$entries = [];
foreach ($rowMutations as $rowKey => $mutations) {
$entries[] = $this->toEntry($rowKey, $mutations);
}
$this->mutateRowsWithEntries($entries, $options);
}

/**
Expand All @@ -183,30 +170,30 @@ public function mutateRows(array $rowMutations, array $options = [])
* @param array[] $rows array of row.
* @param array $options [optional] Configuration options.
* @return void
* @throws ApiException|BigtableDataOperationException if the remote call fails or operation fails
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails
*/
public function upsert(array $rows, array $options = [])
{
$rowMutations = [];
$entries = [];
foreach ($rows as $rowKey => $families) {
$rowMutation = new RowMutation($rowKey);
$mutations = new Mutations;
foreach ($families as $family => $qualifiers) {
foreach ($qualifiers as $qualifier => $value) {
if (isset($value['timeStamp'])) {
$rowMutation->upsert(
$mutations->upsert(
$family,
$qualifier,
$value['value'],
$value['timeStamp']
);
} else {
$rowMutation->upsert($family, $qualifier, $value['value']);
$mutations->upsert($family, $qualifier, $value['value']);
}
}
}
$rowMutations[] = $rowMutation;
$entries[] = $this->toEntry($rowKey, $mutations);
}
$this->mutateRows($rowMutations, $options);
$this->mutateRowsWithEntries($entries, $options);
}

/**
Expand Down Expand Up @@ -348,12 +335,12 @@ public function readRow($rowKey, array $options = [])
* // Increments value
* use Google\Cloud\Bigtable\DataUtil;
* use Google\Cloud\Bigtable\ReadModifyWriteRowRules;
* use Google\Cloud\Bigtable\RowMutation;
* use Google\Cloud\Bigtable\Mutations;
*
* $rowMutation = new RowMutation('rk1');
* $rowMutation->upsert('cf1', 'cq1', DataUtil::intToByteString(2));
* $mutations = (new Mutations)
* ->upsert('cf1', 'cq1', DataUtil::intToByteString(2));
*
* $dataClient->mutateRows([$rowMutation]);
* $dataClient->mutateRows(['rk1' => $mutations]);
*
* $rules = (new ReadModifyWriteRowRules)
* ->increment('cf1', 'cq1', 3);
Expand All @@ -366,7 +353,7 @@ public function readRow($rowKey, array $options = [])
* @param ReadModifyWriteRowRules $rules Rules to apply on row.
* @param array $options [optional] Configuration options.
* @return array Returns array containing all column family keyed by family name.
* @throws ApiException if the remote call fails or operation fails
* @throws ApiException If the remote call fails or operation fails
*/
public function readModifyWriteRow($rowKey, ReadModifyWriteRowRules $rules, array $options = [])
{
Expand All @@ -379,32 +366,6 @@ public function readModifyWriteRow($rowKey, ReadModifyWriteRowRules $rules, arra
return $this->convertToArray($readModifyWriteRowResponse->getRow());
}

private function convertToArray(Row $row)
{
if ($row === null) {
return [];
}
$families = [];
foreach ($row->getFamilies() as $family) {
$qualifiers = [];
foreach ($family->getColumns() as $column) {
$values = [];
foreach ($column->getCells() as $cell) {
$values[] = [
'value' => $cell->getValue(),
'timeStamp' => $cell->getTimestampMicros(),
'labels' => ($cell->getLabels()->getIterator()->valid())
? implode(iterator_to_array($cell->getLabels()->getIterator()))
: ''
];
}
$qualifiers[$column->getQualifier()] = $values;
}
$families[$family->getName()] = $qualifiers;
}
return $families;
}

/**
* Returns a sample of row keys in the table. The returned row keys will
* delimit contiguous sections of the table of approximately equal size,
Expand All @@ -421,7 +382,7 @@ private function convertToArray(Row $row)
*
* @param array $options [optional] Configuration options.
* @return \Generator<array> A list of associative arrays, each with the keys `rowKey` and `offset`.
* @throws ApiException if the remote call fails or operation fails
* @throws ApiException If the remote call fails or operation fails
*/
public function sampleRowKeys(array $options = [])
{
Expand Down Expand Up @@ -480,12 +441,12 @@ public function sampleRowKeys(array $options = [])
* either `trueMutations` or `falseMutations` must be provided.
* }
*
* @return bool Returns true if predicate filter yielded any output, false otherwise.
* @throws ApiException if the remote call fails or operation fails
* @throws \InvalidArgumentException if neither of $trueMutations or $falseMutations is set.
* if $predicateFilter is not instance of {@see Google\Cloud\Bigtable\Filter\FilterInterface}.
* if $trueMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* if $falseMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* @return bool Returns true If predicate filter yielded any output, false otherwise.

This comment was marked as spam.

This comment was marked as spam.

* @throws ApiException If the remote call fails or operation fails
* @throws \InvalidArgumentException If neither of $trueMutations or $falseMutations is set.
* If $predicateFilter is not instance of {@see Google\Cloud\Bigtable\Filter\FilterInterface}.
* If $trueMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* If $falseMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
*/
public function checkAndMutateRow($rowKey, array $options = [])
{
Expand Down Expand Up @@ -515,6 +476,64 @@ public function checkAndMutateRow($rowKey, array $options = [])
->getPredicateMatched();
}

private function mutateRowsWithEntries(array $entries, array $options = [])
{
$responseStream = $this->bigtableClient->mutateRows($this->tableName, $entries, $options + $this->options);
$rowMutationsFailedResponse = [];
$failureCode = Code::OK;
$message = 'partial failure';
foreach ($responseStream->readAll() as $mutateRowsResponse) {
$mutateRowsResponseEntries = $mutateRowsResponse->getEntries();
foreach ($mutateRowsResponseEntries as $mutateRowsResponseEntry) {
if ($mutateRowsResponseEntry->getStatus()->getCode() !== Code::OK) {
$failureCode = $mutateRowsResponseEntry->getStatus()->getCode();
$message = $mutateRowsResponseEntry->getStatus()->getMessage();
$rowMutationsFailedResponse[] = [
'rowKey' => $entries[$mutateRowsResponseEntry->getIndex()]->getRowKey(),
'statusCode' => $failureCode,
'message' => $message
];
}
}
}
if (!empty($rowMutationsFailedResponse)) {
throw new BigtableDataOperationException($message, $failureCode, $rowMutationsFailedResponse);
}
}

private function toEntry($rowKey, Mutations $mutations)
{
return (new Entry)
->setRowKey($rowKey)
->setMutations($mutations->toProto());
}

private function convertToArray(Row $row)
{
if ($row === null) {
return [];
}
$families = [];
foreach ($row->getFamilies() as $family) {
$qualifiers = [];
foreach ($family->getColumns() as $column) {
$values = [];
foreach ($column->getCells() as $cell) {
$values[] = [
'value' => $cell->getValue(),
'timeStamp' => $cell->getTimestampMicros(),
'labels' => ($cell->getLabels()->getIterator()->valid())
? implode(iterator_to_array($cell->getLabels()->getIterator()))
: ''
];
}
$qualifiers[$column->getQualifier()] = $values;
}
$families[$family->getName()] = $qualifiers;
}
return $families;
}

private function convertToProto(array &$options, $key, $expectedType)
{
if (!$options[$key] instanceof $expectedType) {
Expand Down
122 changes: 0 additions & 122 deletions Bigtable/src/RowMutation.php

This file was deleted.

Loading