Skip to content

Commit

Permalink
stay same to upstream
Browse files Browse the repository at this point in the history
rename multiplexing -> multiplexed
enforce type check in write_message_begin
  • Loading branch information
lxyu committed Apr 15, 2015
1 parent 1e9f5e0 commit 375a4c8
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 53 deletions.
7 changes: 3 additions & 4 deletions examples/multiplexer/multiplexed_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from thriftpy.rpc import client_context
from thriftpy.protocol import (
TBinaryProtocolFactory,
TMultiplexingProtocolFactory
TMultiplexedProtocolFactory
)

dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift")
Expand All @@ -14,17 +14,16 @@
PP_SERVICE_NAME = "pp_thrift"



def main():
binary_factory = TBinaryProtocolFactory()
dd_factory = TMultiplexingProtocolFactory(binary_factory, DD_SERVICE_NAME)
dd_factory = TMultiplexedProtocolFactory(binary_factory, DD_SERVICE_NAME)
with client_context(dd_thrift.DingService, '127.0.0.1', 9090,
proto_factory=dd_factory) as c:
# ring that doorbell
dong = c.ding()
print(dong)

pp_factory = TMultiplexingProtocolFactory(binary_factory, PP_SERVICE_NAME)
pp_factory = TMultiplexedProtocolFactory(binary_factory, PP_SERVICE_NAME)
with client_context(pp_thrift.PingService, '127.0.0.1', 9090,
proto_factory=pp_factory) as c:
# play table tennis like a champ
Expand Down
5 changes: 3 additions & 2 deletions examples/multiplexer/multiplexed_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import thriftpy
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.server import TThreadedServer
from thriftpy.thrift import TProcessor, TMultiplexingProcessor
from thriftpy.thrift import TProcessor, TMultiplexedProcessor
from thriftpy.transport import TBufferedTransportFactory, TServerSocket


Expand All @@ -13,6 +13,7 @@
DD_SERVICE_NAME = "dd_thrift"
PP_SERVICE_NAME = "pp_thrift"


class DingDispatcher(object):
def ding(self):
print("ding dong!")
Expand All @@ -29,7 +30,7 @@ def main():
dd_proc = TProcessor(dd_thrift.DingService, DingDispatcher())
pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher())

mux_proc = TMultiplexingProcessor()
mux_proc = TMultiplexedProcessor()
mux_proc.register_processor(DD_SERVICE_NAME, dd_proc)
mux_proc.register_processor(PP_SERVICE_NAME, pp_proc)

Expand Down
16 changes: 8 additions & 8 deletions tests/test_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import thriftpy
from thriftpy.protocol import (
TBinaryProtocolFactory,
TMultiplexingProtocolFactory
)
TMultiplexedProtocolFactory
)
from thriftpy.rpc import client_context
from thriftpy.server import TThreadedServer
from thriftpy.thrift import TProcessor, TMultiplexingProcessor
from thriftpy.thrift import TProcessor, TMultiplexedProcessor
from thriftpy.transport import TBufferedTransportFactory, TServerSocket


Expand All @@ -39,7 +39,7 @@ def server(request):
p1 = TProcessor(mux.ThingOneService, DispatcherOne())
p2 = TProcessor(mux.ThingTwoService, DispatcherTwo())

mux_proc = TMultiplexingProcessor()
mux_proc = TMultiplexedProcessor()
mux_proc.register_processor("ThingOneService", p1)
mux_proc.register_processor("ThingTwoService", p2)

Expand All @@ -62,17 +62,17 @@ def fin():

def client_one(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexingProtocolFactory(binary_factory,
"ThingOneService")
multiplexing_factory = TMultiplexedProtocolFactory(binary_factory,
"ThingOneService")
return client_context(mux.ThingOneService, unix_socket=sock_path,
timeout=timeout,
proto_factory=multiplexing_factory)


def client_two(timeout=3000):
binary_factory = TBinaryProtocolFactory()
multiplexing_factory = TMultiplexingProtocolFactory(binary_factory,
"ThingTwoService")
multiplexing_factory = TMultiplexedProtocolFactory(binary_factory,
"ThingTwoService")
return client_context(mux.ThingTwoService, unix_socket=sock_path,
timeout=timeout,
proto_factory=multiplexing_factory)
Expand Down
4 changes: 2 additions & 2 deletions thriftpy/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from .binary import TBinaryProtocol, TBinaryProtocolFactory
from .json import TJSONProtocol, TJSONProtocolFactory
from .multiplex import TMultiplexingProtocol, TMultiplexingProtocolFactory
from .multiplex import TMultiplexedProtocol, TMultiplexedProtocolFactory

from thriftpy._compat import PYPY, CYTHON
if not PYPY:
Expand All @@ -21,4 +21,4 @@
__all__ = ['TBinaryProtocol', 'TBinaryProtocolFactory',
'TCyBinaryProtocol', 'TCyBinaryProtocolFactory',
'TJSONProtocol', 'TJSONProtocolFactory',
'TMultiplexingProtocol', 'TMultiplexingProtocolFactory']
'TMultiplexedProtocol', 'TMultiplexedProtocolFactory']
37 changes: 17 additions & 20 deletions thriftpy/protocol/multiplex.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
from thriftpy.thrift import TMultiplexingProcessor
# -*- coding: utf-8 -*-

