Skip to content

Commit

Permalink
fix(Spanner): retry logic of ExecuteStreamingSql (#6149)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishwarajanand authored May 3, 2023
1 parent a3af7ed commit fbd16a8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Spanner/src/Result.php
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public function rows($format = self::RETURN_ASSOCIATIVE)
$call = $this->call;
$generator = null;
$shouldRetry = false;
$isResultsYielded = false;
$backoff = new ExponentialBackoff($this->retries, function ($ex) {
if ($ex instanceof ServiceException) {
return $ex->getCode() === Grpc\STATUS_UNAVAILABLE;
Expand Down Expand Up @@ -209,13 +210,13 @@ public function rows($format = self::RETURN_ASSOCIATIVE)
list($yieldableRows, $chunkedResult) = $this->parseRowsFromBufferedResults($bufferedResults);

foreach ($yieldableRows as $row) {
$isResultsYielded = true;
yield $this->mapper->decodeValues($this->columns, $row, $format);
}
}

// Now that we've yielded all available rows, flush the buffer.
$bufferedResults = [];
$shouldRetry = $hasResumeToken;

// If the last item in the buffer had a chunked value let's
// hold on to it so we can stitch it together into a yieldable
Expand All @@ -225,6 +226,8 @@ public function rows($format = self::RETURN_ASSOCIATIVE)
}
}

// retry without resume token when results have not yielded
$shouldRetry = !$isResultsYielded || $hasResumeToken;
$generator->next();
$valid = $generator->valid();
} catch (ServiceException $ex) {
Expand Down
71 changes: 71 additions & 0 deletions Spanner/tests/Unit/ResultTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,77 @@ function () use ($chunks, &$timesCalled) {
$this->assertEquals(2, $timesCalled);
}

public function testRowsRetriesWithoutResumeTokenWhenNotYieldedRows()
{
$timesCalled = 0;
$chunks = [
[
'metadata' => $this->metadata,
'values' => ['a']
],
[
'values' => ['b']
],
[
'values' => ['c']
]
];

$result = $this->getResultClass(
null,
'r',
null,
function () use ($chunks, &$timesCalled) {
$timesCalled++;
foreach ($chunks as $key => $chunk) {
if ($key === 1 && $timesCalled < 2) {
throw new ServiceException('Unavailable', 14);
}
yield $chunk;
}
}
);

iterator_to_array($result->rows());
$this->assertEquals(2, $timesCalled);
}

public function testRowsRetriesWithResumeTokenWhenNotYieldedRows()
{
$timesCalled = 0;
$chunks = [
[
'metadata' => $this->metadata,
'values' => ['a'],
'resumeToken' => 'abc'
],
[
'values' => ['b']
],
[
'values' => ['c']
]
];

$result = $this->getResultClass(
null,
'r',
null,
function () use ($chunks, &$timesCalled) {
$timesCalled++;
foreach ($chunks as $key => $chunk) {
if ($key === 1 && $timesCalled < 2) {
throw new ServiceException('Unavailable', 14);
}
yield $chunk;
}
}
);

iterator_to_array($result->rows());
$this->assertEquals(2, $timesCalled);
}

public function testThrowsExceptionWhenCannotRetry()
{
$this->expectException(ServiceException::class);
Expand Down

0 comments on commit fbd16a8

Please sign in to comment.