Skip to content

Commit

Permalink
Implement automatic downsizing of session pool (#646)
Browse files Browse the repository at this point in the history
* Implement automatic downsizing of sessions

* address review feedback

* remove wait calls
  • Loading branch information
dwsupplee authored Sep 18, 2017
1 parent c5985c8 commit 7f73556
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 66 deletions.
45 changes: 43 additions & 2 deletions src/Spanner/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner\Connection;

use Grpc\UnaryCall;
use Google\Cloud\Core\GrpcRequestWrapper;
use Google\Cloud\Core\GrpcTrait;
use Google\Cloud\Core\LongRunning\OperationResponseTrait;
Expand All @@ -25,6 +26,7 @@
use Google\Cloud\Spanner\Operation;
use Google\Cloud\Spanner\SpannerClient as ManualSpannerClient;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\GAX\AgentHeaderDescriptor;
use Google\GAX\Serializer;
use Google\Protobuf;
use Google\Protobuf\FieldMask;
Expand All @@ -34,6 +36,7 @@
use Google\Protobuf\Value;
use Google\Spanner\Admin\Database\V1\Database;
use Google\Spanner\Admin\Instance\V1\Instance;
use Google\Spanner\V1\DeleteSessionRequest;
use Google\Spanner\V1\KeySet;
use Google\Spanner\V1\Mutation;
use Google\Spanner\V1\Mutation_Delete;
Expand Down Expand Up @@ -116,6 +119,11 @@ class Grpc implements ConnectionInterface
*/
private $longRunningGrpcClients;

/**
* @var AgentHeaderDescriptor
*/
private $headerDescriptor;

/**
* @param array $config [optional]
*/
Expand All @@ -142,16 +150,20 @@ public function __construct(array $config = [])

$config['serializer'] = $this->serializer;
$this->setRequestWrapper(new GrpcRequestWrapper($config));

$grpcConfig = $this->getGaxConfig(ManualSpannerClient::VERSION);
$this->spannerClient = isset($config['gapicSpannerClient'])
? $config['gapicSpannerClient']
: new SpannerClient($grpcConfig);
$this->instanceAdminClient = new InstanceAdminClient($grpcConfig);
$this->databaseAdminClient = new DatabaseAdminClient($grpcConfig);
$this->spannerClient = new SpannerClient($grpcConfig);
$this->operationsClient = $this->instanceAdminClient->getOperationsClient();
$this->longRunningGrpcClients = [
$this->instanceAdminClient,
$this->databaseAdminClient
];
$this->headerDescriptor = new AgentHeaderDescriptor([
'gapicVersion' => trim(file_get_contents(__DIR__ . '/../VERSION'))
]);
}

/**
Expand Down Expand Up @@ -444,6 +456,35 @@ public function deleteSession(array $args)
]);
}

/**
* Note: This should be removed once GAPIC exposes the ability to execute
* concurrent requests.
*
* @access private
* @param array $args
* @return UnaryCall
* @experimental
*/
public function deleteSessionAsync(array $args)
{
$database = $this->pluck('database', $args);
$headers = $this->headerDescriptor->getHeader()
+ $this->addResourcePrefixHeader($args, $database)['userHeaders'];
$request = new DeleteSessionRequest();
$request->setName($this->pluck('name', $args));
$credentialsCallback = $this->spannerClient
->getCredentialsHelper()
->createCallCredentialsCallback();

return $this->spannerClient
->getStub()
->DeleteSession(
$request,
$headers,
['call_credentials_callback' => $credentialsCallback]
);
}

/**
* @param array $args
* @return \Generator
Expand Down
12 changes: 12 additions & 0 deletions src/Spanner/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,18 @@ public function identity()
];
}

/**
* Returns the underlying connection.
*
* @access private
* @return ConnectionInterface
* @experimental
*/
public function connection()
{
return $this->connection;
}

