Skip to content

Commit

Permalink
Run query and wait for it to complete (googleapis#604)
Browse files Browse the repository at this point in the history
* Run query and wait for it to complete

* Document exception on BigQueryClient::runQuery()

* Update snippet test
  • Loading branch information
jdpedrie authored and dwsupplee committed Oct 16, 2017
1 parent 47e5839 commit 19240c1
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 32 deletions.
60 changes: 32 additions & 28 deletions src/BigQuery/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

use Google\Cloud\BigQuery\Connection\ConnectionInterface;
use Google\Cloud\BigQuery\Connection\Rest;
use Google\Cloud\BigQuery\Job;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\Iterator\PageIterator;
use Google\Cloud\Core\ClientTrait;
use Google\Cloud\Core\ExponentialBackoff;
use Google\Cloud\Core\Int64;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\Iterator\PageIterator;
use Psr\Cache\CacheItemPoolInterface;
use Psr\Http\Message\StreamInterface;

Expand Down Expand Up @@ -138,14 +140,6 @@ public function __construct(array $config = [])
* ```
* $queryResults = $bigQuery->runQuery('SELECT commit FROM [bigquery-public-data:github_repos.commits] LIMIT 100');
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -162,14 +156,6 @@ public function __construct(array $config = [])
* ]
* ]);
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -182,14 +168,6 @@ public function __construct(array $config = [])
* 'parameters' => ['A commit message.']
* ]);
*
* $isComplete = $queryResults->isComplete();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* echo $row['commit'];
* }
Expand All @@ -211,6 +189,8 @@ public function __construct(array $config = [])
* qualified in the format 'datasetId.tableId'.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* @type bool $useQueryCache Whether to look for the result in the query
* cache.
* @type bool $useLegacySql Specifies whether to use BigQuery's legacy
Expand All @@ -223,27 +203,51 @@ public function __construct(array $config = [])
* named parameters will be used (`@name`).
* }
* @return QueryResults
* @throws \RuntimeException if the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function runQuery($query, array $options = [])
{
$options += [
'maxRetries' => 100
];

if (isset($options['parameters'])) {
$options += $this->formatQueryParameters($options['parameters']);
unset($options['parameters']);
}

$queryOptions = $options;
unset($queryOptions['timeoutMs'], $queryOptions['maxRetries']);

$response = $this->connection->query([
'projectId' => $this->projectId,
'query' => $query
] + $options);
] + $queryOptions);

return new QueryResults(
$results = new QueryResults(
$this->connection,
$response['jobReference']['jobId'],
$this->projectId,
$response,
$options,
$this->mapper
);

if (!$results->isComplete()) {
$retryFn = function (QueryResults $results, array $options) {
$results->reload($options);

if (!$results->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($options['maxRetries']);
$retry->execute($retryFn, [$results, $options]);
}

return $results;
}

/**
Expand Down
3 changes: 0 additions & 3 deletions tests/snippets/BigQuery/BigQueryClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public function testRunQuery()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery');
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection->query(Argument::any())
->shouldBeCalled()
->willReturn([
Expand All @@ -102,7 +101,6 @@ public function testRunQueryWithNamedParameters()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 1);
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection
->query(Argument::withEntry('queryParameters', [
[
Expand Down Expand Up @@ -145,7 +143,6 @@ public function testRunQueryWithPositionalParameters()
{
$snippet = $this->snippetFromMethod(BigQueryClient::class, 'runQuery', 2);
$snippet->addLocal('bigQuery', $this->client);
$snippet->replace('sleep(1);', '');
$this->connection
->query(Argument::withEntry('queryParameters', [
[
Expand Down
34 changes: 33 additions & 1 deletion tests/unit/BigQuery/BigQueryClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
use Google\Cloud\BigQuery\QueryResults;
use Google\Cloud\BigQuery\Time;
use Google\Cloud\BigQuery\Timestamp;
use Google\Cloud\Core\Iterator\ItemIterator;
use Prophecy\Argument;

/**
Expand Down Expand Up @@ -54,9 +55,40 @@ public function testRunsQuery($query, $options, $expected)
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
]
],
'jobComplete' => true
])
->shouldBeCalledTimes(1);
$this->client->setConnection($this->connection->reveal());
$queryResults = $this->client->runQuery($query, $options);

$this->assertInstanceOf(QueryResults::class, $queryResults);
$this->assertEquals($this->jobId, $queryResults->identity()['jobId']);
}

/**
* @dataProvider queryDataProvider
*/
public function testRunsQueryWithRetry($query, $options, $expected)
{
$this->connection->query($expected)
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
],
'jobComplete' => false
])
->shouldBeCalledTimes(1);

$this->connection->getQueryResults(Argument::any())
->willReturn([
'jobReference' => [
'jobId' => $this->jobId
],
'jobComplete' => true
])
->shouldBeCalledTimes(1);

$this->client->setConnection($this->connection->reveal());
$queryResults = $this->client->runQuery($query, $options);

Expand Down

0 comments on commit 19240c1

Please sign in to comment.