Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BC Break] Implement waitUntilComplete and block while waiting for query results #642

Merged
merged 2 commits into from
Aug 21, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 27 additions & 67 deletions src/BigQuery/BigQueryClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public function __construct(array $config = [])
* ```
* $queryResults = $bigQuery->runQuery('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -156,7 +156,7 @@ public function __construct(array $config = [])
* ]
* ]);
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -168,7 +168,7 @@ public function __construct(array $config = [])
* 'parameters' => ['A commit message.']
* ]);
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand All @@ -183,78 +183,46 @@ public function __construct(array $config = [])
* of results. Setting this flag to a small value such as 1000 and
* then paging through results might improve reliability when the
* query result set is large.
* @type array $defaultDataset Specifies the default datasetId and
* projectId to assume for any unqualified table names in the
* query. If not set, all table names in the query string must be
* qualified in the format 'datasetId.tableId'.
* @type int $startIndex Zero-based index of the starting row.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* @type bool $useQueryCache Whether to look for the result in the query
* cache.
* @type bool $useLegacySql If set to true the query will use

This comment was marked as spam.

This comment was marked as spam.

* [BigQuery's legacy SQL](https://cloud.google.com/bigquery/docs/reference/legacy-sql),
* otherwise [BigQuery's standard SQL](https://cloud.google.com/bigquery/sql-reference).
* **Defaults to** `false`.
* @type array $parameters Only available for standard SQL queries.
* When providing a non-associative array positional parameters
* (`?`) will be used. When providing an associative array
* named parameters will be used (`@name`).
* @type array $jobConfig Configuration settings for a query job are
* outlined in the [API Docs for `configuration.query`](https://goo.gl/PuRa3I).
* If not provided default settings will be used, with the exception
* of `configuration.query.useLegacySql`, which defaults to `false`
* in this client.
* }
* @return QueryResults
* @throws \RuntimeException if the maximum number of retries while waiting
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function runQuery($query, array $options = [])
{
$options += [
'maxRetries' => 100,
'useLegacySql' => false
];

if (isset($options['parameters'])) {
$options += $this->formatQueryParameters($options['parameters']);
unset($options['parameters']);
}

$queryOptions = $options;
unset($queryOptions['timeoutMs'], $queryOptions['maxRetries']);

$response = $this->connection->query([
'projectId' => $this->projectId,
'query' => $query
] + $queryOptions);

$results = new QueryResults(
$this->connection,
$response['jobReference']['jobId'],
$this->projectId,
$response,
$options,
$this->mapper
);

if (!$results->isComplete()) {
$retryFn = function (QueryResults $results, array $options) {
$results->reload($options);

if (!$results->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($options['maxRetries']);
$retry->execute($retryFn, [$results, $options]);
}

return $results;
$jobOptions = $this->pluckArray([
'parameters',
'jobConfig'
], $options);
$queryResultsOptions = $this->pluckArray([
'maxResults',
'startIndex',
'timeoutMs',
'maxRetries'
], $options);

return $this->runQueryAsJob(
$query,
$jobOptions + $options
)->queryResults($queryResultsOptions + $options);
}

/**
* Runs a BigQuery SQL query in an asynchronous fashion. Running a query
* in this fashion requires you to poll for the status before being able
* to access results.
* Runs a BigQuery SQL query in an asynchronous fashion.
*
* Queries constructed using
* [standard SQL](https://cloud.google.com/bigquery/docs/reference/standard-sql/)
Expand All @@ -264,17 +232,9 @@ public function runQuery($query, array $options = [])
* Example:
* ```
* $job = $bigQuery->runQueryAsJob('SELECT commit FROM `bigquery-public-data.github_repos.commits` LIMIT 100');
*
* $isComplete = false;
* $queryResults = $job->queryResults();
*
* while (!$isComplete) {
* sleep(1); // let's wait for a moment...
* $queryResults->reload(); // trigger a network request
* $isComplete = $queryResults->isComplete(); // check the query's status
* }
*
* foreach ($queryResults->rows() as $row) {
* foreach ($queryResults as $row) {
* echo $row['commit'];
* }
* ```
Expand Down
72 changes: 66 additions & 6 deletions src/BigQuery/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
namespace Google\Cloud\BigQuery;

use Google\Cloud\BigQuery\Connection\ConnectionInterface;
use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\ExponentialBackoff;

/**
* [Jobs](https://cloud.google.com/bigquery/docs/reference/v2/jobs) are objects
Expand All @@ -27,6 +29,10 @@
*/
class Job
{
use ArrayTrait;

const MAX_RETRIES = 100;

/**
* @var ConnectionInterface Represents a connection to BigQuery.
*/
Expand Down Expand Up @@ -121,7 +127,8 @@ public function cancel(array $options = [])
}

/**
* Retrieves the results of a query job.
* Retrieves the results of a query job, blocking until results are
* available.
*
* Example:
* ```
Expand All @@ -138,21 +145,50 @@ public function cancel(array $options = [])
* @type int $startIndex Zero-based index of the starting row.
* @type int $timeoutMs How long to wait for the query to complete, in
* milliseconds. **Defaults to** `10000` milliseconds (10 seconds).
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* }
* @return QueryResults
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function queryResults(array $options = [])
{
$response = $this->connection->getQueryResults($options + $this->identity);

return new QueryResults(
$maxRetries = $this->pluck('maxRetries', $options, false);
$results = new QueryResults(
$this->connection,
$this->identity['jobId'],
$this->identity['projectId'],
$response,
$options,
$this->connection->getQueryResults($options + $this->identity),
$this->mapper
);
$this->wait($results, $options + [
'maxRetries' => $maxRetries
]);

return $results;
}

/**
* Blocks until the job is complete.
*
* Example:
* ```
* $job->waitUntilComplete();
* ```
*
* @param array $options [optional] {
* Configuration options.
*
* @type int $maxRetries The number of times to retry, checking if the
* query has completed. **Defaults to** `100`.
* }
* @throws \RuntimeException If the maximum number of retries while waiting
* for query completion has been exceeded.
*/
public function waitUntilComplete(array $options = [])
{
$this->wait($this, $options);
}

/**
Expand All @@ -171,6 +207,7 @@ public function queryResults(array $options = [])
*
* echo 'Query complete!';
* ```
*
* @param array $options [optional] Configuration options.
* @return bool
*/
Expand Down Expand Up @@ -255,4 +292,27 @@ public function identity()
{
return $this->identity;
}

/**
* Waits for an operation to complete.
*
* @param mixed $context
* @param array $options
*/
private function wait($context, array $options)
{
if (!$context->isComplete()) {
$maxRetries = $this->pluck('maxRetries', $options, false) ?: self::MAX_RETRIES;
$retryFn = function () use ($context, $options) {
$context->reload($options);

if (!$context->isComplete()) {
throw new \RuntimeException('Job did not complete within the allowed number of retries.');
}
};

$retry = new ExponentialBackoff($maxRetries);
$retry->execute($retryFn);
}
}
}
Loading