/**
* Represent the class in a more readable and digestable fashion.
*
Expand Down
152 changes: 109 additions & 43 deletions src/Spanner/Session/CacheSessionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Google\Cloud\Core\Lock\SemaphoreLock;
use Google\Cloud\Core\SysvTrait;
use Google\Cloud\Spanner\Database;
use Grpc\UnaryCall;
use Psr\Cache\CacheItemPoolInterface;

/**
Expand All @@ -39,19 +40,18 @@
* recommended way to bootstrap the session pool.
*
* Sessions are created on demand up to the maximum session value set during
* instantiation of the pool. After peak usage hours, you may find that more
* sessions are available than your demand may require. It is important to make
* sure the number of active sessions managed by the Spanner backend is kept
* as minimal as possible. In order to help maintain this balance, please use
* the {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()} method
* on an interval that matches when you expect to see a decrease in traffic.
* This will help ensure you never run into issues where the Spanner backend is
* instantiation of the pool. To help ensure the minimum number of sessions
* required are managed by the pool, attempts will be made to automatically
* downsize after every 10 minute window. This feature is configurable and one
* may also downsize at their own choosing via
* {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()}. Downsizing
* will help ensure you never run into issues where the Spanner backend is
* locked up after having met the maximum number of sessions assigned per node.
* For reference, the current maximum sessions per database per node is 10k. For
* more information on limits please see
* [here](https://cloud.google.com/spanner/docs/limits).
*
* Additionally, when expecting a long period of inactivity (such as a
* When expecting a long period of inactivity (such as a
* maintenance window), please make sure to call
* {@see Google\Cloud\Spanner\Session\CacheSessionPool::clear()} in order to
* delete any active sessions.
Expand Down Expand Up @@ -83,9 +83,9 @@ class CacheSessionPool implements SessionPoolInterface
use SysvTrait;

const CACHE_KEY_TEMPLATE = 'cache-session-pool.%s.%s.%s';

const DURATION_TWENTY_MINUTES = 1200;
const DURATION_ONE_MINUTE = 60;
const WINDOW_SIZE = 600;

/**
* @var array
Expand All @@ -95,7 +95,8 @@ class CacheSessionPool implements SessionPoolInterface
'minSessions' => 1,
'shouldWaitForSession' => true,
'maxCyclesToWaitForSession' => 30,
'sleepIntervalSeconds' => .5
'sleepIntervalSeconds' => .5,
'shouldAutoDownsize' => true
];

/**
Expand All @@ -118,6 +119,16 @@ class CacheSessionPool implements SessionPoolInterface
*/
private $database;

/**
* @var UnaryCall[]
*/
private $deleteCalls = [];

/**
* @var array
*/
private $deleteQueue = [];

/**
* @param CacheItemPoolInterface $cacheItemPool A PSR-6 compatible cache
* implementation used to store the session data.
Expand All @@ -140,6 +151,9 @@ class CacheSessionPool implements SessionPoolInterface
* **Defaults to** a semaphore based implementation if the
* required extensions are installed, otherwise an flock based
* implementation.
* @type bool $shouldAutoDownsize Determines whether or not to
* automatically attempt to downsize the pool after every 10
* minute window. **Defaults to** `true`.
* }
* @throws \InvalidArgumentException
*/
Expand Down Expand Up @@ -228,10 +242,13 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ)

if (!$exception) {
$session = array_shift($data['queue']);

$data['inUse'][$session['name']] = $session + [
'lastActive' => $this->time()
];

if ($this->config['shouldAutoDownsize']) {
$this->manageSessionsToDelete($data);
}
}

$this->cacheItemPool->save($item->set($data));
Expand All @@ -257,6 +274,11 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ)
$session = $this->waitForNextAvailableSession();
}

if ($this->deleteQueue) {
$this->deleteSessions($this->deleteQueue);
$this->deleteQueue = [];
}

return $this->database->session($session['name']);
}

