From fbd16a805367c70987d88529c7e7a4bb386a95c8 Mon Sep 17 00:00:00 2001 From: Vishwaraj Anand Date: Wed, 3 May 2023 22:46:35 +0530 Subject: [PATCH] fix(Spanner): retry logic of ExecuteStreamingSql (#6149) --- Spanner/src/Result.php | 5 ++- Spanner/tests/Unit/ResultTest.php | 71 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/Spanner/src/Result.php b/Spanner/src/Result.php index 10694076320c..0a731d77332b 100644 --- a/Spanner/src/Result.php +++ b/Spanner/src/Result.php @@ -178,6 +178,7 @@ public function rows($format = self::RETURN_ASSOCIATIVE) $call = $this->call; $generator = null; $shouldRetry = false; + $isResultsYielded = false; $backoff = new ExponentialBackoff($this->retries, function ($ex) { if ($ex instanceof ServiceException) { return $ex->getCode() === Grpc\STATUS_UNAVAILABLE; @@ -209,13 +210,13 @@ public function rows($format = self::RETURN_ASSOCIATIVE) list($yieldableRows, $chunkedResult) = $this->parseRowsFromBufferedResults($bufferedResults); foreach ($yieldableRows as $row) { + $isResultsYielded = true; yield $this->mapper->decodeValues($this->columns, $row, $format); } } // Now that we've yielded all available rows, flush the buffer. $bufferedResults = []; - $shouldRetry = $hasResumeToken; // If the last item in the buffer had a chunked value let's // hold on to it so we can stitch it together into a yieldable @@ -225,6 +226,8 @@ public function rows($format = self::RETURN_ASSOCIATIVE) } } + // retry without resume token when results have not yielded + $shouldRetry = !$isResultsYielded || $hasResumeToken; $generator->next(); $valid = $generator->valid(); } catch (ServiceException $ex) { diff --git a/Spanner/tests/Unit/ResultTest.php b/Spanner/tests/Unit/ResultTest.php index e886a88b5315..00e1a54e9704 100644 --- a/Spanner/tests/Unit/ResultTest.php +++ b/Spanner/tests/Unit/ResultTest.php @@ -159,6 +159,77 @@ function () use ($chunks, &$timesCalled) { $this->assertEquals(2, $timesCalled); } + public function testRowsRetriesWithoutResumeTokenWhenNotYieldedRows() + { + $timesCalled = 0; + $chunks = [ + [ + 'metadata' => $this->metadata, + 'values' => ['a'] + ], + [ + 'values' => ['b'] + ], + [ + 'values' => ['c'] + ] + ]; + + $result = $this->getResultClass( + null, + 'r', + null, + function () use ($chunks, &$timesCalled) { + $timesCalled++; + foreach ($chunks as $key => $chunk) { + if ($key === 1 && $timesCalled < 2) { + throw new ServiceException('Unavailable', 14); + } + yield $chunk; + } + } + ); + + iterator_to_array($result->rows()); + $this->assertEquals(2, $timesCalled); + } + + public function testRowsRetriesWithResumeTokenWhenNotYieldedRows() + { + $timesCalled = 0; + $chunks = [ + [ + 'metadata' => $this->metadata, + 'values' => ['a'], + 'resumeToken' => 'abc' + ], + [ + 'values' => ['b'] + ], + [ + 'values' => ['c'] + ] + ]; + + $result = $this->getResultClass( + null, + 'r', + null, + function () use ($chunks, &$timesCalled) { + $timesCalled++; + foreach ($chunks as $key => $chunk) { + if ($key === 1 && $timesCalled < 2) { + throw new ServiceException('Unavailable', 14); + } + yield $chunk; + } + } + ); + + iterator_to_array($result->rows()); + $this->assertEquals(2, $timesCalled); + } + public function testThrowsExceptionWhenCannotRetry() { $this->expectException(ServiceException::class);