From c23233dc07fe7edfaaff230826b55aeb33ea8416 Mon Sep 17 00:00:00 2001 From: Christian Date: Sat, 26 Oct 2024 16:46:19 +0200 Subject: [PATCH] Handling of multiple async method calls on client side --- README.md | 1 - .../call_method_tcp/call_method_tcp.cpp | 4 +- .../call_method_udp/call_method_udp.cpp | 4 +- setup.cfg | 2 +- src/someipy/client_service_instance.py | 171 +++++++++++------- src/someipy/server_service_instance.py | 69 ++++--- 6 files changed, 160 insertions(+), 91 deletions(-) diff --git a/README.md b/README.md index b98759d..f4c7afa 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,6 @@ The library is still under development. The current major limitations and deviat - Only unicast services are supported. - SOME/IP-TP is not supported. - IPv6 endpoints are not supported. -- Session handling is supported only for SOME/IP-SD and not for SOME/IP messages transporting events or methods. - SOME/IP fields are not supported. ### Service Discovery diff --git a/integration_tests/call_method_tcp/call_method_tcp.cpp b/integration_tests/call_method_tcp/call_method_tcp.cpp index c3e48e1..c31fa28 100644 --- a/integration_tests/call_method_tcp/call_method_tcp.cpp +++ b/integration_tests/call_method_tcp/call_method_tcp.cpp @@ -62,11 +62,11 @@ class service_sample { } void offer() { - app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1); } void stop_offer() { - app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1); } void on_state(vsomeip::state_type_e _state) { diff --git a/integration_tests/call_method_udp/call_method_udp.cpp b/integration_tests/call_method_udp/call_method_udp.cpp index c3e48e1..3e660b2 100644 --- a/integration_tests/call_method_udp/call_method_udp.cpp +++ b/integration_tests/call_method_udp/call_method_udp.cpp @@ -62,11 +62,11 @@ class service_sample { } void offer() { - app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1); // major version 0x1 } void stop_offer() { - app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID); + app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1); } void on_state(vsomeip::state_type_e _state) { diff --git a/setup.cfg b/setup.cfg index 4a40668..e8ce7b8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = someipy -version = 0.0.9 +version = 0.0.10 author = Christian H. author_email = someipy.package@gmail.com description = A Python package implementing the SOME/IP protocol diff --git a/src/someipy/client_service_instance.py b/src/someipy/client_service_instance.py index 8d1d3ad..c0054b4 100644 --- a/src/someipy/client_service_instance.py +++ b/src/someipy/client_service_instance.py @@ -16,7 +16,7 @@ import asyncio from enum import Enum import struct -from typing import Iterable, Tuple, Callable, Set, List +from typing import Dict, Iterable, Tuple, Callable, Set, List from someipy import Service from someipy._internal.method_result import MethodResult @@ -86,7 +86,10 @@ class ClientServiceInstance(ServiceDiscoveryObserver): _callback: Callable[[bytes], None] _found_services: Iterable[FoundService] _subscription_active: bool - _method_call_future: asyncio.Future + + _method_call_futures: Dict[int, asyncio.Future] + _client_id: int + _session_id: int def __init__( self, @@ -97,6 +100,7 @@ def __init__( someip_endpoint: SomeipEndpoint, ttl: int = 0, sd_sender=None, + client_id: int = 0, ): self._service = service self._instance_id = instance_id @@ -119,7 +123,10 @@ def __init__( self._found_services = [] self._subscription_active = False - self._method_call_future = None + self._method_call_futures: Dict[int, asyncio.Future] = {} + self._client_id = client_id + + self._session_id = 0 # Starts from 1 to 0xFFFF def register_callback(self, callback: Callable[[SomeIpMessage], None]) -> None: """ @@ -157,7 +164,7 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult: payload (bytes): The payload to send with the method call. Returns: - MethodResult: The result of the method call which can contain an error or a successfull result including the response payload. + MethodResult: The result of the method call which can contain an error or a successful result including the response payload. Raises: RuntimeError: If the TCP connection to the server cannot be established or if the server service has not been found yet. @@ -174,20 +181,25 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult: f"Method 0x{method_id:04x} called, but service 0x{self._service.id:04X} with instance 0x{self._instance_id:04X} not found yet." ) + # Session ID is a 16-bit value and should be incremented for each method call starting from 1 + self._session_id = (self._session_id + 1) % 0xFFFF + session_id = self._session_id + header = SomeIpHeader( service_id=self._service.id, method_id=method_id, - client_id=0x00, - session_id=0x00, + client_id=self._client_id, + session_id=session_id, protocol_version=0x01, - interface_version=0x00, + interface_version=self._service.major_version, message_type=MessageType.REQUEST.value, return_code=0x00, length=len(payload) + 8, ) someip_message = SomeIpMessage(header, payload) - self._method_call_future = asyncio.get_running_loop().create_future() + call_future = asyncio.get_running_loop().create_future() + self._method_call_futures[session_id] = call_future dst_address = str(self._found_services[0].service.endpoint[0]) dst_port = self._found_services[0].service.endpoint[1] @@ -234,20 +246,28 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult: endpoint_to_str_int_tuple(self._found_services[0].service.endpoint), ) - # After sending the method call wait for maximum three seconds + # After sending the method call wait for maximum 10 seconds try: - await asyncio.wait_for(self._method_call_future, 3.0) + await asyncio.wait_for(call_future, 10.0) except asyncio.TimeoutError: + + # Remove the call_future from self._method_call_futures + del self._method_call_futures[session_id] + get_logger(_logger_name).error( f"Waiting on response for method call 0x{method_id:04X} timed out." ) raise - return self._method_call_future.result() + method_result = call_future.result() + del self._method_call_futures[session_id] + return method_result def someip_message_received( self, someip_message: SomeIpMessage, addr: Tuple[str, int] ) -> None: + + # Handling a notification message if ( someip_message.header.client_id == 0x00 and someip_message.header.message_type == MessageType.NOTIFICATION.value @@ -257,16 +277,33 @@ def someip_message_received( self._callback(someip_message) return + # Handling a response message if ( someip_message.header.message_type == MessageType.RESPONSE.value or someip_message.header.message_type == MessageType.ERROR.value ): - if self._method_call_future is not None: + if someip_message.header.session_id not in self._method_call_futures.keys(): + return + if someip_message.header.client_id != self._client_id: + return + + call_future = None + try: + call_future = self._method_call_futures[ + someip_message.header.session_id + ] + except: + get_logger(_logger_name).error( + f"Received response for unknown session ID {someip_message.header.session_id}" + ) + return + + if call_future is not None: result = MethodResult() result.message_type = MessageType(someip_message.header.message_type) result.return_code = ReturnCode(someip_message.header.return_code) result.payload = someip_message.payload - self._method_call_future.set_result(result) + call_future.set_result(result) return def subscribe_eventgroup(self, eventgroup_id: int): @@ -314,64 +351,72 @@ def handle_find_service(self): def handle_offer_service(self, offered_service: SdService): if self._service.id != offered_service.service_id: return - if self._instance_id != offered_service.instance_id: + if ( + self._instance_id != 0xFFFF + and offered_service.instance_id != 0xFFFF + and self._instance_id != offered_service.instance_id + ): + # 0xFFFF allows to handle any instance ID + return + if self._service.major_version != offered_service.major_version: return - if ( - offered_service.service_id == self._service.id - and offered_service.instance_id == self._instance_id + self._service.minor_version != 0xFFFFFFFF + and self._service.minor_version != offered_service.minor_version ): - if FoundService(offered_service) not in self._found_services: - self._found_services.append(FoundService(offered_service)) + # 0xFFFFFFFF allows to handle any minor version + return - if len(self._eventgroups_to_subscribe) == 0: - return + if FoundService(offered_service) not in self._found_services: + self._found_services.append(FoundService(offered_service)) - # Try to subscribe to requested event groups - for eventgroup_to_subscribe in self._eventgroups_to_subscribe: - ( - session_id, - reboot_flag, - ) = self._sd_sender.get_unicast_session_handler().update_session() - - # Improvement: Pack all entries into a single SD message - subscribe_sd_header = build_subscribe_eventgroup_sd_header( - service_id=self._service.id, - instance_id=self._instance_id, - major_version=self._service.major_version, - ttl=self._ttl, - event_group_id=eventgroup_to_subscribe, - session_id=session_id, - reboot_flag=reboot_flag, - endpoint=self._endpoint, - protocol=self._protocol, - ) + if len(self._eventgroups_to_subscribe) == 0: + return - get_logger(_logger_name).debug( - f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, " - f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: " - f"session ID: {session_id}" - ) + # Try to subscribe to requested event groups + for eventgroup_to_subscribe in self._eventgroups_to_subscribe: + ( + session_id, + reboot_flag, + ) = self._sd_sender.get_unicast_session_handler().update_session() + # Improvement: Pack all entries into a single SD message + subscribe_sd_header = build_subscribe_eventgroup_sd_header( + service_id=self._service.id, + instance_id=self._instance_id, + major_version=self._service.major_version, + ttl=self._ttl, + event_group_id=eventgroup_to_subscribe, + session_id=session_id, + reboot_flag=reboot_flag, + endpoint=self._endpoint, + protocol=self._protocol, + ) - if self._protocol == TransportLayerProtocol.TCP: - if self._tcp_task is None: - get_logger(_logger_name).debug( - f"Create new TCP task for client of 0x{self._instance_id:04X}, 0x{self._service.id:04X}" - ) - self._tcp_task = asyncio.create_task( - self.setup_tcp_connection( - str(self._endpoint[0]), - self._endpoint[1], - str(offered_service.endpoint[0]), - offered_service.endpoint[1], - ) + get_logger(_logger_name).debug( + f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, " + f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: " + f"session ID: {session_id}" + ) + + if self._protocol == TransportLayerProtocol.TCP: + if self._tcp_task is None: + get_logger(_logger_name).debug( + f"Create new TCP task for client of 0x{self._instance_id:04X}, 0x{self._service.id:04X}" + ) + self._tcp_task = asyncio.create_task( + self.setup_tcp_connection( + str(self._endpoint[0]), + self._endpoint[1], + str(offered_service.endpoint[0]), + offered_service.endpoint[1], ) + ) - self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe)) - self._sd_sender.send_unicast( - buffer=subscribe_sd_header.to_buffer(), - dest_ip=offered_service.endpoint[0], - ) + self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe)) + self._sd_sender.send_unicast( + buffer=subscribe_sd_header.to_buffer(), + dest_ip=offered_service.endpoint[0], + ) def handle_stop_offer_service(self, offered_service: SdService) -> None: if self._service.id != offered_service.service_id: @@ -489,6 +534,7 @@ async def construct_client_service_instance( ttl: int = 0, sd_sender=None, protocol=TransportLayerProtocol.UDP, + client_id: int = 0, ) -> ClientServiceInstance: """ Asynchronously constructs a ClientServerInstance. Based on the given transport protocol, proper endpoints are setup before constructing the actual ServerServiceInstance. @@ -523,6 +569,7 @@ async def construct_client_service_instance( udp_endpoint, ttl, sd_sender, + client_id, ) udp_endpoint.set_someip_callback(client_instance.someip_message_received) diff --git a/src/someipy/server_service_instance.py b/src/someipy/server_service_instance.py index c06f309..56d61a9 100644 --- a/src/someipy/server_service_instance.py +++ b/src/someipy/server_service_instance.py @@ -14,7 +14,7 @@ # along with this program. If not, see . import asyncio -from typing import Tuple +from typing import Set, Tuple from someipy._internal.someip_message import SomeIpMessage from someipy.service import Service @@ -25,7 +25,6 @@ from someipy._internal.someip_sd_builder import ( build_stop_offer_service_sd_header, build_subscribe_eventgroup_ack_entry, - build_offer_service_sd_header, build_subscribe_eventgroup_ack_sd_header, ) from someipy._internal.someip_header import SomeIpHeader @@ -69,9 +68,11 @@ class ServerServiceInstance(ServiceDiscoveryObserver): _subscribers: Subscribers _offer_timer: SimplePeriodicTimer - _handler_tasks: set[asyncio.Task] + _handler_tasks: Set[asyncio.Task] _is_running: bool + _session_id: int + def __init__( self, service: Service, @@ -97,6 +98,7 @@ def __init__( self._handler_tasks = set() self._is_running = True + self._session_id = 0 # Starts from 1 to 0xFFFF def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None: """ @@ -117,15 +119,18 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None self._subscribers.update() + # Session ID is a 16-bit value and should be incremented for each method call starting from 1 + self._session_id = (self._session_id + 1) % 0xFFFF + length = 8 + len(payload) someip_header = SomeIpHeader( service_id=self._service.id, method_id=event_id, length=length, - client_id=0x00, # TODO - session_id=0x01, # TODO - protocol_version=1, # TODO - interface_version=1, # TODO + client_id=0x00, + session_id=self._session_id, + protocol_version=1, + interface_version=self._service.major_version, message_type=MessageType.NOTIFICATION.value, return_code=0x00, ) @@ -142,16 +147,21 @@ def send_event(self, event_group_id: int, event_id: int, payload: bytes) -> None ) async def _handle_method_call(self, method_handler, dst_addr, header_to_return): - result = await method_handler - header_to_return.message_type = result.message_type.value - header_to_return.return_code = result.return_code.value - payload_to_return = result.payload - - # Update length in header to the correct length - header_to_return.length = 8 + len(payload_to_return) - self._someip_endpoint.sendto( - header_to_return.to_buffer() + payload_to_return, dst_addr - ) + try: + result = await method_handler + header_to_return.message_type = result.message_type.value + header_to_return.return_code = result.return_code.value + payload_to_return = result.payload + + # Update length in header to the correct length + header_to_return.length = 8 + len(payload_to_return) + self._someip_endpoint.sendto( + header_to_return.to_buffer() + payload_to_return, dst_addr + ) + except asyncio.CancelledError: + get_logger(_logger_name).debug( + f"Method call for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X} was canceled" + ) def someip_message_received( self, message: SomeIpMessage, addr: Tuple[str, int] @@ -199,6 +209,15 @@ def send_response(): send_response() return + if header.interface_version != self._service.major_version: + get_logger(_logger_name).warning( + f"Unknown interface version received from {addr}: Version {header.interface_version}" + ) + header_to_return.message_type = MessageType.RESPONSE.value + header_to_return.return_code = ReturnCode.E_WRONG_INTERFACE_VERSION.value + send_response() + return + if header.method_id not in self._service.methods.keys(): get_logger(_logger_name).warning( f"Unknown method ID received from {addr}: ID 0x{header.method_id:04X}" @@ -208,12 +227,16 @@ def send_response(): send_response() return - # TODO: Test for protocol and interface version + if header.message_type != MessageType.REQUEST.value: + get_logger(_logger_name).warning( + f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}" + ) + header_to_return.message_type = MessageType.RESPONSE.value + header_to_return.return_code = ReturnCode.E_WRONG_MESSAGE_TYPE.value + send_response() + return - if ( - header.message_type == MessageType.REQUEST.value - and header.return_code == 0x00 - ): + if header.return_code == 0x00: method_handler = self._service.methods[header.method_id].method_handler coro = method_handler(message.payload, addr) @@ -227,7 +250,7 @@ def send_response(): else: get_logger(_logger_name).warning( - f"Unknown message type received from {addr}: Type 0x{header.message_type:04X}" + f"Wrong return type received from {addr}: Type 0x{header.return_code:02X}" ) def handle_find_service(self):