-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
175 lines (129 loc) · 5.9 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import asyncio
import socket
import pickle
import random
from asyncio import BaseTransport
from model.congestion_wnd import CongestionWindow
from model.tcp_header import TCPHeader
from model.tcp_packet import TCPPacket
class ClientProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that buffers every received payload in a queue.

    Payloads are pushed onto an ``asyncio.Queue`` by ``datagram_received``
    and handed out one at a time, in arrival order, by ``receive_from``.
    """

    __slots__ = "response_list"

    def __init__(self):
        # FIFO hand-off between the receive callback and awaiting consumers.
        self.response_list = asyncio.Queue()

    def datagram_received(self, data, addr):
        """Queue the raw payload; the sender address is not stored."""
        self.response_list.put_nowait(data)

    def error_received(self, exc):
        """Print the error passed to this callback."""
        print(f'Received an error: {exc}')

    def connection_lost(self, exc):
        """Print whether the connection closed cleanly or because of ``exc``."""
        if exc is not None:
            print(f'Lost the connection due to an error: {exc}')
            return
        print("Connection closed successfully")

    async def receive_from(self):
        """Wait until a payload is queued, then return the oldest one."""
        payload = await self.response_list.get()
        return payload
def get_serialized_packet(seq_num, data="Hello World"):
    """Build a TCP packet carrying ``data`` and return it pickled.

    Args:
        seq_num: Sequence number stored in the packet header.
        data: Payload placed in the packet. Defaults to the fixed test
            payload that was previously hard-coded, so existing one-argument
            callers behave exactly as before.

    Returns:
        The ``pickle.dumps`` serialization of the populated ``TCPPacket``.
    """
    tcp_header = TCPHeader()
    tcp_header.set_seq_num(seq_num)

    tcp_packet = TCPPacket()
    tcp_packet.set_header(tcp_header)
    tcp_packet.set_data(data)

    return pickle.dumps(tcp_packet)
def fast_retransmission(packet: bytes, retransmission_pkt_seq, transport: asyncio.DatagramTransport,
                        congestion_wnd: CongestionWindow):
    """Resend a packet and reset the congestion window.

    Args:
        packet: Serialized packet to send again.
        retransmission_pkt_seq: Sequence number of that packet; only used in
            the log line.
        transport: Datagram transport the packet is written to (``sendto``
            is a datagram-transport method, hence the annotation).
        congestion_wnd: Congestion window; its current size is read and it
            is then reset via ``reset_cwnd()``.

    Returns:
        The new slow-start threshold: half of the window size read before
        the reset, but never less than 2.
    """
    transport.sendto(packet)  # retransmit
    print(f'Retransmitted packet: {retransmission_pkt_seq}')

    # Floor the threshold at 2 packets (RFC 5681: ssthresh = max(FlightSize/2,
    # 2*SMSS)). Without the floor a window of 1-3 packets yields 0 or 1, and
    # the caller's `cwnd < ss_thresh` slow-start test would be false for any
    # window of one packet or more.
    new_ssthresh = max(congestion_wnd.get_cwnd() // 2, 2)
    congestion_wnd.reset_cwnd()
    return new_ssthresh
async def main():
    """Run the UDP client that simulates window-based congestion control.

    Opens a datagram endpoint to this host's address on port 12349, then for
    a fixed number of rounds sends one congestion window of packets (randomly
    skipping about 1% of them to simulate loss), collects the acknowledgements,
    triggers ``fast_retransmission`` on the third duplicate ack, and finally
    grows the window with ``ss_increase_cwnd`` while it is below ``ss_thresh``
    or ``cavd_increase_cwnd`` otherwise. The transport is closed on exit,
    whether or not an error occurred.
    """
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        ClientProtocol,
        remote_addr=(socket.gethostbyname(socket.gethostname()), 12349))
    congestion_wnd = CongestionWindow()

    try:
        print('Client Started')
        num_rtt = 20  # testing purpose
        rtt_completed = 0  # testing purpose
        time_out = 5
        ss_thresh = 8
        cwnd_offset = 0
        num_packets_sent = 0
        last_ack = 0
        dup_acks = 0
        is_packet_loss = False
        lost_packet_seq = None

        while rtt_completed < num_rtt:  # one RTT per iteration
            task_list = []
            received_responses = []

            # Send one window of packets.
            while num_packets_sent < (cwnd_offset + congestion_wnd.get_cwnd()):
                next_seq_num = num_packets_sent
                if random.random() <= 0.01:
                    # Simulated loss: count the packet as sent but never
                    # transmit it and do not expect an ack for it.
                    is_packet_loss = True
                    lost_packet_seq = next_seq_num
                    num_packets_sent += 1
                    continue
                data_to_send = get_serialized_packet(next_seq_num)
                transport.sendto(data_to_send)
                congestion_wnd.add_packet_in_flight(next_seq_sent=next_seq_num)
                # asyncio.wait() needs Task/Future objects: bare coroutines
                # were deprecated in 3.8 and raise TypeError from 3.11 on.
                task_list.append(asyncio.create_task(protocol.receive_from()))
                num_packets_sent += 1

            # print packets in flight
            print(
                f'\nCONGESTION WINDOW: (size = {congestion_wnd.get_cwnd()}) \n{congestion_wnd.get_packets_in_flight()}')
            if is_packet_loss:
                print(f'Lost packet: {lost_packet_seq}')
                lost_packet_seq = None

            if task_list:
                completed_tasks, pending_tasks = await asyncio.wait(
                    task_list,
                    timeout=time_out,
                    return_when=asyncio.FIRST_EXCEPTION)
            else:
                # Nothing was actually transmitted in this window (e.g. every
                # packet was dropped by the loss simulation). asyncio.wait()
                # raises ValueError on an empty collection, so skip it.
                completed_tasks, pending_tasks = set(), set()

            for future in completed_tasks:
                try:
                    response = await future
                    # SECURITY: pickle.loads executes arbitrary code from the
                    # payload; only acceptable while the peer is trusted.
                    response_packet = pickle.loads(response)
                    received_ack = response_packet.get_header().get_ack_num()
                    received_responses.append(received_ack)
                except Exception as e:
                    print(f"Exception occurred: {e}")

            # handle the received responses
            received_responses.sort()
            print(f'{received_responses} <= Received Acks')
            for received_ack in received_responses:
                if len(congestion_wnd.get_packets_in_flight()) == 0:
                    break

                if last_ack == received_ack:
                    # handling dup_Acks
                    # NOTE(review): dup_acks is never reset, so the `== 3`
                    # branch can fire at most once per run - confirm intended.
                    dup_acks += 1
                    if dup_acks == 3:
                        lost_packet_seq = last_ack
                        ss_thresh = fast_retransmission(get_serialized_packet(lost_packet_seq), lost_packet_seq,
                                                        transport, congestion_wnd)
                        retransmission_response = await protocol.receive_from()
                        # SECURITY: same pickle caveat as above.
                        retransmission_response_packet = pickle.loads(retransmission_response)
                        retransmission_received_ack = retransmission_response_packet.get_header().get_ack_num()
                        print(f'{retransmission_received_ack} <= Received Ack after retransmission')
                        congestion_wnd.remove_packet_in_flight(retransmission_received_ack)
                        last_ack = retransmission_received_ack
                    continue

                congestion_wnd.remove_packet_in_flight(received_ack)
                last_ack = received_ack

            # Wait for the receives that were still pending at the timeout;
            # their payloads are discarded.
            # NOTE(review): there is no timeout here, so a datagram that never
            # arrives blocks the client indefinitely - confirm intended.
            await asyncio.gather(*pending_tasks)

            cwnd_offset += congestion_wnd.get_cwnd_before_reset() if is_packet_loss else congestion_wnd.get_cwnd()
            if congestion_wnd.get_cwnd() < ss_thresh:  # slow start
                if not is_packet_loss:
                    congestion_wnd.ss_increase_cwnd()
                else:
                    is_packet_loss = False
            else:
                congestion_wnd.cavd_increase_cwnd()

            rtt_completed += 1
    finally:
        transport.close()
# Start the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    asyncio.run(main())