-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
test_tcp_address.py
71 lines (56 loc) · 2.12 KB
/
test_tcp_address.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
import os
import pytest
from dbus_fast import Message
from dbus_fast._private.address import parse_address
from dbus_fast.aio import MessageBus
@pytest.mark.asyncio
async def test_tcp_connection_with_forwarding(event_loop):
closables = []
host = "127.0.0.1"
port = "55556"
addr_info = parse_address(os.environ.get("DBUS_SESSION_BUS_ADDRESS"))
assert addr_info
assert "abstract" in addr_info[0][1]
path = f'\0{addr_info[0][1]["abstract"]}'
async def handle_connection(tcp_reader, tcp_writer):
unix_reader, unix_writer = await asyncio.open_unix_connection(path)
closables.append(tcp_writer)
closables.append(unix_writer)
async def handle_read():
while True:
data = await tcp_reader.read(1)
if not data:
break
unix_writer.write(data)
async def handle_write():
while True:
data = await unix_reader.read(1)
if not data:
break
tcp_writer.write(data)
asyncio.run_coroutine_threadsafe(handle_read(), event_loop)
asyncio.run_coroutine_threadsafe(handle_write(), event_loop)
server = await asyncio.start_server(handle_connection, host, port)
closables.append(server)
bus = await MessageBus(bus_address=f"tcp:host={host},port={port}").connect()
# basic tests to see if it works
result = await bus.call(
Message(
destination="org.freedesktop.DBus",
path="/org/freedesktop/DBus",
interface="org.freedesktop.DBus.Peer",
member="Ping",
)
)
assert result
intr = await bus.introspect("org.freedesktop.DBus", "/org/freedesktop/DBus")
obj = bus.get_proxy_object("org.freedesktop.DBus", "/org/freedesktop/DBus", intr)
iface = obj.get_interface("org.freedesktop.DBus.Peer")
await iface.call_ping()
assert bus._sock.getpeername()[0] == host
assert bus._sock.getsockname()[0] == host
assert bus._sock.gettimeout() == 0
assert bus._stream.closed is False
for c in closables:
c.close()