diff --git a/.travis.yml b/.travis.yml index ed2b442..30dc6b5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,3 +19,7 @@ install: script: - tox -e py + +env: + global: + - TRAVIS="true" \ No newline at end of file diff --git a/thriftpy/server.py b/thriftpy/server.py index 664e4ec..4792524 100644 --- a/thriftpy/server.py +++ b/thriftpy/server.py @@ -4,6 +4,7 @@ import logging import threading +from six.moves import queue from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.transport import ( @@ -103,3 +104,74 @@ def handle(self, client): def close(self): self.closed = True + + +class TThreadPoolServer(TServer): + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, *args, **kwargs): + TServer.__init__(self, *args) + self.clients = queue.Queue() + self.threads = 10 + self.daemon = kwargs.get("daemon", False) + self.closed = False + + def setNumThreads(self, num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the queue and process them.""" + while True: + if self.closed: + break + try: + client = self.clients.get() + self.serveClient(client) + except Exception as x: + logger.exception(x) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + itrans = self.itrans_factory.get_transport(client) + otrans = self.otrans_factory.get_transport(client) + iprot = self.iprot_factory.get_protocol(itrans) + oprot = self.oprot_factory.get_protocol(otrans) + try: + while True: + if self.closed: + break + self.processor.process(iprot, oprot) + except TTransportException as x: + pass + except Exception as x: + logger.exception(x) + + itrans.close() + otrans.close() + + def serve(self): + """Start a fixed number of threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target=self.serveThread) + t.setDaemon(self.daemon) + t.start() + except Exception as x: + logger.exception(x) + + # Pump the socket for clients + self.trans.listen() + while True: + if self.closed: + break + try: + client = self.trans.accept() + if not client: + continue + self.clients.put(client) + except Exception as x: + logger.exception(x) + + def close(self): + self.closed = True