Expand Down Expand Up @@ -414,40 +436,24 @@ public function warmup()
/**
* Clear the cache and attempt to delete all sessions in the pool.
*
* Please note this method will attempt to synchronously delete sessions and
* will block until complete.
* A session may be removed from the cache, but still tracked as active by
* the Spanner backend if a delete operation failed. To ensure you do not
* exceed the maximum number of sessions available per node, please be sure
* to check the return value of this method to be certain all sessions have
* been deleted.
*/
public function clear()
{
$sessions = $this->config['lock']->synchronize(function () {
$sessions = [];
$item = $this->cacheItemPool->getItem($this->cacheKey);
$data = (array) $item->get() ?: $this->initialize();

foreach ($data['queue'] as $session) {
$sessions[] = $session['name'];
}

foreach ($data['inUse'] as $session) {
$sessions[] = $session['name'];
}

$sessions = $data['queue'] + $data['inUse'];
$this->cacheItemPool->clear();

return $sessions;
});

foreach ($sessions as $sessionName) {
$session = $this->database->session($sessionName);

try {
$session->delete();
} catch (\Exception $ex) {
if ($ex instanceof NotFoundException) {
continue;
}
}
}
$this->deleteSessions($sessions);
}

/**
Expand Down Expand Up @@ -557,7 +563,9 @@ private function initialize()
return [
'queue' => [],
'inUse' => [],
'toCreate' => []
'toCreate' => [],
'windowStart' => $this->time(),
'maxInUseSessions' => 0
];
}

Expand All @@ -570,17 +578,13 @@ private function initialize()
*/
private function getSessionCount(array $data)
{
$count = 0;

foreach ($data as $sessionType) {
$count += count($sessionType);
}

return $count;
return count($data['queue'])
+ count($data['inUse'])
+ count($data['toCreate']);
}

/**
* Gets the next session in the queue, clearing out which are expired.
* Gets the next session in the queue, clearing out any which are expired.
*
* @param array $data
* @return array|null
Expand All @@ -597,6 +601,10 @@ private function getSession(array &$data)
$data['inUse'][$session['name']] = $session + [
'lastActive' => $this->time()
];

if ($this->config['shouldAutoDownsize']) {
$this->manageSessionsToDelete($data);
}
}

return $session;
Expand Down Expand Up @@ -746,4 +754,62 @@ private function validateConfig()
);
}
}

/**
* Delete the provided sessions.
*
* @param array $sessions
*/
private function deleteSessions(array $sessions)
{
// gRPC calls appear to cancel when the corresponding UnaryCall object
// goes out of scope. Keeping the calls in scope allows time for the
// calls to complete at the expense of a small memory footprint.
$this->deleteCalls = [];

foreach ($sessions as $session) {
$this->deleteCalls[] = $this->database->connection()
->deleteSessionAsync([
'name' => $session['name'],
'database' => $this->database->name()
]);
}
}

/**
* Checks the maximum number of sessions in use over the last window(s) then
* removes the sessions from the cache and prepares them to be deleted from
* the Spanner backend.
*
* @param array $data
*/
private function manageSessionsToDelete(array &$data)
{
$secondsSinceLastWindow = $this->time() - $data['windowStart'];
$inUseCount = count($data['inUse']);

if ($secondsSinceLastWindow < self::WINDOW_SIZE + 1) {
if ($data['maxInUseSessions'] < $inUseCount) {
$data['maxInUseSessions'] = $inUseCount;
}

return;
}

$totalCount = $inUseCount + count($data['queue']) + count($data['toCreate']);
$windowsPassed = (int) ($secondsSinceLastWindow / self::WINDOW_SIZE);
$deletionCount = min(
$totalCount - (int) round($data['maxInUseSessions'] / $windowsPassed),
$totalCount - $this->config['minSessions']
);
$data['maxInUseSessions'] = $inUseCount;
$data['windowStart'] = $this->time();

if ($deletionCount) {
$this->deleteQueue += array_splice(
$data['queue'],
(int) -$deletionCount
);
}
}
}
Loading

0 comments on commit 7f73556

Please sign in to comment.