Skip to content

Commit

Permalink
Bigtable: refactor mutation model (#1367)
Browse files Browse the repository at this point in the history
* refactor mutation model

* fix doc comment

* address feedback

* use ternary operator

* fix comment
  • Loading branch information
ajaaym authored and dwsupplee committed Oct 25, 2018
1 parent 04c80e0 commit 9cff275
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 369 deletions.
167 changes: 93 additions & 74 deletions Bigtable/src/DataClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Google\Cloud\Bigtable\ReadModifyWriteRowRules;
use Google\Cloud\Bigtable\Mutations;
use Google\Cloud\Bigtable\V2\BigtableClient as TableClient;
use Google\Cloud\Bigtable\V2\MutateRowsRequest\Entry;
use Google\Cloud\Bigtable\V2\Row;
use Google\Cloud\Bigtable\V2\RowRange;
use Google\Cloud\Bigtable\V2\RowSet;
Expand Down Expand Up @@ -129,46 +130,32 @@ public function __construct($instanceId, $tableId, array $config = [])
* Example:
* ```
* use Google\Cloud\Bigtable\DataClient;
* use Google\Cloud\Bigtable\RowMutation;
* use Google\Cloud\Bigtable\Mutations;
*
* $rowMutation = new RowMutation('r1');
* $rowMutation->upsert('cf1','cq1','value1',1534183334215000);
* $mutations = (new Mutations)
* ->upsert('cf1', 'cq1', 'value1', 1534183334215000);
*
* $dataClient->mutateRows([$rowMutation]);
* $dataClient->mutateRows(['r1' => $mutations]);
* ```
* @param array $rowMutations array of RowMutation object.
* @param array $rowMutations Associative array of rowKey as key and value as
* {@see Google\Cloud\Bigtable\Mutations}.
* @param array $options [optional] Configuration options.
* @return void
* @throws ApiException|BigtableDataOperationException if the remote call fails or operation fails
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails
* @throws InvalidArgumentException If rowMutations is a list instead of associative array indexed by rowkey.
*/
public function mutateRows(array $rowMutations, array $options = [])
{
$entries = [];
foreach ($rowMutations as $rowMutation) {
$entries[] = $rowMutation->toProto();
}
$responseStream = $this->bigtableClient->mutateRows($this->tableName, $entries, $options + $this->options);
$rowMutationsFailedResponse = [];
$failureCode = Code::OK;
$message = 'partial failure';
foreach ($responseStream->readAll() as $mutateRowsResponse) {
$mutateRowsResponseEntries = $mutateRowsResponse->getEntries();
foreach ($mutateRowsResponseEntries as $mutateRowsResponseEntry) {
if ($mutateRowsResponseEntry->getStatus()->getCode() !== Code::OK) {
$failureCode = $mutateRowsResponseEntry->getStatus()->getCode();
$message = $mutateRowsResponseEntry->getStatus()->getMessage();
$rowMutationsFailedResponse[] = [
'rowKey' => $rowMutations[$mutateRowsResponseEntry->getIndex()]->getRowKey(),
'rowMutationIndex' => $mutateRowsResponseEntry->getIndex(),
'statusCode' => $failureCode,
'message' => $message
];
}
}
if (!$this->isAssoc($rowMutations)) {
throw new \InvalidArgumentException(
'Expected rowMutations to be of type associative array, instead got list.'
);
}
if (!empty($rowMutationsFailedResponse)) {
throw new BigtableDataOperationException($message, $failureCode, $rowMutationsFailedResponse);
$entries = [];
foreach ($rowMutations as $rowKey => $mutations) {
$entries[] = $this->toEntry($rowKey, $mutations);
}
$this->mutateRowsWithEntries($entries, $options);
}

/**
Expand All @@ -183,30 +170,30 @@ public function mutateRows(array $rowMutations, array $options = [])
* @param array[] $rows array of row.
* @param array $options [optional] Configuration options.
* @return void
* @throws ApiException|BigtableDataOperationException if the remote call fails or operation fails
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails
*/
public function upsert(array $rows, array $options = [])
{
$rowMutations = [];
$entries = [];
foreach ($rows as $rowKey => $families) {
$rowMutation = new RowMutation($rowKey);
$mutations = new Mutations;
foreach ($families as $family => $qualifiers) {
foreach ($qualifiers as $qualifier => $value) {
if (isset($value['timeStamp'])) {
$rowMutation->upsert(
$mutations->upsert(
$family,
$qualifier,
$value['value'],
$value['timeStamp']
);
} else {
$rowMutation->upsert($family, $qualifier, $value['value']);
$mutations->upsert($family, $qualifier, $value['value']);
}
}
}
$rowMutations[] = $rowMutation;
$entries[] = $this->toEntry($rowKey, $mutations);
}
$this->mutateRows($rowMutations, $options);
$this->mutateRowsWithEntries($entries, $options);
}

/**
Expand Down Expand Up @@ -348,12 +335,12 @@ public function readRow($rowKey, array $options = [])
* // Increments value
* use Google\Cloud\Bigtable\DataUtil;
* use Google\Cloud\Bigtable\ReadModifyWriteRowRules;
* use Google\Cloud\Bigtable\RowMutation;
* use Google\Cloud\Bigtable\Mutations;
*
* $rowMutation = new RowMutation('rk1');
* $rowMutation->upsert('cf1', 'cq1', DataUtil::intToByteString(2));
* $mutations = (new Mutations)
* ->upsert('cf1', 'cq1', DataUtil::intToByteString(2));
*
* $dataClient->mutateRows([$rowMutation]);
* $dataClient->mutateRows(['rk1' => $mutations]);
*
* $rules = (new ReadModifyWriteRowRules)
* ->increment('cf1', 'cq1', 3);
Expand All @@ -366,7 +353,7 @@ public function readRow($rowKey, array $options = [])
* @param ReadModifyWriteRowRules $rules Rules to apply on row.
* @param array $options [optional] Configuration options.
* @return array Returns array containing all column family keyed by family name.
* @throws ApiException if the remote call fails or operation fails
* @throws ApiException If the remote call fails or operation fails
*/
public function readModifyWriteRow($rowKey, ReadModifyWriteRowRules $rules, array $options = [])
{
Expand All @@ -379,32 +366,6 @@ public function readModifyWriteRow($rowKey, ReadModifyWriteRowRules $rules, arra
return $this->convertToArray($readModifyWriteRowResponse->getRow());
}

private function convertToArray(Row $row)
{
if ($row === null) {
return [];
}
$families = [];
foreach ($row->getFamilies() as $family) {
$qualifiers = [];
foreach ($family->getColumns() as $column) {
$values = [];
foreach ($column->getCells() as $cell) {
$values[] = [
'value' => $cell->getValue(),
'timeStamp' => $cell->getTimestampMicros(),
'labels' => ($cell->getLabels()->getIterator()->valid())
? implode(iterator_to_array($cell->getLabels()->getIterator()))
: ''
];
}
$qualifiers[$column->getQualifier()] = $values;
}
$families[$family->getName()] = $qualifiers;
}
return $families;
}

/**
* Returns a sample of row keys in the table. The returned row keys will
* delimit contiguous sections of the table of approximately equal size,
Expand All @@ -421,7 +382,7 @@ private function convertToArray(Row $row)
*
* @param array $options [optional] Configuration options.
* @return \Generator<array> A list of associative arrays, each with the keys `rowKey` and `offset`.
* @throws ApiException if the remote call fails or operation fails
* @throws ApiException If the remote call fails or operation fails
*/
public function sampleRowKeys(array $options = [])
{
Expand Down Expand Up @@ -481,11 +442,11 @@ public function sampleRowKeys(array $options = [])
* }
*
* @return bool Returns true if predicate filter yielded any output, false otherwise.
* @throws ApiException if the remote call fails or operation fails
* @throws \InvalidArgumentException if neither of $trueMutations or $falseMutations is set.
* if $predicateFilter is not instance of {@see Google\Cloud\Bigtable\Filter\FilterInterface}.
* if $trueMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* if $falseMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* @throws ApiException If the remote call fails or operation fails
* @throws \InvalidArgumentException If neither of $trueMutations or $falseMutations is set.
* If $predicateFilter is not instance of {@see Google\Cloud\Bigtable\Filter\FilterInterface}.
* If $trueMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
* If $falseMutations is set and is not instance of {@see Google\Cloud\Bigtable\Mutations}.
*/
public function checkAndMutateRow($rowKey, array $options = [])
{
Expand Down Expand Up @@ -515,6 +476,64 @@ public function checkAndMutateRow($rowKey, array $options = [])
->getPredicateMatched();
}

private function mutateRowsWithEntries(array $entries, array $options = [])
{
$responseStream = $this->bigtableClient->mutateRows($this->tableName, $entries, $options + $this->options);
$rowMutationsFailedResponse = [];
$failureCode = Code::OK;
$message = 'partial failure';
foreach ($responseStream->readAll() as $mutateRowsResponse) {
$mutateRowsResponseEntries = $mutateRowsResponse->getEntries();
foreach ($mutateRowsResponseEntries as $mutateRowsResponseEntry) {
if ($mutateRowsResponseEntry->getStatus()->getCode() !== Code::OK) {
$failureCode = $mutateRowsResponseEntry->getStatus()->getCode();
$message = $mutateRowsResponseEntry->getStatus()->getMessage();
$rowMutationsFailedResponse[] = [
'rowKey' => $entries[$mutateRowsResponseEntry->getIndex()]->getRowKey(),
'statusCode' => $failureCode,
'message' => $message
];
}
}
}
if (!empty($rowMutationsFailedResponse)) {
throw new BigtableDataOperationException($message, $failureCode, $rowMutationsFailedResponse);
}
}

private function toEntry($rowKey, Mutations $mutations)
{
return (new Entry)
->setRowKey($rowKey)
->setMutations($mutations->toProto());
}

private function convertToArray(Row $row)
{
if ($row === null) {
return [];
}
$families = [];
foreach ($row->getFamilies() as $family) {
$qualifiers = [];
foreach ($family->getColumns() as $column) {
$values = [];
foreach ($column->getCells() as $cell) {
$values[] = [
'value' => $cell->getValue(),
'timeStamp' => $cell->getTimestampMicros(),
'labels' => ($cell->getLabels()->getIterator()->valid())
? implode(iterator_to_array($cell->getLabels()->getIterator()))
: ''
];
}
$qualifiers[$column->getQualifier()] = $values;
}
$families[$family->getName()] = $qualifiers;
}
return $families;
}

private function convertToProto(array &$options, $key, $expectedType)
{
if (!$options[$key] instanceof $expectedType) {
Expand Down
122 changes: 0 additions & 122 deletions Bigtable/src/RowMutation.php

This file was deleted.

Loading

0 comments on commit 9cff275

Please sign in to comment.