diff --git a/examples/multiplexer/multiplexed_client.py b/examples/multiplexer/multiplexed_client.py index fc27f33..6ffd7f2 100644 --- a/examples/multiplexer/multiplexed_client.py +++ b/examples/multiplexer/multiplexed_client.py @@ -4,7 +4,7 @@ from thriftpy.rpc import client_context from thriftpy.protocol import ( TBinaryProtocolFactory, - TMultiplexingProtocolFactory + TMultiplexedProtocolFactory ) dd_thrift = thriftpy.load("dingdong.thrift", module_name="dd_thrift") @@ -14,17 +14,16 @@ PP_SERVICE_NAME = "pp_thrift" - def main(): binary_factory = TBinaryProtocolFactory() - dd_factory = TMultiplexingProtocolFactory(binary_factory, DD_SERVICE_NAME) + dd_factory = TMultiplexedProtocolFactory(binary_factory, DD_SERVICE_NAME) with client_context(dd_thrift.DingService, '127.0.0.1', 9090, proto_factory=dd_factory) as c: # ring that doorbell dong = c.ding() print(dong) - pp_factory = TMultiplexingProtocolFactory(binary_factory, PP_SERVICE_NAME) + pp_factory = TMultiplexedProtocolFactory(binary_factory, PP_SERVICE_NAME) with client_context(pp_thrift.PingService, '127.0.0.1', 9090, proto_factory=pp_factory) as c: # play table tennis like a champ diff --git a/examples/multiplexer/multiplexed_server.py b/examples/multiplexer/multiplexed_server.py index 485571a..8f00768 100644 --- a/examples/multiplexer/multiplexed_server.py +++ b/examples/multiplexer/multiplexed_server.py @@ -3,7 +3,7 @@ import thriftpy from thriftpy.protocol import TBinaryProtocolFactory from thriftpy.server import TThreadedServer -from thriftpy.thrift import TProcessor, TMultiplexingProcessor +from thriftpy.thrift import TProcessor, TMultiplexedProcessor from thriftpy.transport import TBufferedTransportFactory, TServerSocket @@ -13,6 +13,7 @@ DD_SERVICE_NAME = "dd_thrift" PP_SERVICE_NAME = "pp_thrift" + class DingDispatcher(object): def ding(self): print("ding dong!") @@ -29,7 +30,7 @@ def main(): dd_proc = TProcessor(dd_thrift.DingService, DingDispatcher()) pp_proc = TProcessor(pp_thrift.PingService, PingDispatcher()) - mux_proc = TMultiplexingProcessor() + mux_proc = TMultiplexedProcessor() mux_proc.register_processor(DD_SERVICE_NAME, dd_proc) mux_proc.register_processor(PP_SERVICE_NAME, pp_proc) diff --git a/tests/test_multiplexed.py b/tests/test_multiplexed.py index 6757a23..4b5453b 100644 --- a/tests/test_multiplexed.py +++ b/tests/test_multiplexed.py @@ -11,11 +11,11 @@ import thriftpy from thriftpy.protocol import ( TBinaryProtocolFactory, - TMultiplexingProtocolFactory - ) + TMultiplexedProtocolFactory +) from thriftpy.rpc import client_context from thriftpy.server import TThreadedServer -from thriftpy.thrift import TProcessor, TMultiplexingProcessor +from thriftpy.thrift import TProcessor, TMultiplexedProcessor from thriftpy.transport import TBufferedTransportFactory, TServerSocket @@ -39,7 +39,7 @@ def server(request): p1 = TProcessor(mux.ThingOneService, DispatcherOne()) p2 = TProcessor(mux.ThingTwoService, DispatcherTwo()) - mux_proc = TMultiplexingProcessor() + mux_proc = TMultiplexedProcessor() mux_proc.register_processor("ThingOneService", p1) mux_proc.register_processor("ThingTwoService", p2) @@ -62,8 +62,8 @@ def fin(): def client_one(timeout=3000): binary_factory = TBinaryProtocolFactory() - multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, - "ThingOneService") + multiplexing_factory = TMultiplexedProtocolFactory(binary_factory, + "ThingOneService") return client_context(mux.ThingOneService, unix_socket=sock_path, timeout=timeout, proto_factory=multiplexing_factory) @@ -71,8 +71,8 @@ def client_one(timeout=3000): def client_two(timeout=3000): binary_factory = TBinaryProtocolFactory() - multiplexing_factory = TMultiplexingProtocolFactory(binary_factory, - "ThingTwoService") + multiplexing_factory = TMultiplexedProtocolFactory(binary_factory, + "ThingTwoService") return client_context(mux.ThingTwoService, unix_socket=sock_path, timeout=timeout, proto_factory=multiplexing_factory) diff --git a/thriftpy/protocol/__init__.py b/thriftpy/protocol/__init__.py index 37c5d1e..f26bd99 100644 --- a/thriftpy/protocol/__init__.py +++ b/thriftpy/protocol/__init__.py @@ -4,7 +4,7 @@ from .binary import TBinaryProtocol, TBinaryProtocolFactory from .json import TJSONProtocol, TJSONProtocolFactory -from .multiplex import TMultiplexingProtocol, TMultiplexingProtocolFactory +from .multiplex import TMultiplexedProtocol, TMultiplexedProtocolFactory from thriftpy._compat import PYPY, CYTHON if not PYPY: @@ -21,4 +21,4 @@ __all__ = ['TBinaryProtocol', 'TBinaryProtocolFactory', 'TCyBinaryProtocol', 'TCyBinaryProtocolFactory', 'TJSONProtocol', 'TJSONProtocolFactory', - 'TMultiplexingProtocol', 'TMultiplexingProtocolFactory'] + 'TMultiplexedProtocol', 'TMultiplexedProtocolFactory'] diff --git a/thriftpy/protocol/multiplex.py b/thriftpy/protocol/multiplex.py index 4a7cce7..58b573c 100644 --- a/thriftpy/protocol/multiplex.py +++ b/thriftpy/protocol/multiplex.py @@ -1,37 +1,34 @@ -from thriftpy.thrift import TMultiplexingProcessor +# -*- coding: utf-8 -*- +from thriftpy.thrift import TMultiplexedProcessor, TMessageType -class TMultiplexingProtocol(object): - - """ - - Multiplex protocol - - for writing message begin, it prepend the service name to the api - for other functions, it simply delegate to the original protocol +class TMultiplexedProtocol(object): + """Multiplex the protocol by prepend service name to api for every api call. + Can be used together with all original protocols. """ def __init__(self, proto, service_name): self.service_name = service_name - self.proto = proto + self._proto = proto def __getattr__(self, name): - return getattr(self.proto, name) + return getattr(self._proto, name) def write_message_begin(self, name, ttype, seqid): - self.proto.write_message_begin( - self.service_name + TMultiplexingProcessor.SEPARATOR + name, - ttype, seqid) - + if ttype in (TMessageType.CALL, TMessageType.ONEWAY): + self._proto.write_message_begin( + self.service_name + TMultiplexedProcessor.SEPARATOR + name, + ttype, seqid) + else: + self._proto.write_message_begin(name, ttype, seqid) -class TMultiplexingProtocolFactory(object): +class TMultiplexedProtocolFactory(object): def __init__(self, proto_factory, service_name): - self.proto_factory = proto_factory + self._proto_factory = proto_factory self.service_name = service_name def get_protocol(self, trans): - proto = self.proto_factory.get_protocol(trans) - multi_proto = TMultiplexingProtocol(proto, self.service_name) - return multi_proto + proto = self._proto_factory.get_protocol(trans) + return TMultiplexedProtocol(proto, self.service_name) diff --git a/thriftpy/thrift.py b/thriftpy/thrift.py index b570595..06a2b59 100644 --- a/thriftpy/thrift.py +++ b/thriftpy/thrift.py @@ -85,7 +85,6 @@ def gen_init(cls, thrift_spec=None, default_spec=None): class TPayload(with_metaclass(TPayloadMeta, object)): - def read(self, iprot): iprot.read_struct(self) @@ -188,7 +187,7 @@ def process_in(self, iprot): if api not in self._service.thrift_services: iprot.skip(TType.STRUCT) iprot.read_message_end() - return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa + return api, seqid, TApplicationException(TApplicationException.UNKNOWN_METHOD), None # noqa args = getattr(self._service, api + "_args")() args.read(iprot) @@ -196,13 +195,11 @@ def process_in(self, iprot): result = getattr(self._service, api + "_result")() # convert kwargs to args - api_args = [args.thrift_spec[k][1] - for k in sorted(args.thrift_spec)] + api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)] def call(): - return getattr(self._handler, api)( - *(args.__dict__[k] for k in api_args) - ) + f = getattr(self._handler, api) + return f(*(args.__dict__[k] for k in api_args)) return api, seqid, result, call @@ -246,33 +243,34 @@ def process(self, iprot, oprot): self.send_result(oprot, api, result, seqid) -class TMultiplexingProcessor(TProcessor): +class TMultiplexedProcessor(TProcessor): SEPARATOR = ":" def __init__(self): self.processors = {} - pass def register_processor(self, service_name, processor): - if service_name in self.processors: raise TApplicationException( type=TApplicationException.INTERNAL_ERROR, message='processor for `{0}` already registered' - .format(service_name)) - + .format(service_name)) self.processors[service_name] = processor def process_in(self, iprot): api, type, seqid = iprot.read_message_begin() + if type not in (TMessageType.CALL, TMessageType.ONEWAY): + raise TException("TMultiplex protocol only supports CALL & ONEWAY") + if TMultiplexedProcessor.SEPARATOR not in api: + raise TException("Service name not found in message. " + "You should use TMultiplexedProtocol in client.") - service_name, api = api.split(TMultiplexingProcessor.SEPARATOR) - + service_name, api = api.split(TMultiplexedProcessor.SEPARATOR) if service_name not in self.processors: iprot.skip(TType.STRUCT) iprot.read_message_end() e = TApplicationException(TApplicationException.UNKNOWN_METHOD) - return api, seqid, e, None # noqa + return api, seqid, e, None proc = self.processors[service_name] args = getattr(proc._service, api + "_args")() @@ -281,8 +279,7 @@ def process_in(self, iprot): result = getattr(proc._service, api + "_result")() # convert kwargs to args - api_args = [args.thrift_spec[k][1] - for k in sorted(args.thrift_spec)] + api_args = [args.thrift_spec[k][1] for k in sorted(args.thrift_spec)] def call(): f = getattr(proc._handler, api)