from thriftpy.thrift import TMultiplexedProcessor, TMessageType

class TMultiplexingProtocol(object):

"""
Multiplex protocol
for writing message begin, it prepend the service name to the api
for other functions, it simply delegate to the original protocol

class TMultiplexedProtocol(object):
"""Multiplex the protocol by prepend service name to api for every api call.
Can be used together with all original protocols.
"""

def __init__(self, proto, service_name):
self.service_name = service_name
self.proto = proto
self._proto = proto

def __getattr__(self, name):
return getattr(self.proto, name)
return getattr(self._proto, name)

def write_message_begin(self, name, ttype, seqid):
self.proto.write_message_begin(
self.service_name + TMultiplexingProcessor.SEPARATOR + name,
ttype, seqid)

if ttype in (TMessageType.CALL, TMessageType.ONEWAY):
self._proto.write_message_begin(
self.service_name + TMultiplexedProcessor.SEPARATOR + name,
ttype, seqid)
else:
self._proto.write_message_begin(name, ttype, seqid)

class TMultiplexingProtocolFactory(object):

class TMultiplexedProtocolFactory(object):
def __init__(self, proto_factory, service_name):
self.proto_factory = proto_factory
self._proto_factory = proto_factory
self.service_name = service_name

def get_protocol(self, trans):
proto = self.proto_factory.get_protocol(trans)
multi_proto = TMultiplexingProtocol(proto, self.service_name)
return multi_proto
proto = self._proto_factory.get_protocol(trans)
return TMultiplexedProtocol(proto, self.service_name)
31 changes: 14 additions & 17 deletions thriftpy/thrift.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def gen_init(cls, thrift_spec=None, default_spec=None):


class TPayload(with_metaclass(TPayloadMeta, object)):

def read(self, iprot):
iprot.read_struct(self)

Expand Down Expand Up @@ -188,21 +187,19 @@ def process_in(self, iprot):
if api not in self._service.thrift_services:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa
return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa

args = getattr(self._service, api + "_args")()
args.read(iprot)
iprot.read_message_end()
result = getattr(self._service, api + "_result")()

# convert kwargs to args
api_args = [args.thrift_spec[k][1]
for k in sorted(args.thrift_spec)]
api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)]

def call():
return getattr(self._handler, api)(
*(args.__dict__[k] for k in api_args)
)
f = getattr(self._handler, api)
return f(*(args.__dict__[k] for k in api_args))

return api, seqid, result, call

Expand Down Expand Up @@ -246,33 +243,34 @@ def process(self, iprot, oprot):
self.send_result(oprot, api, result, seqid)


class TMultiplexingProcessor(TProcessor):
class TMultiplexedProcessor(TProcessor):
SEPARATOR = ":"

def __init__(self):
self.processors = {}
pass

def register_processor(self, service_name, processor):

if service_name in self.processors:
raise TApplicationException(
type=TApplicationException.INTERNAL_ERROR,
message='processor for `{0}` already registered'
.format(service_name))

.format(service_name))
self.processors[service_name] = processor

def process_in(self, iprot):
api, type, seqid = iprot.read_message_begin()
if type not in (TMessageType.CALL, TMessageType.ONEWAY):
raise TException("TMultiplex protocol only supports CALL & ONEWAY")
if TMultiplexedProcessor.SEPARATOR not in api:
raise TException("Service name not found in message. "
"You should use TMultiplexedProtocol in client.")

service_name, api = api.split(TMultiplexingProcessor.SEPARATOR)

service_name, api = api.split(TMultiplexedProcessor.SEPARATOR)
if service_name not in self.processors:
iprot.skip(TType.STRUCT)
iprot.read_message_end()
e = TApplicationException(TApplicationException.UNKNOWN_METHOD)
return api, seqid, e, None # noqa
return api, seqid, e, None

proc = self.processors[service_name]
args = getattr(proc._service, api + "_args")()
Expand All @@ -281,8 +279,7 @@ def process_in(self, iprot):
result = getattr(proc._service, api + "_result")()

# convert kwargs to args
api_args = [args.thrift_spec[k][1]
for k in sorted(args.thrift_spec)]
api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)]

def call():
f = getattr(proc._handler, api)
Expand Down

0 comments on commit 375a4c8

Please sign in to comment.