Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement resource based routing feature #2535

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 104 additions & 17 deletions Spanner/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

namespace Google\Cloud\Spanner\Connection;

use Google\ApiCore\ApiException;
use Google\ApiCore\ApiStatus;
use Google\ApiCore\Call;
use Google\ApiCore\CredentialsWrapper;
use Google\ApiCore\Serializer;
Expand Down Expand Up @@ -76,6 +78,16 @@ class Grpc implements ConnectionInterface
*/
private $spannerClient;

/**
* @var array
*/
private $spannerClients;

/**
* @var bool
*/
private $enableResourceCaching;

/**
* @var Serializer
*/
Expand Down Expand Up @@ -166,11 +178,14 @@ public function __construct(array $config = [])
$grpcConfig['apiEndpoint'] = $config['apiEndpoint'];
}

$this->spannerClient = isset($config['gapicSpannerClient'])
? $config['gapicSpannerClient']
: $this->constructGapic(SpannerClient::class, $grpcConfig);
$this->enableResourceCaching = isset($config['enableCaching']) && $config['enableCaching'];
$this->spannerClients = [];

//@codeCoverageIgnoreStart
if (isset($config['gapicSpannerClient'])) {
$this->spannerClient = $config['gapicSpannerClient'];
}

if (isset($config['gapicSpannerInstanceAdminClient'])) {
$this->instanceAdminClient = $config['gapicSpannerInstanceAdminClient'];
}
Expand Down Expand Up @@ -444,12 +459,13 @@ public function createSession(array $args)
{
$databaseName = $this->pluck('database', $args);

$instanceName = $this->getInstanceName($databaseName);
$session = $this->pluck('session', $args, false);
if ($session) {
$args['session'] = $this->serializer->decodeMessage(new Session, $session);
}

return $this->send([$this->spannerClient, 'createSession'], [
return $this->send([$this->getSpannerClient($instanceName), 'createSession'], [
$databaseName,
$this->addResourcePrefixHeader($args, $databaseName)
]);
Expand All @@ -467,9 +483,10 @@ public function createSession(array $args)
public function createSessionAsync(array $args)
{
$database = $this->pluck('database', $args);
$instanceName = $this->getInstanceName($database);
$opts = $this->addResourcePrefixHeader([], $database);
$opts['credentialsWrapper'] = $this->credentialsWrapper;
$transport = $this->spannerClient->getTransport();
$transport = $this->getSpannerClient($instanceName)->getTransport();

$request = new CreateSessionRequest([
'database' => $database
Expand Down Expand Up @@ -503,7 +520,8 @@ public function batchCreateSessions(array $args)
);

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'batchCreateSessions'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'batchCreateSessions'], [
$database,
$this->pluck('sessionCount', $args),
$this->addResourcePrefixHeader($args, $database)
Expand All @@ -516,7 +534,8 @@ public function batchCreateSessions(array $args)
public function getSession(array $args)
{
$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'getSession'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'getSession'], [
$this->pluck('name', $args),
$this->addResourcePrefixHeader($args, $database)
]);
Expand All @@ -528,7 +547,8 @@ public function getSession(array $args)
public function deleteSession(array $args)
{
$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'deleteSession'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'deleteSession'], [
$this->pluck('name', $args),
$this->addResourcePrefixHeader($args, $database)
]);
Expand All @@ -546,10 +566,11 @@ public function deleteSession(array $args)
public function deleteSessionAsync(array $args)
{
$database = $this->pluck('database', $args);
$instanceName = $this->getInstanceName($database);
$request = new DeleteSessionRequest();
$request->setName($this->pluck('name', $args));

$transport = $this->spannerClient->getTransport();
$transport = $this->getSpannerClient($instanceName)->getTransport();
$opts = $this->addResourcePrefixHeader([], $database);
$opts['credentialsWrapper'] = $this->credentialsWrapper;

Expand All @@ -573,8 +594,9 @@ public function executeStreamingSql(array $args)
$args['transaction'] = $this->createTransactionSelector($args);

$database = $this->pluck('database', $args);
$instanceName = $this->getInstanceName($database);

return $this->send([$this->spannerClient, 'executeStreamingSql'], [
return $this->send([$this->getSpannerClient($instanceName), 'executeStreamingSql'], [
$this->pluck('session', $args),
$this->pluck('sql', $args),
$this->addResourcePrefixHeader($args, $database)
Expand All @@ -593,7 +615,8 @@ public function streamingRead(array $args)
$args['transaction'] = $this->createTransactionSelector($args);

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'streamingRead'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'streamingRead'], [
$this->pluck('session', $args),
$this->pluck('table', $args),
$this->pluck('columns', $args),
Expand All @@ -608,6 +631,7 @@ public function streamingRead(array $args)
public function executeBatchDml(array $args)
{
$database = $this->pluck('database', $args);
$instanceName = $this->getInstanceName($database);
$args['transaction'] = $this->createTransactionSelector($args);

$statements = [];
Expand All @@ -616,7 +640,7 @@ public function executeBatchDml(array $args)
$statements[] = $this->serializer->decodeMessage(new Statement, $statement);
}

return $this->send([$this->spannerClient, 'executeBatchDml'], [
return $this->send([$this->getSpannerClient($instanceName), 'executeBatchDml'], [
$this->pluck('session', $args),
$this->pluck('transaction', $args),
$statements,
Expand Down Expand Up @@ -648,7 +672,8 @@ public function beginTransaction(array $args)
}

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'beginTransaction'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'beginTransaction'], [
$this->pluck('session', $args),
$options,
$this->addResourcePrefixHeader($args, $database)
Expand Down Expand Up @@ -716,7 +741,8 @@ public function commit(array $args)
}

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'commit'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'commit'], [
$this->pluck('session', $args),
$mutations,
$this->addResourcePrefixHeader($args, $database)
Expand All @@ -729,7 +755,8 @@ public function commit(array $args)
public function rollback(array $args)
{
$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'rollback'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'rollback'], [
$this->pluck('session', $args),
$this->pluck('transactionId', $args),
$this->addResourcePrefixHeader($args, $database)
Expand All @@ -750,7 +777,8 @@ public function partitionQuery(array $args)
);

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'partitionQuery'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'partitionQuery'], [
$this->pluck('session', $args),
$this->pluck('sql', $args),
$this->addResourcePrefixHeader($args, $database)
Expand All @@ -773,7 +801,8 @@ public function partitionRead(array $args)
);

$database = $this->pluck('database', $args);
return $this->send([$this->spannerClient, 'partitionRead'], [
$instanceName = $this->getInstanceName($database);
return $this->send([$this->getSpannerClient($instanceName), 'partitionRead'], [
$this->pluck('session', $args),
$this->pluck('table', $args),
$keySet,
Expand Down Expand Up @@ -1092,4 +1121,62 @@ private function getDatabaseAdminClient()

return $this->databaseAdminClient;
}

/**
* Allow lazy instantiation of the spanner client.
* If routing is enabled, a SpannerClient is returned for a specific instanceName
* with a binding in the connection of a specific endpoint.
* If the endpoint list is not requested, a request is first made for this instanceName.
*
* @param string $instanceName Optional. Full name of instance.
* @return SpannerClient
*/
private function getSpannerClient($instanceName)
{
if ($this->spannerClient) {
return $this->spannerClient;
}

if (isset($this->spannerClients[$instanceName])) {
return $this->spannerClients[$instanceName];
} elseif ($this->enableResourceCaching) {
$projectId = InstanceAdminClient::parseName($instanceName)['project'];
try {
$instanceInfo = $this->getInstance([
'projectId' => $projectId,
'name' => $instanceName,
'fieldMask' => ['endpointUris']
]);
if (empty($instanceInfo['endpointUris'])) {
$this->setSpannerClient($instanceName, $this->grpcConfig);
return $this->spannerClients[$instanceName];
} else {
$grpcConfig = $this->grpcConfig;
// for now disregarding multiple endpointUris
$grpcConfig['apiEndpoint'] = $instanceInfo['endpointUris'][0];
$this->setSpannerClient($instanceName, $grpcConfig);
return $this->spannerClients[$instanceName];
}
} catch (ApiException $ex) {
if (ApiStatus::PERMISSION_DENIED == $ex->getStatus()) {
error_log('To use resource caching please add "spanner.instances.get" permission.', 0);
} else {
throw $ex;
}
}
}
$this->setSpannerClient($instanceName, $this->grpcConfig);
return $this->spannerClients[$instanceName];
}

private function setSpannerClient($instanceName, array $config)
{
$this->spannerClients[$instanceName] = $this->constructGapic(SpannerClient::class, $config);
}

private function getInstanceName($sessionOrDbName)
{
$parsed = SpannerClient::parseName($sessionOrDbName);
return InstanceAdminClient::instanceName($parsed['project'], $parsed['instance']);
}
}
15 changes: 14 additions & 1 deletion Spanner/src/SpannerClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ public function __construct(array $config = [])
'projectIdRequired' => true
];

$this->connection = new Grpc($this->configureAuthentication($config));
$config['enableCaching'] = 'true' == strtolower(getenv('GOOGLE_CLOUD_ENABLE_RESOURCE_BASED_ROUTING'));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING


$this->connection = $this->getGrpc($config);
$this->returnInt64AsObject = $config['returnInt64AsObject'];

$this->setLroProperties(new LongRunningConnection($this->connection), [
Expand Down Expand Up @@ -571,4 +573,15 @@ public function commitTimestamp()
{
return new CommitTimestamp;
}

/**
* Construct an instance of Google\Cloud\Spanner\Connection\Grpc class. Allows for tests to intercept.
*
* @param array $config
* @return Grpc
*/
protected function getGrpc(array $config)
{
return new Grpc($this->configureAuthentication($config));
}
}
Loading