From bfe7a95ee4a3feaca68da56678b4e221a66e51f0 Mon Sep 17 00:00:00 2001 From: alex Date: Wed, 1 Jan 2020 00:50:19 -0500 Subject: [PATCH] feat: implement resource based routing feature --- Spanner/src/Connection/Grpc.php | 121 ++++++++++++++++++--- Spanner/src/SpannerClient.php | 15 ++- Spanner/tests/Unit/Connection/GrpcTest.php | 90 ++++++++++++++- Spanner/tests/Unit/SpannerClientTest.php | 25 +++++ 4 files changed, 231 insertions(+), 20 deletions(-) diff --git a/Spanner/src/Connection/Grpc.php b/Spanner/src/Connection/Grpc.php index b0a4f310e4a1..718cd469273d 100644 --- a/Spanner/src/Connection/Grpc.php +++ b/Spanner/src/Connection/Grpc.php @@ -17,6 +17,8 @@ namespace Google\Cloud\Spanner\Connection; +use Google\ApiCore\ApiException; +use Google\ApiCore\ApiStatus; use Google\ApiCore\Call; use Google\ApiCore\CredentialsWrapper; use Google\ApiCore\Serializer; @@ -76,6 +78,16 @@ class Grpc implements ConnectionInterface */ private $spannerClient; + /** + * @var array + */ + private $spannerClients; + + /** + * @var bool + */ + private $enableResourceCaching; + /** * @var Serializer */ @@ -166,11 +178,14 @@ public function __construct(array $config = []) $grpcConfig['apiEndpoint'] = $config['apiEndpoint']; } - $this->spannerClient = isset($config['gapicSpannerClient']) - ? $config['gapicSpannerClient'] - : $this->constructGapic(SpannerClient::class, $grpcConfig); + $this->enableResourceCaching = isset($config['enableCaching']) && $config['enableCaching']; + $this->spannerClients = []; //@codeCoverageIgnoreStart + if (isset($config['gapicSpannerClient'])) { + $this->spannerClient = $config['gapicSpannerClient']; + } + if (isset($config['gapicSpannerInstanceAdminClient'])) { $this->instanceAdminClient = $config['gapicSpannerInstanceAdminClient']; } @@ -444,12 +459,13 @@ public function createSession(array $args) { $databaseName = $this->pluck('database', $args); + $instanceName = $this->getInstanceName($databaseName); $session = $this->pluck('session', $args, false); if ($session) { $args['session'] = $this->serializer->decodeMessage(new Session, $session); } - return $this->send([$this->spannerClient, 'createSession'], [ + return $this->send([$this->getSpannerClient($instanceName), 'createSession'], [ $databaseName, $this->addResourcePrefixHeader($args, $databaseName) ]); @@ -467,9 +483,10 @@ public function createSession(array $args) public function createSessionAsync(array $args) { $database = $this->pluck('database', $args); + $instanceName = $this->getInstanceName($database); $opts = $this->addResourcePrefixHeader([], $database); $opts['credentialsWrapper'] = $this->credentialsWrapper; - $transport = $this->spannerClient->getTransport(); + $transport = $this->getSpannerClient($instanceName)->getTransport(); $request = new CreateSessionRequest([ 'database' => $database @@ -503,7 +520,8 @@ public function batchCreateSessions(array $args) ); $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'batchCreateSessions'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'batchCreateSessions'], [ $database, $this->pluck('sessionCount', $args), $this->addResourcePrefixHeader($args, $database) @@ -516,7 +534,8 @@ public function batchCreateSessions(array $args) public function getSession(array $args) { $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'getSession'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'getSession'], [ $this->pluck('name', $args), $this->addResourcePrefixHeader($args, $database) ]); @@ -528,7 +547,8 @@ public function getSession(array $args) public function deleteSession(array $args) { $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'deleteSession'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'deleteSession'], [ $this->pluck('name', $args), $this->addResourcePrefixHeader($args, $database) ]); @@ -546,10 +566,11 @@ public function deleteSession(array $args) public function deleteSessionAsync(array $args) { $database = $this->pluck('database', $args); + $instanceName = $this->getInstanceName($database); $request = new DeleteSessionRequest(); $request->setName($this->pluck('name', $args)); - $transport = $this->spannerClient->getTransport(); + $transport = $this->getSpannerClient($instanceName)->getTransport(); $opts = $this->addResourcePrefixHeader([], $database); $opts['credentialsWrapper'] = $this->credentialsWrapper; @@ -573,8 +594,9 @@ public function executeStreamingSql(array $args) $args['transaction'] = $this->createTransactionSelector($args); $database = $this->pluck('database', $args); + $instanceName = $this->getInstanceName($database); - return $this->send([$this->spannerClient, 'executeStreamingSql'], [ + return $this->send([$this->getSpannerClient($instanceName), 'executeStreamingSql'], [ $this->pluck('session', $args), $this->pluck('sql', $args), $this->addResourcePrefixHeader($args, $database) @@ -593,7 +615,8 @@ public function streamingRead(array $args) $args['transaction'] = $this->createTransactionSelector($args); $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'streamingRead'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'streamingRead'], [ $this->pluck('session', $args), $this->pluck('table', $args), $this->pluck('columns', $args), @@ -608,6 +631,7 @@ public function streamingRead(array $args) public function executeBatchDml(array $args) { $database = $this->pluck('database', $args); + $instanceName = $this->getInstanceName($database); $args['transaction'] = $this->createTransactionSelector($args); $statements = []; @@ -616,7 +640,7 @@ public function executeBatchDml(array $args) $statements[] = $this->serializer->decodeMessage(new Statement, $statement); } - return $this->send([$this->spannerClient, 'executeBatchDml'], [ + return $this->send([$this->getSpannerClient($instanceName), 'executeBatchDml'], [ $this->pluck('session', $args), $this->pluck('transaction', $args), $statements, @@ -648,7 +672,8 @@ public function beginTransaction(array $args) } $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'beginTransaction'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'beginTransaction'], [ $this->pluck('session', $args), $options, $this->addResourcePrefixHeader($args, $database) @@ -716,7 +741,8 @@ public function commit(array $args) } $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'commit'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'commit'], [ $this->pluck('session', $args), $mutations, $this->addResourcePrefixHeader($args, $database) @@ -729,7 +755,8 @@ public function commit(array $args) public function rollback(array $args) { $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'rollback'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'rollback'], [ $this->pluck('session', $args), $this->pluck('transactionId', $args), $this->addResourcePrefixHeader($args, $database) @@ -750,7 +777,8 @@ public function partitionQuery(array $args) ); $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'partitionQuery'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'partitionQuery'], [ $this->pluck('session', $args), $this->pluck('sql', $args), $this->addResourcePrefixHeader($args, $database) @@ -773,7 +801,8 @@ public function partitionRead(array $args) ); $database = $this->pluck('database', $args); - return $this->send([$this->spannerClient, 'partitionRead'], [ + $instanceName = $this->getInstanceName($database); + return $this->send([$this->getSpannerClient($instanceName), 'partitionRead'], [ $this->pluck('session', $args), $this->pluck('table', $args), $keySet, @@ -1092,4 +1121,62 @@ private function getDatabaseAdminClient() return $this->databaseAdminClient; } + + /** + * Allow lazy instantiation of the spanner client. + * If routing is enabled, a SpannerClient is returned for a specific instanceName + * with a binding in the connection of a specific endpoint. + * If the endpoint list is not requested, a request is first made for this instanceName. + * + * @param string $instanceName Optional. Full name of instance. + * @return SpannerClient + */ + private function getSpannerClient($instanceName) + { + if ($this->spannerClient) { + return $this->spannerClient; + } + + if (isset($this->spannerClients[$instanceName])) { + return $this->spannerClients[$instanceName]; + } elseif ($this->enableResourceCaching) { + $projectId = InstanceAdminClient::parseName($instanceName)['project']; + try { + $instanceInfo = $this->getInstance([ + 'projectId' => $projectId, + 'name' => $instanceName, + 'fieldMask' => ['endpointUris'] + ]); + if (empty($instanceInfo['endpointUris'])) { + $this->setSpannerClient($instanceName, $this->grpcConfig); + return $this->spannerClients[$instanceName]; + } else { + $grpcConfig = $this->grpcConfig; + // for now disregarding multiple endpointUris + $grpcConfig['apiEndpoint'] = $instanceInfo['endpointUris'][0]; + $this->setSpannerClient($instanceName, $grpcConfig); + return $this->spannerClients[$instanceName]; + } + } catch (ApiException $ex) { + if (ApiStatus::PERMISSION_DENIED == $ex->getStatus()) { + error_log('To use resource caching please add "spanner.instances.get" permission.', 0); + } else { + throw $ex; + } + } + } + $this->setSpannerClient($instanceName, $this->grpcConfig); + return $this->spannerClients[$instanceName]; + } + + private function setSpannerClient($instanceName, array $config) + { + $this->spannerClients[$instanceName] = $this->constructGapic(SpannerClient::class, $config); + } + + private function getInstanceName($sessionOrDbName) + { + $parsed = SpannerClient::parseName($sessionOrDbName); + return InstanceAdminClient::instanceName($parsed['project'], $parsed['instance']); + } } diff --git a/Spanner/src/SpannerClient.php b/Spanner/src/SpannerClient.php index b9d968be335c..d9f40521678d 100644 --- a/Spanner/src/SpannerClient.php +++ b/Spanner/src/SpannerClient.php @@ -141,7 +141,9 @@ public function __construct(array $config = []) 'projectIdRequired' => true ]; - $this->connection = new Grpc($this->configureAuthentication($config)); + $config['enableCaching'] = 'true' == strtolower(getenv('GOOGLE_CLOUD_ENABLE_RESOURCE_BASED_ROUTING')); + + $this->connection = $this->getGrpc($config); $this->returnInt64AsObject = $config['returnInt64AsObject']; $this->setLroProperties(new LongRunningConnection($this->connection), [ @@ -571,4 +573,15 @@ public function commitTimestamp() { return new CommitTimestamp; } + + /** + * Construct an instance of Google\Cloud\Spanner\Connection\Grpc class. Allows for tests to intercept. + * + * @param array $config + * @return Grpc + */ + protected function getGrpc(array $config) + { + return new Grpc($this->configureAuthentication($config)); + } } diff --git a/Spanner/tests/Unit/Connection/GrpcTest.php b/Spanner/tests/Unit/Connection/GrpcTest.php index 5303f88035ab..5b8d5739aec0 100644 --- a/Spanner/tests/Unit/Connection/GrpcTest.php +++ b/Spanner/tests/Unit/Connection/GrpcTest.php @@ -17,6 +17,8 @@ namespace Google\Cloud\Spanner\Tests\Unit\Connection; +use Google\ApiCore\ApiException; +use Google\ApiCore\ApiStatus; use Google\ApiCore\Call; use Google\ApiCore\OperationResponse; use Google\ApiCore\Serializer; @@ -49,6 +51,7 @@ use Google\Protobuf\Struct; use Google\Protobuf\Timestamp; use Google\Protobuf\Value; +use Google\Rpc\Code; use GuzzleHttp\Promise\PromiseInterface; use PHPUnit\Framework\TestCase; use Prophecy\Argument; @@ -75,6 +78,9 @@ class GrpcTest extends TestCase private $successMessage; private $lro; + private $grpc; + private $getInstance; + public function setUp() { $this->checkAndSkipGrpcTests(); @@ -83,6 +89,57 @@ public function setUp() $this->serializer = new Serializer; $this->successMessage = 'success'; $this->lro = $this->prophesize(OperationResponse::class)->reveal(); + + // $this->grpc = TestHelpers::stub(GrpcStub::class, [['enableCaching' => true]]); + } + + public function testCacheResourceEnpointUrisIsEmpty() + { + $grpc = new GrpcStub(['enableCaching' => true, 'expectedReturn' => []]); + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + // make sure `getSession` request is ony sent once. + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + + $this->assertEquals($grpc->calledTimes, 1); + $this->assertFalse(isset($grpc->config['apiEndpoint'])); + } + + public function testCacheResourceValidEndpointUri() + { + $instanceEndpointUri = 'some.endpoit.uri'; + $grpc = new GrpcStub([ + 'enableCaching' => true, + 'expectedReturn' => ['endpointUris' => [$instanceEndpointUri]] + ]); + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + // make sure `getSession` request is ony sent once. + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + + $this->assertEquals($grpc->calledTimes, 1); + $this->assertEquals($grpc->config['apiEndpoint'], $instanceEndpointUri); + } + + public function testCacheResourcePermissionError() + { + $exception = new ApiException('error', Code::PERMISSION_DENIED, ApiStatus::PERMISSION_DENIED); + $grpc = new GrpcStub(['enableCaching' => true, 'expectedReturn' => $exception]); + + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + + $this->assertEquals($grpc->calledTimes, 1); + $this->assertFalse(isset($grpc->config['apiEndpoint'])); + } + + public function testCacheResourceOtherError() + { + $this->setExpectedException(ApiException::class); + $exception = new ApiException('error', Code::UNAUTHENTICATED, ApiStatus::UNAUTHENTICATED); + $grpc = new GrpcStub(['enableCaching' => true, 'expectedReturn' => $exception]); + + $grpc->getSession(['database' => self::DATABASE, 'name' => self::SESSION]); + + $this->assertEquals($grpc->calledTimes, 1); + $this->assertFalse(isset($grpc->config['apiEndpoint'])); } public function testApiEndpoint() @@ -91,6 +148,7 @@ public function testApiEndpoint() $grpc = new GrpcStub(['apiEndpoint' => $expected]); + $grpc->deleteInstance(['name' => self::INSTANCE]); $this->assertEquals($expected, $grpc->config['apiEndpoint']); } @@ -335,7 +393,7 @@ public function testCreateSessionAsync() $grpc = new Grpc(['gapicSpannerClient' => $client->reveal()]); $promise = $grpc->createSessionAsync([ - 'database' => 'database1', + 'database' => self::DATABASE, 'session' => [ 'labels' => [ 'foo' => 'bar' ] ] @@ -389,7 +447,7 @@ public function testDeleteSessionAsync() $promise = $this->prophesize(PromiseInterface::class) ->reveal(); $sessionName = 'session1'; - $databaseName = 'database1'; + $databaseName = self::DATABASE; $request = new DeleteSessionRequest(); $request->setName($sessionName); $client = $this->prophesize(SpannerClient::class); @@ -1061,6 +1119,18 @@ private function transactionSelector() class GrpcStub extends Grpc { public $config; + public $calledTimes = 0; + private $expectedReturn = []; + + public function __construct(array $args) + { + if (isset($args['expectedReturn'])) { + $this->expectedReturn = $args['expectedReturn']; + unset($args['expectedReturn']); + } + + parent::__construct($args); + } protected function constructGapic($gapicName, array $config) { @@ -1068,5 +1138,21 @@ protected function constructGapic($gapicName, array $config) return parent::constructGapic($gapicName, $config); } + + public function send(callable $request, array $args, $whitelisted = false) + { + return null; + } + + public function getInstance(array $args) + { + $this->calledTimes+=1; + + if ($this->expectedReturn instanceOf ApiException) { + throw $this->expectedReturn; + } + + return $this->expectedReturn; + } } //@codingStandardsIgnoreEnd diff --git a/Spanner/tests/Unit/SpannerClientTest.php b/Spanner/tests/Unit/SpannerClientTest.php index 4a2fa1e2c841..096564d3d786 100644 --- a/Spanner/tests/Unit/SpannerClientTest.php +++ b/Spanner/tests/Unit/SpannerClientTest.php @@ -39,6 +39,7 @@ use Google\Cloud\Spanner\Timestamp; use PHPUnit\Framework\TestCase; use Prophecy\Argument; +use ReflectionObject; /** * @group spanner @@ -65,6 +66,17 @@ public function setUp() ]); } + public function testResourceCachingEnvVar() + { + $this->assertTrue(putenv("GOOGLE_CLOUD_ENABLE_RESOURCE_BASED_ROUTING=true")); + + $client = TestHelpers::stub(SpannerClientStub::class, [ + ['projectId' => self::PROJECT] + ]); + + $this->assertTrue($client->config['enableCaching']); + } + public function testBatch() { $batch = $this->client->batch('foo', 'bar'); @@ -314,3 +326,16 @@ public function testCommitTimestamp() $this->assertInstanceOf(CommitTimestamp::class, $t); } } + +//@codingStandardsIgnoreStart +class SpannerClientStub extends SpannerClient +{ + public $config; + + protected function getGrpc(array $config) + { + $this->config = $config; + return parent::getGrpc($config); + } +} +//@codingStandardsIgnoreEnd