Skip to content

Commit

Permalink
[BC Break] Implement waitUntilComplete and block while waiting for qu…
Browse files Browse the repository at this point in the history
…ery results (#642)

* implement waitUntilComplete on job and block waiting for queryResults

* fix getIterator docblock
  • Loading branch information
dwsupplee authored Aug 21, 2017
1 parent 411f3d2 commit a105bfa
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 206 deletions.
94 changes: 27 additions & 67 deletions src/BigQuery/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public function __construct(array $config = [])
* ```
* $queryResults = $bigQuery->runQuery('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -156,7 +156,7 @@ public function __construct(array $config = [])
* ]
* ]);
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -168,7 +168,7 @@ public function __construct(array $config = [])
* 'parameters' => ['A commit message.']
* ]);
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -183,78 +183,46 @@ public function __construct(array $config = [])
* of results. Setting this flag to a small value such as 1000 and
* then paging through results might improve reliability when the
* query result set is large.
* @type array $defaultDataset Specifies the default datasetId and
* projectId to assume for any unqualified table names in the
* query. If not set, all table names in the query string must be
* qualified in the format 'datasetId.tableId'.
* @type int $startIndex Zero-based index of the starting row.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* @type bool $useQueryCache Whether to look for the result in the query
* cache.
* @type bool $useLegacySql If set to true the query will use
* [BigQuery's legacy SQL](https://cloud.google.com/bigquery/docs/reference/legacy-sql),
* otherwise [BigQuery's standard SQL](https://cloud.google.com/bigquery/sql-reference).
* **Defaults to** `false`.
* @type array $parameters Only available for standard SQL queries.
* When providing a non-associative array positional parameters
* (`?`) will be used. When providing an associative array
* named parameters will be used (`@name`).
* @type array $jobConfig Configuration settings for a query job are
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
* If not provided default settings will be used, with the exception
* of `configuration.query.useLegacySql`, which defaults to `false`
* in this client.
* }
* @return QueryResults
* @throws \RuntimeException if the maximum number of retries while waiting
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function runQuery($query, array $options = [])
{
$options += [
'maxRetries' => 100,
'useLegacySql' => false
];

if (isset($options['parameters'])) {
$options += $this->formatQueryParameters($options['parameters']);
unset($options['parameters']);
}

$queryOptions = $options;
unset($queryOptions['timeoutMs'], $queryOptions['maxRetries']);

$response = $this->connection->query([
'projectId' => $this->projectId,
'query' => $query
] + $queryOptions);

$results = new QueryResults(
$this->connection,
$response['jobReference']['jobId'],
$this->projectId,
$response,
$options,
$this->mapper
);

if (!$results->isComplete()) {
$retryFn = function (QueryResults $results, array $options) {
$results->reload($options);

if (!$results->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($options['maxRetries']);
$retry->execute($retryFn, [$results, $options]);
}

return $results;
$jobOptions = $this->pluckArray([
'parameters',
'jobConfig'
], $options);
$queryResultsOptions = $this->pluckArray([
'maxResults',
'startIndex',
'timeoutMs',
'maxRetries'
], $options);

return $this->runQueryAsJob(
$query,
$jobOptions + $options
)->queryResults($queryResultsOptions + $options);
}

/**
* Runs a BigQuery SQL query in an asynchronous fashion. Running a query
* in this fashion requires you to poll for the status before being able
* to access results.
* Runs a BigQuery SQL query in an asynchronous fashion.
*
* Queries constructed using
* [standard SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/)
Expand All @@ -264,17 +232,9 @@ public function runQuery($query, array $options = [])
* Example:
* ```
* $job = $bigQuery->runQueryAsJob('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
*
* $isComplete = false;
* $queryResults = $job->queryResults();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand Down
72 changes: 66 additions & 6 deletions src/BigQuery/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
namespace Google\Cloud\BigQuery;

use Google\Cloud\BigQuery\Connection\ConnectionInterface;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\ExponentialBackoff;

/**
* [Jobs](https://cloud.google.com/bigquery/docs/reference/v2/jobs) are objects
Expand All @@ -27,6 +29,10 @@
*/
class Job
{
use ArrayTrait;

const MAX_RETRIES = 100;

/**
* @var ConnectionInterface Represents a connection to BigQuery.
*/
Expand Down Expand Up @@ -121,7 +127,8 @@ public function cancel(array $options = [])
}

/**
* Retrieves the results of a query job.
* Retrieves the results of a query job, blocking until results are
* available.
*
* Example:
* ```
Expand All @@ -138,21 +145,50 @@ public function cancel(array $options = [])
* @type int $startIndex Zero-based index of the starting row.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* }
* @return QueryResults
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function queryResults(array $options = [])
{
$response = $this->connection->getQueryResults($options + $this->identity);

return new QueryResults(
$maxRetries = $this->pluck('maxRetries', $options, false);
$results = new QueryResults(
$this->connection,
$this->identity['jobId'],
$this->identity['projectId'],
$response,
$options,
$this->connection->getQueryResults($options + $this->identity),
$this->mapper
);
$this->wait($results, $options + [
'maxRetries' => $maxRetries
]);

return $results;
}

/**
* Blocks until the job is complete.
*
* Example:
* ```
* $job->waitUntilComplete();
* ```
*
* @param array $options [optional] {
* Configuration options.
*
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* }
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function waitUntilComplete(array $options = [])
{
$this->wait($this, $options);
}

/**
Expand All @@ -171,6 +207,7 @@ public function queryResults(array $options = [])
*
* echo 'Query complete!';
* ```
*
* @param array $options [optional] Configuration options.
* @return bool
*/
Expand Down Expand Up @@ -255,4 +292,27 @@ public function identity()
{
return $this->identity;
}

/**
* Waits for an operation to complete.
*
* @param mixed $context
* @param array $options
*/
private function wait($context, array $options)
{
if (!$context->isComplete()) {
$maxRetries = $this->pluck('maxRetries', $options, false) ?: self::MAX_RETRIES;
$retryFn = function () use ($context, $options) {
$context->reload($options);

if (!$context->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($maxRetries);
$retry->execute($retryFn);
}
}
}
Loading

0 comments on commit a105bfa

Please sign in to comment.