forked from google/neuroglancer
-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(ConnectionPools): added connection pools class
Adding connection pools capability, but will add the actual pools to Storage when other PRs are merged first.
- Loading branch information
1 parent
03122a3
commit 81e169c
Showing 2 changed files with 159 additions and 0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
import Queue | ||
import threading | ||
import time | ||
import signal | ||
from functools import partial | ||
|
||
from google.cloud.storage import Client | ||
from boto.s3.connection import S3Connection | ||
|
||
from neuroglancer.pipeline.secrets import PROJECT_NAME, google_credentials_path, aws_credentials | ||
|
||
class ConnectionPool(object):
    """
    Base class for reusable service-connection pools. Intended to be
    subclassed; see S3ConnectionPool / GCloudConnectionPool below.

    Creating fresh client or connection objects for Google or Amazon
    eventually starts causing breakdowns when too many connections open.
    To promote efficient resource use and prevent containers from dying,
    we create a ConnectionPool that allows for the reuse of connections.
    Storage interfaces may acquire and release connections when they need
    or finish using them.
    """
    def __init__(self, max_connections=60):
        """
        max_connections: advisory cap on simultaneously outstanding
            connections, recorded for callers to inspect.
            NOTE(review): blocking acquires past the cap (as the original
            docstring promised) is not implemented yet -- the underlying
            queue is unbounded. Confirm before relying on the limit.
        """
        self.max_connections = max_connections
        self.pool = Queue.Queue(maxsize=0)
        self.outstanding = 0  # connections handed out and not yet released
        self._lock = threading.Lock()

        # Drain the pool on interrupt/terminate so sockets close cleanly.
        def handler(signum, frame):
            self.reset_pool()

        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

    def total_connections(self):
        """Number of live connections: idle in the pool plus checked out."""
        return self.pool.qsize() + self.outstanding

    def _create_connection(self):
        """Subclasses return a fresh connection/client for their service."""
        raise NotImplementedError

    def get_connection(self):
        """
        Check a connection out of the pool, creating a fresh one if the
        pool is empty. Blocks only for the duration of the internal lock.
        """
        with self._lock:
            try:
                conn = self.pool.get(block=False)
                self.pool.task_done()
            except Queue.Empty:
                conn = self._create_connection()
            # Increment only once we actually hold a connection. The
            # previous `finally:` incremented even when _create_connection
            # raised, permanently inflating the outstanding count.
            self.outstanding += 1

        return conn

    def release_connection(self, conn):
        """Return a previously acquired connection. None is a no-op."""
        if conn is None:
            return

        self.pool.put(conn)
        with self._lock:
            self.outstanding -= 1

    def _close_function(self):
        """Subclasses return a callable that closes one connection."""
        return lambda x: x  # no-op

    def reset_pool(self):
        """Close every idle connection and zero the outstanding count."""
        closefn = self._close_function()
        while True:
            # Non-blocking get: the old qsize()-then-blocking-get pattern
            # could hang if another thread drained the queue in between.
            try:
                conn = self.pool.get(block=False)
            except Queue.Empty:
                break
            closefn(conn)
            self.pool.task_done()

        with self._lock:
            self.outstanding = 0

    def __del__(self):
        self.reset_pool()
|
||
class S3ConnectionPool(ConnectionPool):
    """Pool of boto S3 connections authenticated from the shared secrets."""

    def _create_connection(self):
        # Pull the key pair out of the shared credential store first so
        # the constructor call reads cleanly.
        access_key = aws_credentials['AWS_ACCESS_KEY_ID']
        secret_key = aws_credentials['AWS_SECRET_ACCESS_KEY']
        return S3Connection(access_key, secret_key)

    def _close_function(self):
        # S3 connections hold sockets, so closing them on reset matters.
        def _close(conn):
            conn.close()
        return _close
|
||
class GCloudConnectionPool(ConnectionPool):
    """Pool of google-cloud-storage Client objects."""

    def _create_connection(self):
        # Clients expose no close(); the base class's no-op closer is used.
        client = Client.from_service_account_json(
            google_credentials_path,
            project=PROJECT_NAME,
        )
        return client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from tqdm import tqdm | ||
import boto.s3.key | ||
|
||
from neuroglancer.pipeline.connection_pool import S3ConnectionPool, GCloudConnectionPool | ||
from neuroglancer.pipeline.threaded_queue import ThreadedQueue | ||
from neuroglancer.pipeline import Storage | ||
|
||
# Module-level pools shared by both stress tests so the same connections
# are reused across all 500 trials.
# NOTE(review): ConnectionPool.__init__ as committed takes no arguments;
# these keyword calls require it to accept max_connections -- confirm.
max_connections = 60
S3_POOL = S3ConnectionPool(max_connections=max_connections)
GC_POOL = GCloudConnectionPool(max_connections=max_connections)
|
||
def test_gc_stresstest():
    """Hammer the shared GCloud pool with 500 concurrent blob downloads."""
    # Seed a small fixture object for every worker to fetch.
    with Storage('gs://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
        stor.put_file('test', 'some string')

    num_trials = 500
    progress = tqdm(total=num_trials)

    def download_once(interface):
        # Borrow a client, fetch the fixture blob, then hand the client back.
        client = GC_POOL.get_connection()
        # assert GC_POOL.total_connections() <= GC_POOL.max_connections * 5
        bucket = client.get_bucket('neuroglancer')
        blob = bucket.get_blob('removeme/connection_pool/test')
        blob.download_as_string()
        GC_POOL.release_connection(client)
        progress.update()

    with ThreadedQueue(n_threads=200) as tq:
        for _ in xrange(num_trials):
            tq.put(download_once)

    progress.close()
|
||
def test_s3_stresstest():
    """Hammer the shared S3 pool with 500 concurrent key downloads."""
    # Seed a small fixture object for every worker to fetch.
    with Storage('s3://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
        stor.put_file('test', 'some string')

    num_trials = 500
    progress = tqdm(total=num_trials)

    def download_once(interface):
        # Borrow a connection, read the fixture key, then hand it back.
        conn = S3_POOL.get_connection()
        # assert S3_POOL.total_connections() <= S3_POOL.max_connections * 5
        bucket = conn.get_bucket('neuroglancer')
        key = boto.s3.key.Key(bucket)
        key.key = 'removeme/connection_pool/test'
        key.get_contents_as_string()
        S3_POOL.release_connection(conn)
        progress.update()

    with ThreadedQueue(n_threads=200) as tq:
        for _ in xrange(num_trials):
            tq.put(download_once)

    progress.close()