-
Notifications
You must be signed in to change notification settings - Fork 47
/
peer.py
236 lines (196 loc) · 7.96 KB
/
peer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
from __future__ import annotations
from base64 import b64encode
from collections import deque
from struct import unpack
from time import time
from typing import TYPE_CHECKING, Any, cast
from .keyvault.crypto import default_eccrypto
from .keyvault.keys import Key
from .messaging.interfaces.udp.endpoint import UDPv4Address, UDPv6Address
if TYPE_CHECKING:
from collections.abc import Mapping
from .types import Address
class DirtyDict(dict):
"""
Dictionary that becomes dirty when elements are changed.
"""
def __init__(self, **kwargs) -> None:
"""
Create a new dict that flags when its content has been updated.
"""
super().__init__(**kwargs)
self.dirty = True
def __setitem__(self, key: Any, value: Any) -> None: # noqa: ANN401
"""
Callback for when an item is set to a value. This dirties the dict.
"""
super().__setitem__(key, value)
self.dirty = True
def update(self, mapping: Mapping, **kwargs) -> None: # type: ignore[override]
"""
Callback for when another mapping is merged into this dict. This dirties the dict.
"""
super().update(mapping, **kwargs)
self.dirty = True
def clear(self) -> None:
"""
Callback for when all items are removed. This dirties the dict.
"""
super().clear()
self.dirty = True
def pop(self, key: Any) -> Any: # type: ignore[override] # noqa: ANN401
"""
Callback for when a particular item is popped. This dirties the dict.
"""
out = super().pop(key)
self.dirty = True
return out
def popitem(self) -> Any: # noqa: ANN401
"""
Callback for when an item is popped. This dirties the dict.
"""
out = super().popitem()
self.dirty = True
return out
class Peer:
"""
A public key that has additional information attached to it (like an IP address, measured RTT, etc.).
"""
INTERFACE_ORDER = [UDPv6Address, UDPv4Address, tuple]
def __init__(self, key: Key | bytes, address: Address | None = None, intro: bool = True) -> None:
"""
Create a new Peer.
:param key: the peer's Key (mostly public) or public key bin
:param address: the address object for this peer (e.g. ("1.2.3.4", 0) for IPv4 over UDP)
:param intro: is this peer suggested to us (otherwise it contacted us)
"""
if not isinstance(key, Key):
if key.startswith((b"-----BEGIN EC PRIVATE KEY-----", b"LibNaCLSK:")):
msg = ("You attempted to initialize a Peer with PRIVATE key material."
" This is normally ONLY done for your own Peer instance and internally handled by IPv8."
" If this is truly what you want to do, initialize Peer with a PrivateKey subclass.")
raise ValueError(msg)
self.key: Key = default_eccrypto.key_from_public_bin(key)
else:
self.key = cast(Key, key)
self.public_key = self.key.pub()
self.mid = self.public_key.key_to_hash()
self._addresses = DirtyDict()
if address is not None:
self._addresses[address.__class__] = address
self._address = address
self.creation_time = time()
self.last_response = 0 if intro else time()
self._lamport_timestamp = 0
self.pings: deque = deque(maxlen=5)
self.new_style_intro = False
self.address_frozen = False
"""
Set this to True if you want to avoid this Peer's address being updated.
"""
@property
def addresses(self) -> dict[type[Address], Address]:
"""
Retrieve the addresses belonging to this Peer.
You are not allowed to set this addresses dict for a Peer manually.
You can change the dictionary itself by setting its items or calling its functions, for example ``update()``.
"""
return self._addresses
@property
def address(self) -> Address:
"""
Retrieve the preferred address for this Peer.
If you want to manually select the interface, use the ``.addresses`` dictionary instead.
"""
if self._addresses.dirty:
self._update_preferred_address()
return self._address or UDPv4Address("0.0.0.0", 0)
@address.setter
def address(self, value: Address) -> None:
"""
Register an address of this peer.
Alias of ``add_address(value)``.
"""
self.add_address(value)
def add_address(self, value: Any) -> None: # noqa: ANN401
"""
Add a known address for this Peer.
Any object can form an address, but only one type of address can be used per object type.
For example (normally A, B and C are ``namedtuple`` types):
- Adding instances A(1), B(2) leads to addresses {A: A(1), B: B(2)}
- Adding instances A(1), B(2), A(3) leads to addresses {A: A(3), B: B(2)}
"""
if self.address_frozen:
return
self._addresses[value.__class__] = value
self._update_preferred_address()
def _update_preferred_address(self) -> None:
"""
Update the current address to be the most preferred.
"""
for interface in self.INTERFACE_ORDER:
if interface in self._addresses:
self._address = self._addresses[interface]
break
self._addresses.dirty = False
def get_median_ping(self) -> float | None:
"""
Get the median ping time of this peer.
:return: the median ping or None if no measurements were performed yet
:rtype: float or None
"""
if not self.pings:
return None
sorted_pings = sorted(self.pings)
if len(sorted_pings) % 2 == 0:
return (sorted_pings[len(sorted_pings) // 2 - 1] + sorted_pings[len(sorted_pings) // 2]) / 2
return sorted_pings[len(sorted_pings) // 2]
def get_average_ping(self) -> float | None:
"""
Get the average ping time of this peer.
:return: the average ping or None if no measurements were performed yet
:rtype: float or None
"""
if not self.pings:
return None
return sum(self.pings) / len(self.pings)
def update_clock(self, timestamp: int) -> None:
"""
Update the Lamport timestamp for this peer. The Lamport clock dictates that the current timestamp is
the maximum of the last known and the most recently delivered timestamp. This is useful when messages
are delivered asynchronously.
We also keep a real time timestamp of the last received message for timeout purposes.
:param timestamp: a received timestamp
"""
self._lamport_timestamp = max(self._lamport_timestamp, timestamp)
self.last_response = time() # This is in seconds since the epoch
def get_lamport_timestamp(self) -> int:
"""
Get the Lamport timestamp of this peer.
"""
return self._lamport_timestamp
def __hash__(self) -> int:
"""
Generate a hash based on the mid of this peer.
"""
as_long, = unpack(">Q", self.mid[:8])
return as_long
def __eq__(self, other: object) -> bool:
"""
Check if the other instance is a peer with the same public key.
"""
if not isinstance(other, Peer):
return False
return self.public_key.key_to_bin() == other.public_key.key_to_bin()
def __ne__(self, other: object) -> bool:
"""
Check if the other instance is NOT a peer with the same public key.
"""
if not isinstance(other, Peer):
return True
return self.public_key.key_to_bin() != other.public_key.key_to_bin()
def __str__(self) -> str:
"""
Represent this peer as a human-readable string.
"""
return f"Peer<{self.address[0]}:{self.address[1]}, {b64encode(self.mid).decode()}>"