From 81e169c1d305b5594db25815d24a6b7241e26f00 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 19 May 2017 12:31:24 -0400
Subject: [PATCH] feat(ConnectionPools): added connection pools class

Adds connection pooling capability. The pools will be wired into
Storage once the other outstanding PRs are merged.
---
 .../neuroglancer/pipeline/connection_pool.py | 104 ++++++++++++++++++
 python/test/test_connectionpool.py           |  55 +++++++++
 2 files changed, 159 insertions(+)
 create mode 100644 python/neuroglancer/pipeline/connection_pool.py
 create mode 100644 python/test/test_connectionpool.py

diff --git a/python/neuroglancer/pipeline/connection_pool.py b/python/neuroglancer/pipeline/connection_pool.py
new file mode 100644
index 0000000000..e68c5c8fd1
--- /dev/null
+++ b/python/neuroglancer/pipeline/connection_pool.py
@@ -0,0 +1,104 @@
+import Queue
+import signal
+import threading
+
+from google.cloud.storage import Client
+from boto.s3.connection import S3Connection
+
+from neuroglancer.pipeline.secrets import PROJECT_NAME, google_credentials_path, aws_credentials
+
+class ConnectionPool(object):
+  """
+  This class is intended to be subclassed. See below.
+
+  Creating fresh client or connection objects
+  for Google or Amazon eventually starts causing
+  breakdowns when too many connections are open.
+
+  To promote efficient resource use and prevent
+  containers from dying, we create a ConnectionPool
+  that allows connections to be reused.
+
+  Storage interfaces may acquire connections when they
+  need them and release them when they are finished.
+
+  If the limit is reached, additional requests to
+  acquire a connection will block until one can be
+  serviced.
+  """
+  def __init__(self, max_connections=60):
+    self.max_connections = max_connections
+    self.pool = Queue.Queue(maxsize=0)
+    self.outstanding = 0
+    self._lock = threading.Lock()
+    self._semaphore = threading.Semaphore(max_connections) # caps checkouts
+
+    def handler(signum, frame):
+      self.reset_pool()
+
+    signal.signal(signal.SIGINT, handler)
+    signal.signal(signal.SIGTERM, handler)
+
+  def total_connections(self):
+    return self.pool.qsize() + self.outstanding
+
+  def _create_connection(self):
+    raise NotImplementedError
+
+  def get_connection(self):
+    self._semaphore.acquire() # blocks if max_connections are checked out
+    with self._lock:
+      try:
+        conn = self.pool.get(block=False)
+        self.pool.task_done()
+      except Queue.Empty:
+        conn = self._create_connection()
+      self.outstanding += 1
+
+    return conn
+
+  def release_connection(self, conn):
+    if conn is None:
+      return
+
+    self.pool.put(conn)
+    with self._lock:
+      self.outstanding -= 1
+    self._semaphore.release()
+
+  def _close_function(self):
+    return lambda conn: conn # no-op
+
+  def reset_pool(self):
+    closefn = self._close_function()
+    while True:
+      try:
+        conn = self.pool.get(block=False)
+        closefn(conn)
+        self.pool.task_done()
+      except Queue.Empty:
+        break
+
+    with self._lock:
+      self.outstanding = 0
+    self._semaphore = threading.Semaphore(self.max_connections) # restore all slots
+
+  def __del__(self):
+    self.reset_pool()
+
+class S3ConnectionPool(ConnectionPool):
+  def _create_connection(self):
+    return S3Connection(
+      aws_credentials['AWS_ACCESS_KEY_ID'],
+      aws_credentials['AWS_SECRET_ACCESS_KEY']
+    )
+
+  def _close_function(self):
+    return lambda conn: conn.close()
+
+class GCloudConnectionPool(ConnectionPool):
+  def _create_connection(self):
+    return Client.from_service_account_json(
+      google_credentials_path,
+      project=PROJECT_NAME,
+    )
diff --git a/python/test/test_connectionpool.py b/python/test/test_connectionpool.py
new file mode 100644
index 0000000000..f81430e165
--- /dev/null
+++ b/python/test/test_connectionpool.py
@@ -0,0 +1,55 @@
+from tqdm import tqdm
+import boto.s3.key
+
+from neuroglancer.pipeline.connection_pool import S3ConnectionPool, GCloudConnectionPool
+from neuroglancer.pipeline.threaded_queue import ThreadedQueue
+from neuroglancer.pipeline import Storage
+
+max_connections = 60
+S3_POOL = S3ConnectionPool(max_connections=max_connections)
+GC_POOL = GCloudConnectionPool(max_connections=max_connections)
+
+def test_gc_stresstest():
+  with Storage('gs://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
+    stor.put_file('test', 'some string')
+
+  n_trials = 500
+  pbar = tqdm(total=n_trials)
+
+  def create_conn(interface):
+    conn = GC_POOL.get_connection()
+    # assert GC_POOL.total_connections() <= GC_POOL.max_connections
+    bucket = conn.get_bucket('neuroglancer')
+    blob = bucket.get_blob('removeme/connection_pool/test')
+    blob.download_as_string()
+    GC_POOL.release_connection(conn)
+    pbar.update()
+
+  with ThreadedQueue(n_threads=200) as tq:
+    for _ in xrange(n_trials):
+      tq.put(create_conn)
+
+  pbar.close()
+
+def test_s3_stresstest():
+  with Storage('s3://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
+    stor.put_file('test', 'some string')
+
+  n_trials = 500
+  pbar = tqdm(total=n_trials)
+
+  def create_conn(interface):
+    conn = S3_POOL.get_connection()
+    # assert S3_POOL.total_connections() <= S3_POOL.max_connections
+    bucket = conn.get_bucket('neuroglancer')
+    k = boto.s3.key.Key(bucket)
+    k.key = 'removeme/connection_pool/test'
+    k.get_contents_as_string()
+    S3_POOL.release_connection(conn)
+    pbar.update()
+
+  with ThreadedQueue(n_threads=200) as tq:
+    for _ in xrange(n_trials):
+      tq.put(create_conn)
+
+  pbar.close()
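
Usage sketch (not part of the patch): the pools are meant to be consumed
with an acquire/try/release pattern so a connection is returned to the
pool even when a request fails. A minimal example against the S3 pool,
mirroring the bucket and key used by the stress test above; the
fetch_test_object helper is hypothetical:

    import boto.s3.key

    from neuroglancer.pipeline.connection_pool import S3ConnectionPool

    S3_POOL = S3ConnectionPool(max_connections=60)

    def fetch_test_object():
        # Hypothetical helper: check a connection out of the pool,
        # use it, and always return it, even when the request raises.
        conn = S3_POOL.get_connection()
        try:
            bucket = conn.get_bucket('neuroglancer')
            k = boto.s3.key.Key(bucket)
            k.key = 'removeme/connection_pool/test'
            return k.get_contents_as_string()
        finally:
            # Releasing frees a semaphore slot so threads blocked
            # inside get_connection() can proceed.
            S3_POOL.release_connection(conn)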
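
The blocking cap can also be exercised without cloud credentials by
subclassing ConnectionPool with a dummy connection. A sketch of the
intended semantics, assuming the semaphore-capped behavior documented
in the class docstring; DummyPool is hypothetical and not part of
this patch:

    import threading
    import time

    from neuroglancer.pipeline.connection_pool import ConnectionPool

    class DummyPool(ConnectionPool):
        # Hypothetical subclass: "connections" are inert objects.
        def _create_connection(self):
            return object()

    pool = DummyPool(max_connections=2)
    c1 = pool.get_connection()
    c2 = pool.get_connection()

    def third_acquire():
        # Both slots are checked out, so this call blocks until
        # another thread releases a connection.
        c3 = pool.get_connection()
        pool.release_connection(c3)

    t = threading.Thread(target=third_acquire)
    t.start()
    time.sleep(0.1)
    assert t.is_alive()          # still waiting on the semaphore
    pool.release_connection(c1)  # frees a slot; the thread unblocks
    t.join()
    pool.release_connection(c2)
    assert pool.total_connections() <= pool.max_connections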