Skip to content

Commit

Permalink
fix doip unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
polybassa committed Sep 23, 2024
1 parent 2972bda commit 65100e7
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions test/contrib/automotive/doip.uts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ import tempfile
= Test DoIPSocket

server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -426,6 +427,7 @@ def server():
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand All @@ -437,7 +439,7 @@ server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

Expand All @@ -446,6 +448,7 @@ assert len(pkts) == 2
~ linux

server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -456,6 +459,7 @@ def server():
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
for i in range(len(buffer)):
connection.send(buffer[i:i+1])
time.sleep(0.01)
Expand All @@ -469,13 +473,14 @@ server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

= Test DoIPSocket 3

server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Expand All @@ -486,6 +491,7 @@ def server():
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
while buffer:
randlen = random.randint(0, len(buffer))
connection.send(buffer[:randlen])
Expand All @@ -501,14 +507,15 @@ server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2


= Test DoIPSocket6

server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
Expand All @@ -519,6 +526,7 @@ def server():
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand All @@ -530,7 +538,7 @@ server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(ip="::1", activate_routing=False)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

Expand Down Expand Up @@ -604,6 +612,7 @@ def _load_certificate_chain(context) -> None:


server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
Expand All @@ -619,6 +628,7 @@ def server():
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand All @@ -633,14 +643,15 @@ context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(activate_routing=False, force_tls=True, context=context)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

= Test DoIPSslSocket6
~ broken_windows

server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
Expand All @@ -656,6 +667,7 @@ def server():
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand All @@ -670,14 +682,15 @@ context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

= Test UDS_DoIPSslSocket6
~ broken_windows

server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
Expand All @@ -693,6 +706,7 @@ def server():
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand All @@ -707,7 +721,7 @@ context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

Expand All @@ -716,6 +730,7 @@ assert len(pkts) == 2

server_tcp_up = threading.Event()
server_tls_up = threading.Event()
sniff_up = threading.Event()
def server_tls():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
Expand All @@ -732,6 +747,7 @@ def server_tls():
ssock.listen(1)
server_tls_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
Expand Down Expand Up @@ -767,7 +783,7 @@ context.verify_mode = ssl.CERT_NONE

sock = UDS_DoIPSocket(ip="::1", context=context)

pkts = sock.sniff(timeout=1, count=2)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2

0 comments on commit 65100e7

Please sign in to comment.