Skip to content
This repository has been archived by the owner on Dec 10, 2018. It is now read-only.

Add threadpool server #321

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ install:

script:
- tox -e py

env:
global:
- TRAVIS="true"
72 changes: 72 additions & 0 deletions thriftpy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import threading
from six.moves import queue

from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import (
Expand Down Expand Up @@ -103,3 +104,74 @@ def handle(self, client):

def close(self):
self.closed = True


class TThreadPoolServer(TServer):
"""Server with a fixed size pool of threads which service requests."""

def __init__(self, *args, **kwargs):
TServer.__init__(self, *args)
self.clients = queue.Queue()
self.threads = 10
self.daemon = kwargs.get("daemon", False)
self.closed = False

def setNumThreads(self, num):
"""Set the number of worker threads that should be created"""
self.threads = num

def serveThread(self):
"""Loop around getting clients from the queue and process them."""
while True:
if self.closed:
break
try:
client = self.clients.get()
self.serveClient(client)
except Exception as x:
logger.exception(x)

def serveClient(self, client):
"""Process input/output from a client for as long as possible"""
itrans = self.itrans_factory.get_transport(client)
otrans = self.otrans_factory.get_transport(client)
iprot = self.iprot_factory.get_protocol(itrans)
oprot = self.oprot_factory.get_protocol(otrans)
try:
while True:
if self.closed:
break
self.processor.process(iprot, oprot)
except TTransportException as x:
pass
except Exception as x:
logger.exception(x)

itrans.close()
otrans.close()

def serve(self):
"""Start a fixed number of threads and put client into a queue"""
for i in range(self.threads):
try:
t = threading.Thread(target=self.serveThread)
t.setDaemon(self.daemon)
t.start()
except Exception as x:
logger.exception(x)

# Pump the socket for clients
self.trans.listen()
while True:
if self.closed:
break
try:
client = self.trans.accept()
if not client:
continue
self.clients.put(client)
except Exception as x:
logger.exception(x)

def close(self):
self.closed = True