Refactor conditions to add a contact to the routing table #1211

Merged: 79 commits, Jun 7, 2018
Commits (79)
e6ffd7c
remove hashwatcher
jackrobison May 23, 2018
4bd9f3b
remove popular hash tracking, simplify DHTPeerFinder
jackrobison May 23, 2018
b673c50
disable NegotiatedPaymentRateManager, use OnlyFreePaymentsManager for…
jackrobison May 23, 2018
a503a80
disable Cryptonator exchange rate feed
jackrobison May 23, 2018
9ed08f8
remove unused constant
jackrobison May 23, 2018
159e153
make DataStore clock mockable
jackrobison May 23, 2018
d65dc0a
disable loading DictDataStore in Node.__init__
jackrobison May 23, 2018
e52689a
remove OptimizedTreeRoutingTable for now, use TreeRoutingTable
jackrobison May 23, 2018
406ddaa
use base class to simplify Node init
jackrobison May 23, 2018
ad2dcf0
add the parent node id to KBucket
jackrobison May 23, 2018
23c202b
refactor Contact class, DHT RPCs, and Contact addition/removal
jackrobison May 23, 2018
e570383
prevent duplicate entries in the datastore
jackrobison May 23, 2018
f1e0a78
refactor iterativeFind, move to own file
jackrobison May 23, 2018
95ed1e0
raise TransportNotConnected instead of logging a warning
jackrobison May 23, 2018
d4e2821
sort KBucket.getContacts
jackrobison May 23, 2018
c654bfe
use reactor clock in TreeRoutingTable instead of time module
jackrobison May 23, 2018
5631a24
improve findCloseNodes, choose closest contacts from higher and lower…
jackrobison May 23, 2018
cf33590
fix conditions for when a kbucket should be split
jackrobison May 23, 2018
0524101
update contact replacement in KBucket to follow BEP0005
jackrobison May 23, 2018
1adf4f7
fix constant used to check if a bucket is fresh
jackrobison May 23, 2018
c65274e
add PingQueue to KademliaProtocol
jackrobison May 23, 2018
372fb45
refactor dht bootstrap
jackrobison May 23, 2018
aee7a3a
simplify announceHaveBlob, remove unused getPeersForBlob
jackrobison May 23, 2018
ae22468
fix CallLaterManager trying to remove pending calls multiple times
jackrobison May 23, 2018
9920ff5
force KBucket refreshes
jackrobison May 23, 2018
4f72098
use PingQueue to try refresh all contacts
jackrobison May 23, 2018
cc32d98
update peer_list
jackrobison May 23, 2018
bdd6f94
add port to routing_table_get
jackrobison May 23, 2018
e1079a0
add seed node script and monitor tool, update dht monitor script
jackrobison May 23, 2018
950ec5b
update mocks and dht tests
jackrobison May 23, 2018
d250e4d
add iterative find test
jackrobison May 23, 2018
d3f4155
add unit tests for contact_is_good
jackrobison May 23, 2018
7da70cc
test peer expiration
jackrobison May 23, 2018
760417f
pylint
jackrobison May 23, 2018
3dfc6bd
update CallLaterManager to be an object
jackrobison May 24, 2018
a952d2d
reset _listeningPort and _listening Deferred on teardown
jackrobison May 24, 2018
877da78
ping contacts right away during refresh instead of using PingQueue
jackrobison May 24, 2018
b5f3ed5
update contact expiration test and add re-join after expiration test
jackrobison May 24, 2018
470ebe2
remove unnecessary CallLaterManager from test_contact_rpc
jackrobison May 24, 2018
d02ed29
add kademlia store and expiration test
jackrobison May 24, 2018
c521120
update and fix hash announcer test
jackrobison May 24, 2018
98e21cd
test re-join dht
jackrobison May 24, 2018
07f9201
omit bad contacts from getPeersForBlob
jackrobison May 24, 2018
29d5750
pylint
jackrobison May 24, 2018
ec1b6b2
comments, cleaner key_bits constant
jackrobison May 29, 2018
0d23c68
raise attribute error for non-rpc functions in Contact
jackrobison May 29, 2018
73e813f
verify key size in Distance
jackrobison May 29, 2018
fb3aac1
add optional delay argument to enqueue_maybe_ping, fix default value
jackrobison May 29, 2018
8efede6
maybe_ping bad and unknown contacts instead of only unknown
jackrobison May 29, 2018
921ee3c
use refreshTimeout in getRefreshList
jackrobison May 29, 2018
945da59
fix age used in datastore to determine if a value is expired
jackrobison May 29, 2018
9582b7f
use maybe_ping for refreshing stale buckets and storing peers
jackrobison May 29, 2018
51b42da
remove unused scripts
jackrobison May 29, 2018
545930c
dht test environment fixes
jackrobison May 29, 2018
659632b
fix and update tests
jackrobison May 29, 2018
30c4b16
use epoll reactor for seed node script
jackrobison May 29, 2018
0e80123
use 12 minutes instead of 15 as delay in contact_is_good
jackrobison May 29, 2018
cce3c8c
increase kademlia rpc timeout to 8 seconds
jackrobison May 29, 2018
7d21cc5
pylint and more tests
jackrobison May 29, 2018
9a63db4
add protocol version to the dht and migrate old arg format for store
jackrobison May 31, 2018
0386bfa
update seed script
jackrobison May 31, 2018
42eb172
refactor announceHaveBlob
jackrobison Jun 5, 2018
537df6c
log socket errors
jackrobison Jun 5, 2018
b0a741b
fix hash announcer semaphore
jackrobison Jun 6, 2018
db06191
reduce default concurrent announcers to 10
jackrobison Jun 6, 2018
4fbaaac
default new contacts to protocol version 0
jackrobison Jun 6, 2018
adca5f5
fix routing_table_get
jackrobison Jun 6, 2018
e8b402f
remove deferredLock from iterativeFind
jackrobison Jun 6, 2018
4464467
add profiler
jackrobison Jun 6, 2018
16cb6d8
remove Session._join_deferred
jackrobison Jun 6, 2018
bc0da5e
only use seeds in iterative bootstrap if no contacts are known yet
jackrobison Jun 6, 2018
ae631f0
fix teardown error
jackrobison Jun 6, 2018
af096ae
update test
jackrobison Jun 6, 2018
b0e4fc4
fix iterative find lockup
jackrobison Jun 7, 2018
f3e848b
work around upnp bug
jackrobison Jun 7, 2018
7f3ead6
disable forced bucket refresh during join
jackrobison Jun 7, 2018
a821647
pylint and appveyor
jackrobison Jun 7, 2018
1d01069
move daemon test to the same folder as the others
jackrobison Jun 7, 2018
665c73c
changelog
jackrobison Jun 7, 2018
2 changes: 1 addition & 1 deletion .appveyor.yml
@@ -23,7 +23,7 @@ clone_folder: c:\projects\lbry
test_script:
- cd C:\projects\lbry\
- pip install cython
- pip install mock pylint unqlite
- pip install mock pylint unqlite Faker
- pip install .
- pylint lbrynet
# disable tests for now so that appveyor can build the app
19 changes: 16 additions & 3 deletions CHANGELOG.md
@@ -36,10 +36,10 @@ at anytime.
* several internal dht functions to use inlineCallbacks
* `DHTHashAnnouncer` and `Node` manage functions to use `LoopingCall`s instead of scheduling with `callLater`.
* `store` kademlia rpc method to block on the call finishing and to return storing peer information
* refactored `DHTHashAnnouncer` to longer use locks, use a `DeferredSemaphore` to limit concurrent announcers
* refactored `DHTHashAnnouncer` to no longer use locks, use a `DeferredSemaphore` to limit concurrent announcers
* decoupled `DiskBlobManager` from `DHTHashAnnouncer`
* blob hashes to announce to be controlled by `SQLiteStorage`
* kademlia protocol to not delay writes to the UDP socket
* kademlia protocol to minimally delay writes to the UDP socket
* `reactor` and `callLater`, `listenUDP`, and `resolve` functions to be configurable (to allow easier testing)
* calls to get the current time to use `reactor.seconds` (to control callLater and LoopingCall timing in tests)
* `blob_announce` to queue the blob announcement but not block on it
@@ -56,21 +56,34 @@ at anytime.
* track successful reflector uploads in sqlite to minimize how many streams are attempted by auto re-reflect
* increase the default `auto_re_reflect_interval` to a day
* predictable result sorting for `claim_list` and `claim_list_mine`
* changed the bucket splitting condition in the dht routing table to be more aggressive
* periodically ping dht nodes that have stored to us, to determine whether we should include them as an active peer for the hash when we are queried. Nodes that are known to be unreachable by the node storing the record are no longer returned as peers by the storing node.
* temporarily disabled data price negotiation, treat all data as free
* changed dht bootstrap join process to better populate the routing table initially
* cache dht node tokens used during announcement to minimize the number of requests that are needed
* implement BEP0005 dht rules to classify nodes as good, bad, or unknown and for when to add them to the routing table (http://www.bittorrent.org/beps/bep_0005.html)
* refactored internal dht contact class to track failure counts/times, the time the contact last replied to us, and the time the node last requested something from us
* refactored dht iterativeFind
* sort dht contacts returned by `findCloseNodes` in the routing table
* disabled Cryptonator price feed

### Added
* virtual kademlia network and mock udp transport for dht integration tests
* integration tests for bootstrapping the dht
* functional tests for bootstrapping the dht, announcing and expiring hashes, finding and pinging nodes, protocol version 0/1 backwards/forwards compatibility, and rejoining the network
* configurable `concurrent_announcers` and `s3_headers_depth` settings
* `peer_ping` command
* `--sort` option in `file_list`
* linux distro and desktop name added to analytics
* certifi module for Twisted SSL verification on Windows
* protocol version to dht requests and to the response from `findValue`
* added `port` field to contacts returned by `routing_table_get`

### Removed
* `announce_all` argument from `blob_announce`
* old `blob_announce_all` command
* `AuthJSONRPCServer.auth_required` decorator
* unused `--wallet` argument to `lbrynet-daemon`, which was previously used to support `PTCWallet`.
* `OptimizedTreeRoutingTable` class used by the dht node for the time being

## [0.19.3] - 2018-05-04
### Changed
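The changelog entries above describe classifying dht contacts as good, bad, or unknown per BEP0005, based on when a contact last replied to us or queried us. A hedged sketch of that classification, using the 12-minute window from the commit "use 12 minutes instead of 15 as delay in contact_is_good" and illustrative attribute names, might look like:

```python
# Illustrative only: classify a dht contact as good (True), bad (False), or
# unknown (None), roughly following BEP0005. The attribute names are
# assumptions, not the actual fields of this PR's Contact class.

RECENT_WINDOW = 12 * 60  # seconds; the PR shortens BEP0005's 15 minutes to 12

def contact_is_good(contact, now):
    if contact.failed_rpc_count >= 2 and not contact.replied_since_last_failure:
        return False   # failed multiple queries in a row: bad
    if contact.last_replied and now - contact.last_replied < RECENT_WINDOW:
        return True    # answered one of our queries recently: good
    if contact.last_replied and contact.last_requested and \
            now - contact.last_requested < RECENT_WINDOW:
        return True    # has answered before and queried us recently: good
    return None        # not enough information yet: unknown
```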
2 changes: 1 addition & 1 deletion lbrynet/conf.py
@@ -40,7 +40,7 @@
KB = 2 ** 10
MB = 2 ** 20

DEFAULT_CONCURRENT_ANNOUNCERS = 100
DEFAULT_CONCURRENT_ANNOUNCERS = 10

DEFAULT_DHT_NODES = [
('lbrynet1.lbry.io', 4444),
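The conf.py change above (DEFAULT_CONCURRENT_ANNOUNCERS dropping from 100 to 10) pairs with the changelog note that `DHTHashAnnouncer` now uses a `DeferredSemaphore` instead of locks to cap concurrent announcements. A small sketch of that pattern, with `announce_hash` standing in for the real announce call, could be:

```python
# Sketch of limiting concurrency with twisted's DeferredSemaphore; only the
# pattern is taken from the changelog, announce_hash is a placeholder.
from twisted.internet import defer

DEFAULT_CONCURRENT_ANNOUNCERS = 10

def announce_all(blob_hashes, announce_hash):
    sem = defer.DeferredSemaphore(DEFAULT_CONCURRENT_ANNOUNCERS)
    # sem.run() acquires a token, calls announce_hash(blob_hash), and releases
    # the token when the returned Deferred fires, so at most 10 run at once.
    return defer.DeferredList([sem.run(announce_hash, h) for h in blob_hashes])
```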
15 changes: 0 additions & 15 deletions lbrynet/core/BlobAvailability.py
@@ -23,18 +23,14 @@ def __init__(self, blob_manager, peer_finder, dht_node):
self._blob_manager = blob_manager
self._peer_finder = peer_finder
self._dht_node = dht_node
self._check_popular = LoopingCall(self._update_most_popular)
self._check_mine = LoopingCall(self._update_mine)

def start(self):
log.info("Starting blob availability tracker.")
self._check_popular.start(600)
self._check_mine.start(600)

def stop(self):
log.info("Stopping blob availability tracker.")
if self._check_popular.running:
self._check_popular.stop()
if self._check_mine.running:
self._check_mine.stop()

@@ -68,17 +64,6 @@ def _save_peer_info(blob_hash, peers):
d.addCallback(lambda peers: _save_peer_info(blob, peers))
return d

def _get_most_popular(self):
dl = []
for (hash, _) in self._dht_node.get_most_popular_hashes(10):
encoded = hash.encode('hex')
dl.append(self._update_peers_for_blob(encoded))
return defer.DeferredList(dl)

def _update_most_popular(self):
d = self._get_most_popular()
d.addCallback(lambda _: self._set_mean_peers())

def _update_mine(self):
def _get_peers(blobs):
dl = []
1 change: 1 addition & 0 deletions lbrynet/core/PaymentRateManager.py
@@ -93,6 +93,7 @@ def __init__(self, **kwargs):

self.base = BasePaymentRateManager(0.0, 0.0)
self.points_paid = 0.0
self.min_blob_data_payment_rate = 0.0
self.generous = True
self.strategy = OnlyFreeStrategy()

85 changes: 39 additions & 46 deletions lbrynet/core/Session.py
@@ -6,8 +6,7 @@
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPaymentRateManager
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager

log = logging.getLogger(__name__)

@@ -33,14 +32,11 @@ class Session(object):
peers can connect to this peer.
"""

def __init__(self, blob_data_payment_rate, db_dir=None,
node_id=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None,
hash_announcer=None, blob_dir=None,
blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None,
dht_node_class=node.Node, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True, external_ip=None, storage=None):
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None,
storage=None):
"""@param blob_data_payment_rate: The default payment rate for blob data

@param db_dir: The directory in which levelDB files should be stored
@@ -107,8 +103,8 @@ def __init__(self, blob_data_payment_rate, db_dir=None,
self.known_dht_nodes = []
self.blob_dir = blob_dir
self.blob_manager = blob_manager
self.blob_tracker = None
self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
# self.blob_tracker = None
# self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port
self.use_upnp = use_upnp
self.rate_limiter = rate_limiter
@@ -118,11 +114,10 @@ def __init__(self, blob_data_payment_rate, db_dir=None,
self.dht_node_class = dht_node_class
self.dht_node = None
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = None
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
self.is_generous = is_generous
self.payment_rate_manager = OnlyFreePaymentsManager()
# self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
# self.is_generous = is_generous
self.storage = storage or SQLiteStorage(self.db_dir)
self._join_dht_deferred = None

def setup(self):
"""Create the blob directory and database if necessary, start all desired services"""
@@ -147,8 +142,8 @@ def shut_down(self):
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
if self.blob_tracker is not None:
ds.append(defer.maybeDeferred(self.blob_tracker.stop))
# if self.blob_tracker is not None:
# ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
@@ -171,24 +166,24 @@ def get_free_port(upnp, port, protocol):
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping
return mapping[1]
return get_free_port(upnp, port + 1, protocol)

def get_port_mapping(upnp, internal_port, protocol, description):
def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
external_port = get_free_port(upnp, internal_port, protocol)
if isinstance(external_port, tuple):
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, external_port[1], protocol, upnp.lanaddr, internal_port)
return external_port[1], protocol
upnp.addportmapping(external_port, protocol, upnp.lanaddr, internal_port,
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, external_port,
protocol, upnp.lanaddr, internal_port)
return external_port, protocol
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port

def threaded_try_upnp():
if self.use_upnp is False:
@@ -203,13 +198,11 @@ def threaded_try_upnp():
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.upnp_redirects.append(
get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
)
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.upnp_redirects.append(
get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
)
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False

@@ -234,9 +227,9 @@ def _setup_dht(self): # does not block startup, the dht will re-attempt if nece
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
self._join_dht_deferred = self.dht_node.joinNetwork(self.known_dht_nodes)
self._join_dht_deferred.addCallback(lambda _: log.info("Joined the dht"))
self._join_dht_deferred.addCallback(lambda _: self.hash_announcer.start())
d = self.dht_node.start(self.known_dht_nodes)
d.addCallback(lambda _: log.info("Joined the dht"))
d.addCallback(lambda _: self.hash_announcer.start())

def _setup_other_components(self):
log.debug("Setting up the rest of the components")
@@ -251,19 +244,19 @@ def _setup_other_components(self):
else:
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)

if self.blob_tracker is None:
self.blob_tracker = self.blob_tracker_class(
self.blob_manager, self.dht_node.peer_finder, self.dht_node
)
if self.payment_rate_manager is None:
self.payment_rate_manager = self.payment_rate_manager_class(
self.base_payment_rate_manager, self.blob_tracker, self.is_generous
)
# if self.blob_tracker is None:
# self.blob_tracker = self.blob_tracker_class(
# self.blob_manager, self.dht_node.peer_finder, self.dht_node
# )
# if self.payment_rate_manager is None:
# self.payment_rate_manager = self.payment_rate_manager_class(
# self.base_payment_rate_manager, self.blob_tracker, self.is_generous
# )

self.rate_limiter.start()
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
d.addCallback(lambda _: self.blob_tracker.start())
# d.addCallback(lambda _: self.blob_tracker.start())
return d

def _unset_upnp(self):
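One visible behavior change in the Session.py hunks above is that `get_port_mapping` now returns a single port that the session keeps (`self.peer_port = ...`, `self.dht_node_port = ...`) rather than only recording a redirect tuple. A condensed, illustrative version of the walk-up-until-free logic, assuming a miniupnpc-style `upnp` object as in the original code, could read:

```python
# Not the PR's exact code: a simplified take on the UPnP mapping logic,
# assuming a miniupnpc-style object with getspecificportmapping() and
# addportmapping().

def get_port_mapping(upnp, port, protocol, description):
    if protocol not in ('UDP', 'TCP'):
        raise ValueError("invalid protocol")
    external_port = port
    while True:
        mapping = upnp.getspecificportmapping(external_port, protocol)
        if not mapping:
            # the external port is free, redirect it to this host
            upnp.addportmapping(external_port, protocol, upnp.lanaddr, port,
                                description, '')
            return external_port
        if mapping[0] == upnp.lanaddr:
            return external_port       # an existing redirect already points at us
        external_port += 1             # taken by another host, try the next port
```

The caller then stores the returned port, so the rest of the session advertises the externally reachable value.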
66 changes: 29 additions & 37 deletions lbrynet/core/call_later_manager.py
@@ -9,22 +9,25 @@


class CallLaterManager(object):
_callLater = None
_pendingCallLaters = []
_delay = MIN_DELAY

@classmethod
def get_min_delay(cls):
cls._pendingCallLaters = [cl for cl in cls._pendingCallLaters if cl.active()]
queue_size = len(cls._pendingCallLaters)
def __init__(self, callLater):
"""
:param callLater: (IReactorTime.callLater)
"""

self._callLater = callLater
self._pendingCallLaters = []
self._delay = MIN_DELAY

def get_min_delay(self):
self._pendingCallLaters = [cl for cl in self._pendingCallLaters if cl.active()]
queue_size = len(self._pendingCallLaters)
if queue_size > QUEUE_SIZE_THRESHOLD:
cls._delay = min((cls._delay + DELAY_INCREMENT), MAX_DELAY)
self._delay = min((self._delay + DELAY_INCREMENT), MAX_DELAY)
else:
cls._delay = max((cls._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY)
return cls._delay
self._delay = max((self._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY)
return self._delay

@classmethod
def _cancel(cls, call_later):
def _cancel(self, call_later):
"""
:param call_later: DelayedCall
:return: (callable) canceller function
@@ -38,26 +41,25 @@ def cancel(reason=None):

if call_later.active():
call_later.cancel()
cls._pendingCallLaters.remove(call_later)
if call_later in self._pendingCallLaters:
self._pendingCallLaters.remove(call_later)
return reason
return cancel

@classmethod
def stop(cls):
def stop(self):
"""
Cancel any callLaters that are still running
"""

from twisted.internet import defer
while cls._pendingCallLaters:
canceller = cls._cancel(cls._pendingCallLaters[0])
while self._pendingCallLaters:
canceller = self._cancel(self._pendingCallLaters[0])
try:
canceller()
except (defer.CancelledError, defer.AlreadyCalledError):
except (defer.CancelledError, defer.AlreadyCalledError, ValueError):
pass

@classmethod
def call_later(cls, when, what, *args, **kwargs):
def call_later(self, when, what, *args, **kwargs):
"""
Schedule a call later and get a canceller callback function

@@ -69,21 +71,11 @@ def call_later(cls, when, what, *args, **kwargs):
:return: (tuple) twisted.internet.base.DelayedCall object, canceller function
"""

call_later = cls._callLater(when, what, *args, **kwargs)
canceller = cls._cancel(call_later)
cls._pendingCallLaters.append(call_later)
call_later = self._callLater(when, what, *args, **kwargs)
canceller = self._cancel(call_later)
self._pendingCallLaters.append(call_later)
return call_later, canceller

@classmethod
def call_soon(cls, what, *args, **kwargs):
delay = cls.get_min_delay()
return cls.call_later(delay, what, *args, **kwargs)

@classmethod
def setup(cls, callLater):
"""
Setup the callLater function to use, supports the real reactor as well as task.Clock

:param callLater: (IReactorTime.callLater)
"""
cls._callLater = callLater
def call_soon(self, what, *args, **kwargs):
delay = self.get_min_delay()
return self.call_later(delay, what, *args, **kwargs)
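Because CallLaterManager is now an ordinary object built around a callLater implementation, tests can hand it `twisted.internet.task.Clock` in place of the real reactor. A brief usage sketch (the callback and delay are made up for illustration):

```python
# Usage sketch for the instance-based CallLaterManager above; assumes the
# refactored class is importable, the callback is hypothetical.
from twisted.internet import task

clock = task.Clock()                        # deterministic time source for tests
manager = CallLaterManager(clock.callLater)

def on_timeout(peer):
    print("peer %s timed out" % peer)

delayed_call, cancel = manager.call_later(5, on_timeout, "1.2.3.4:4444")
clock.advance(5)                            # fires the pending call
manager.stop()                              # cancels anything still scheduled
```

With the real reactor the manager would presumably be constructed as `CallLaterManager(reactor.callLater)`.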