Skip to content

Commit

Permalink
Handling of multiple async method calls on client side
Browse files Browse the repository at this point in the history
  • Loading branch information
chrizog committed Oct 26, 2024
1 parent 0e73f12 commit c23233d
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 91 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ The library is still under development. The current major limitations and deviat
- Only unicast services are supported.
- SOME/IP-TP is not supported.
- IPv6 endpoints are not supported.
- Session handling is supported only for SOME/IP-SD and not for SOME/IP messages transporting events or methods.
- SOME/IP fields are not supported.

### Service Discovery
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/call_method_tcp/call_method_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ class service_sample {
}

void offer() {
app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID);
app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1);
}

void stop_offer() {
app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID);
app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1);
}

void on_state(vsomeip::state_type_e _state) {
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/call_method_udp/call_method_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ class service_sample {
}

void offer() {
app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID);
app_->offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1); // major version 0x1
}

void stop_offer() {
app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID);
app_->stop_offer_service(SAMPLE_SERVICE_ID, SAMPLE_INSTANCE_ID, 0x1);
}

void on_state(vsomeip::state_type_e _state) {
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = someipy
version = 0.0.9
version = 0.0.10
author = Christian H.
author_email = [email protected]
description = A Python package implementing the SOME/IP protocol
Expand Down
171 changes: 109 additions & 62 deletions src/someipy/client_service_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import asyncio
from enum import Enum
import struct
from typing import Iterable, Tuple, Callable, Set, List
from typing import Dict, Iterable, Tuple, Callable, Set, List

from someipy import Service
from someipy._internal.method_result import MethodResult
Expand Down Expand Up @@ -86,7 +86,10 @@ class ClientServiceInstance(ServiceDiscoveryObserver):
_callback: Callable[[bytes], None]
_found_services: Iterable[FoundService]
_subscription_active: bool
_method_call_future: asyncio.Future

_method_call_futures: Dict[int, asyncio.Future]
_client_id: int
_session_id: int

def __init__(
self,
Expand All @@ -97,6 +100,7 @@ def __init__(
someip_endpoint: SomeipEndpoint,
ttl: int = 0,
sd_sender=None,
client_id: int = 0,
):
self._service = service
self._instance_id = instance_id
Expand All @@ -119,7 +123,10 @@ def __init__(

self._found_services = []
self._subscription_active = False
self._method_call_future = None
self._method_call_futures: Dict[int, asyncio.Future] = {}
self._client_id = client_id

self._session_id = 0 # Starts from 1 to 0xFFFF

def register_callback(self, callback: Callable[[SomeIpMessage], None]) -> None:
"""
Expand Down Expand Up @@ -157,7 +164,7 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
payload (bytes): The payload to send with the method call.
Returns:
MethodResult: The result of the method call which can contain an error or a successfull result including the response payload.
MethodResult: The result of the method call which can contain an error or a successful result including the response payload.
Raises:
RuntimeError: If the TCP connection to the server cannot be established or if the server service has not been found yet.
Expand All @@ -174,20 +181,25 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
f"Method 0x{method_id:04x} called, but service 0x{self._service.id:04X} with instance 0x{self._instance_id:04X} not found yet."
)

# Session ID is a 16-bit value and should be incremented for each method call starting from 1
self._session_id = (self._session_id + 1) % 0xFFFF
session_id = self._session_id

header = SomeIpHeader(
service_id=self._service.id,
method_id=method_id,
client_id=0x00,
session_id=0x00,
client_id=self._client_id,
session_id=session_id,
protocol_version=0x01,
interface_version=0x00,
interface_version=self._service.major_version,
message_type=MessageType.REQUEST.value,
return_code=0x00,
length=len(payload) + 8,
)
someip_message = SomeIpMessage(header, payload)

self._method_call_future = asyncio.get_running_loop().create_future()
call_future = asyncio.get_running_loop().create_future()
self._method_call_futures[session_id] = call_future

dst_address = str(self._found_services[0].service.endpoint[0])
dst_port = self._found_services[0].service.endpoint[1]
Expand Down Expand Up @@ -234,20 +246,28 @@ async def call_method(self, method_id: int, payload: bytes) -> MethodResult:
endpoint_to_str_int_tuple(self._found_services[0].service.endpoint),
)

# After sending the method call wait for maximum three seconds
# After sending the method call wait for maximum 10 seconds
try:
await asyncio.wait_for(self._method_call_future, 3.0)
await asyncio.wait_for(call_future, 10.0)
except asyncio.TimeoutError:

# Remove the call_future from self._method_call_futures
del self._method_call_futures[session_id]

get_logger(_logger_name).error(
f"Waiting on response for method call 0x{method_id:04X} timed out."
)
raise

return self._method_call_future.result()
method_result = call_future.result()
del self._method_call_futures[session_id]
return method_result

def someip_message_received(
self, someip_message: SomeIpMessage, addr: Tuple[str, int]
) -> None:

# Handling a notification message
if (
someip_message.header.client_id == 0x00
and someip_message.header.message_type == MessageType.NOTIFICATION.value
Expand All @@ -257,16 +277,33 @@ def someip_message_received(
self._callback(someip_message)
return

# Handling a response message
if (
someip_message.header.message_type == MessageType.RESPONSE.value
or someip_message.header.message_type == MessageType.ERROR.value
):
if self._method_call_future is not None:
if someip_message.header.session_id not in self._method_call_futures.keys():
return
if someip_message.header.client_id != self._client_id:
return

call_future = None
try:
call_future = self._method_call_futures[
someip_message.header.session_id
]
except:
get_logger(_logger_name).error(
f"Received response for unknown session ID {someip_message.header.session_id}"
)
return

if call_future is not None:
result = MethodResult()
result.message_type = MessageType(someip_message.header.message_type)
result.return_code = ReturnCode(someip_message.header.return_code)
result.payload = someip_message.payload
self._method_call_future.set_result(result)
call_future.set_result(result)
return

def subscribe_eventgroup(self, eventgroup_id: int):
Expand Down Expand Up @@ -314,64 +351,72 @@ def handle_find_service(self):
def handle_offer_service(self, offered_service: SdService):
if self._service.id != offered_service.service_id:
return
if self._instance_id != offered_service.instance_id:
if (
self._instance_id != 0xFFFF
and offered_service.instance_id != 0xFFFF
and self._instance_id != offered_service.instance_id
):
# 0xFFFF allows to handle any instance ID
return
if self._service.major_version != offered_service.major_version:
return

if (
offered_service.service_id == self._service.id
and offered_service.instance_id == self._instance_id
self._service.minor_version != 0xFFFFFFFF
and self._service.minor_version != offered_service.minor_version
):
if FoundService(offered_service) not in self._found_services:
self._found_services.append(FoundService(offered_service))
# 0xFFFFFFFF allows to handle any minor version
return

if len(self._eventgroups_to_subscribe) == 0:
return
if FoundService(offered_service) not in self._found_services:
self._found_services.append(FoundService(offered_service))

# Try to subscribe to requested event groups
for eventgroup_to_subscribe in self._eventgroups_to_subscribe:
(
session_id,
reboot_flag,
) = self._sd_sender.get_unicast_session_handler().update_session()

# Improvement: Pack all entries into a single SD message
subscribe_sd_header = build_subscribe_eventgroup_sd_header(
service_id=self._service.id,
instance_id=self._instance_id,
major_version=self._service.major_version,
ttl=self._ttl,
event_group_id=eventgroup_to_subscribe,
session_id=session_id,
reboot_flag=reboot_flag,
endpoint=self._endpoint,
protocol=self._protocol,
)
if len(self._eventgroups_to_subscribe) == 0:
return

get_logger(_logger_name).debug(
f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, "
f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: "
f"session ID: {session_id}"
)
# Try to subscribe to requested event groups
for eventgroup_to_subscribe in self._eventgroups_to_subscribe:
(
session_id,
reboot_flag,
) = self._sd_sender.get_unicast_session_handler().update_session()
# Improvement: Pack all entries into a single SD message
subscribe_sd_header = build_subscribe_eventgroup_sd_header(
service_id=self._service.id,
instance_id=self._instance_id,
major_version=self._service.major_version,
ttl=self._ttl,
event_group_id=eventgroup_to_subscribe,
session_id=session_id,
reboot_flag=reboot_flag,
endpoint=self._endpoint,
protocol=self._protocol,
)

if self._protocol == TransportLayerProtocol.TCP:
if self._tcp_task is None:
get_logger(_logger_name).debug(
f"Create new TCP task for client of 0x{self._instance_id:04X}, 0x{self._service.id:04X}"
)
self._tcp_task = asyncio.create_task(
self.setup_tcp_connection(
str(self._endpoint[0]),
self._endpoint[1],
str(offered_service.endpoint[0]),
offered_service.endpoint[1],
)
get_logger(_logger_name).debug(
f"Send subscribe for instance 0x{self._instance_id:04X}, service: 0x{self._service.id:04X}, "
f"eventgroup ID: {eventgroup_to_subscribe} TTL: {self._ttl}, version: "
f"session ID: {session_id}"
)

if self._protocol == TransportLayerProtocol.TCP:
if self._tcp_task is None:
get_logger(_logger_name).debug(
f"Create new TCP task for client of 0x{self._instance_id:04X}, 0x{self._service.id:04X}"
)
self._tcp_task = asyncio.create_task(
self.setup_tcp_connection(
str(self._endpoint[0]),
self._endpoint[1],
str(offered_service.endpoint[0]),
offered_service.endpoint[1],
)
)

self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe))
self._sd_sender.send_unicast(
buffer=subscribe_sd_header.to_buffer(),
dest_ip=offered_service.endpoint[0],
)
self._expected_acks.append(ExpectedAck(eventgroup_to_subscribe))
self._sd_sender.send_unicast(
buffer=subscribe_sd_header.to_buffer(),
dest_ip=offered_service.endpoint[0],
)

def handle_stop_offer_service(self, offered_service: SdService) -> None:
if self._service.id != offered_service.service_id:
Expand Down Expand Up @@ -489,6 +534,7 @@ async def construct_client_service_instance(
ttl: int = 0,
sd_sender=None,
protocol=TransportLayerProtocol.UDP,
client_id: int = 0,
) -> ClientServiceInstance:
"""
Asynchronously constructs a ClientServerInstance. Based on the given transport protocol, proper endpoints are setup before constructing the actual ServerServiceInstance.
Expand Down Expand Up @@ -523,6 +569,7 @@ async def construct_client_service_instance(
udp_endpoint,
ttl,
sd_sender,
client_id,
)

udp_endpoint.set_someip_callback(client_instance.someip_message_received)
Expand Down
Loading

0 comments on commit c23233d

Please sign in to comment.