Weekly cleanups #4284

Merged: 8 commits, Jan 4, 2021
12 changes: 9 additions & 3 deletions contrib/pyln-testing/pyln/testing/fixtures.py
@@ -1,6 +1,6 @@
from concurrent import futures
from pyln.testing.db import SqliteDbProvider, PostgresDbProvider
from pyln.testing.utils import NodeFactory, BitcoinD, ElementsD, env, DEVELOPER, LightningNode, TEST_DEBUG
from pyln.testing.utils import NodeFactory, BitcoinD, ElementsD, env, DEVELOPER, LightningNode, TEST_DEBUG, Throttler
from typing import Dict

import logging
@@ -198,15 +198,21 @@ def teardown_checks(request):


@pytest.fixture
def node_factory(request, directory, test_name, bitcoind, executor, db_provider, teardown_checks, node_cls):
def throttler():
yield Throttler()


@pytest.fixture
def node_factory(request, directory, test_name, bitcoind, executor, db_provider, teardown_checks, node_cls, throttler):
nf = NodeFactory(
request,
test_name,
bitcoind,
executor,
directory=directory,
db_provider=db_provider,
node_cls=node_cls
node_cls=node_cls,
throttler=throttler,
)

yield nf
83 changes: 78 additions & 5 deletions contrib/pyln-testing/pyln/testing/utils.py
@@ -6,6 +6,7 @@
from collections import OrderedDict
from decimal import Decimal
from ephemeral_port_reserve import reserve # type: ignore
from filelock import FileLock
from pyln.client import LightningRpc
from pyln.client import Millisatoshi

@@ -14,6 +15,7 @@
import lzma
import math
import os
import psutil # type: ignore
import random
import re
import shutil
@@ -225,9 +227,13 @@ def tail(self):

if self.proc.stderr:
for line in iter(self.proc.stderr.readline, ''):
if len(line) == 0:

if line is None or len(line) == 0:
break
self.err_logs.append(line.rstrip().decode('UTF-8', 'replace')).rstrip()

line = line.rstrip().decode('UTF-8', 'replace')
self.err_logs.append(line)

self.proc.stderr.close()

def is_in_log(self, regex, start=0):
@@ -321,7 +327,16 @@ def __getattr__(self, name):
proxy = BitcoinProxy(btc_conf_file=self.__btc_conf_file__)

def f(*args):
return proxy._call(name, *args)
logging.debug("Calling {name} with arguments {args}".format(
name=name,
args=args
))
res = proxy._call(name, *args)
logging.debug("Result for {name} call: {res}".format(
name=name,
res=res,
))
return res

# Make debuggers show <function bitcoin.rpc.name> rather than <function
# bitcoin.rpc.<lambda>>
@@ -401,6 +416,14 @@ def generate_block(self, numblocks=1, wait_for_mempool=0):
wait_for(lambda: all(txid in self.rpc.getrawmempool() for txid in wait_for_mempool))
else:
wait_for(lambda: len(self.rpc.getrawmempool()) >= wait_for_mempool)

mempool = self.rpc.getrawmempool()
logging.debug("Generating {numblocks}, confirming {lenmempool} transactions: {mempool}".format(
numblocks=numblocks,
mempool=mempool,
lenmempool=len(mempool),
))

# As of 0.16, generate() is removed; use generatetoaddress.
return self.rpc.generatetoaddress(numblocks, self.rpc.getnewaddress())
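
For context, a hedged sketch of the two wait_for_mempool forms handled above, written as hypothetical pytest tests (the fixtures come from pyln-testing; the withdrawal, address, and amounts are illustrative only and not part of this PR):

def test_confirm_tx(node_factory, bitcoind):
    l1 = node_factory.get_node()
    # Hypothetical withdrawal; assumes l1 already has confirmed funds to spend.
    txid = l1.rpc.withdraw(bitcoind.rpc.getnewaddress(), 10000)['txid']
    # List form: wait until that specific txid is in bitcoind's mempool, then mine it.
    bitcoind.generate_block(1, wait_for_mempool=[txid])

def test_batch_confirm(node_factory, bitcoind):
    # Integer form: wait until at least two transactions are pending, then mine six blocks.
    # (Something else is assumed to be broadcasting those transactions.)
    bitcoind.generate_block(6, wait_for_mempool=2)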

@@ -1021,10 +1044,59 @@ def passes_filters(hmsg, filters):
return msgs


class Throttler(object):
"""Throttles the creation of system-processes to avoid overload.

There is no reason to overload the system with too many processes
being spawned or run at the same time. It causes timeouts by
aggressively preempting processes and swapping if the memory limit is
reached. In order to reduce this loss of performance we provide a
`wait()` method which will serialize the creation of processes, but
also delay if the system load is too high.

Notice that technically we are throttling too late, i.e., we react
to an overload, but chances are pretty good that some other
already running process is about to terminate, and so the overload
is short-lived. We throttle when the process object is first
created, not when restarted, in order to avoid delaying running
tests, which could cause more timeouts.

"""
def __init__(self, target: float = 75):
"""If specified we try to stick to a load of target (in percent).
"""
self.target = target
self.lock = FileLock("/tmp/ltest.lock")
self.current_load = self.target # Start slow
psutil.cpu_percent() # Prime the internal load metric

def wait(self):
start_time = time.time()
with self.lock.acquire(poll_intervall=0.250):
# We just got the lock, assume someone else just released it
self.current_load = 100
while self.load() >= self.target:
time.sleep(1)

delay = time.time() - start_time
with open("/tmp/ltest-throttler.csv", "a") as f:
f.write("{}, {}, {}, {}\n".format(time.time(), self.load(), self.target, delay))
self.current_load = 100 # Back off slightly to avoid triggering right away

Collaborator:

I've read this comment a few times, but I still don't get it 😅. What would a load below 75 trigger? And why does setting it to 100 after each node is handed out avoid that trigger?

Member (Author):

If we leave current_load at 75 percent, a follow-up call (e.g., another lightningd wanting to start) could see 75% and decide to schedule its startup immediately (maybe the first one hasn't had time to start up and push the load up yet), so we might end up clearing the whole queue even though only the first caller got the all-clear.

Setting it to 100% means the EWMA smoothing needs a couple of rounds before allowing the next one through, which avoids the flooding scenario above.

Collaborator:

Ok, I had it the other way around, thanks!
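
To make the exchange above concrete, here is a minimal arithmetic sketch (not part of the PR), assuming the 0.5 decay used in load() below, the default target of 75, and a hypothetical steady CPU reading of 50%:

def ewma_load(current, measured, decay=0.5):
    # Same formula as Throttler.load() below, with psutil.cpu_percent()
    # replaced by a fixed, hypothetical reading.
    return decay * measured + (1 - decay) * current

current_load = 100   # the value wait() resets to after handing out a node
samples = 0
while True:
    current_load = ewma_load(current_load, measured=50)
    samples += 1
    if current_load < 75:          # the default target
        break

print(samples, current_load)       # 2 samples: 100 -> 75.0 -> 62.5

So resetting to 100 costs a couple of load() samples (and the one-second sleeps between them in wait()) before the next caller is admitted, rather than letting every queued caller through at once.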


def load(self):
"""An exponential moving average of the load
"""
decay = 0.5
load = psutil.cpu_percent()
self.current_load = decay * load + (1 - decay) * self.current_load
return self.current_load
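
As a side note, the CSV that wait() appends to /tmp/ltest-throttler.csv can be inspected after a run; a minimal sketch, assuming the columns as written above (timestamp, smoothed load, target, delay in seconds):

import csv

with open("/tmp/ltest-throttler.csv") as f:
    rows = [[float(field) for field in row] for row in csv.reader(f)]

total_delay = sum(row[3] for row in rows)
print("{} throttled node starts, {:.1f}s total delay".format(len(rows), total_delay))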


class NodeFactory(object):
"""A factory to setup and start `lightningd` daemons.
"""
def __init__(self, request, testname, bitcoind, executor, directory, db_provider, node_cls):
def __init__(self, request, testname, bitcoind, executor, directory,
db_provider, node_cls, throttler):
if request.node.get_closest_marker("slow_test") and SLOW_MACHINE:
self.valgrind = False
else:
Expand All @@ -1038,6 +1110,7 @@ def __init__(self, request, testname, bitcoind, executor, directory, db_provider
self.lock = threading.Lock()
self.db_provider = db_provider
self.node_cls = node_cls
self.throttler = throttler

def split_options(self, opts):
"""Split node options from cli options
Expand Down Expand Up @@ -1098,7 +1171,7 @@ def get_node(self, node_id=None, options=None, dbfile=None,
feerates=(15000, 11000, 7500, 3750), start=True,
wait_for_bitcoind_sync=True, may_fail=False,
expect_fail=False, cleandir=True, **kwargs):

self.throttler.wait()
node_id = self.get_node_id() if not node_id else node_id
port = self.get_next_port()

2 changes: 2 additions & 0 deletions contrib/pyln-testing/requirements.txt
@@ -5,3 +5,5 @@ cheroot==8.2.1
ephemeral-port-reserve==1.1.1
python-bitcoinlib==0.10.2
psycopg2-binary==2.8.4
filelock==3.0.*
psutil==5.7.*
4 changes: 2 additions & 2 deletions tests/fixtures.py
@@ -1,5 +1,5 @@
from utils import DEVELOPER, TEST_NETWORK # noqa: F401,F403
from pyln.testing.fixtures import directory, test_base_dir, test_name, chainparams, node_factory, bitcoind, teardown_checks, db_provider, executor, setup_logging # noqa: F401,F403
from pyln.testing.fixtures import directory, test_base_dir, test_name, chainparams, node_factory, bitcoind, teardown_checks, throttler, db_provider, executor, setup_logging # noqa: F401,F403
from pyln.testing import utils
from utils import COMPAT

@@ -19,7 +19,7 @@ def __init__(self, *args, **kwargs):

# If we opted into checking the DB statements we will attach the dblog
# plugin before starting the node
check_dblog = os.environ.get("TEST_CHECK_DBSTMTS", None) is not None
check_dblog = os.environ.get("TEST_CHECK_DBSTMTS", None) == "1"
db = os.environ.get("TEST_DB_PROVIDER", "sqlite3")
if db == 'sqlite3' and check_dblog:
dblog = os.path.join(os.path.dirname(__file__), 'plugins', 'dblog.py')
2 changes: 2 additions & 0 deletions tests/test_connection.py
Expand Up @@ -11,6 +11,7 @@
check_coin_moves, first_channel_id, account_balance, basic_fee,
EXPERIMENTAL_FEATURES
)
from pyln.testing.utils import SLOW_MACHINE, VALGRIND
from bitcoin.core import CMutableTransaction, CMutableTxOut

import binascii
@@ -1042,6 +1043,7 @@ def test_funding_external_wallet_corners(node_factory, bitcoind):
l1.rpc.close(l2.info['id'])


@unittest.skipIf(SLOW_MACHINE and not VALGRIND, "Way too taxing on CI machines")
def test_funding_cancel_race(node_factory, bitcoind, executor):
l1 = node_factory.get_node()

5 changes: 5 additions & 0 deletions tests/test_gossip.py
@@ -510,6 +510,11 @@ def test_routing_gossip(node_factory, bitcoind):
src.rpc.connect(dst.info['id'], 'localhost', dst.port)
src.openchannel(dst, 25000)

# Avoid "bad gossip" caused by future announcements (a node below
# confirmation height receiving and ignoring the announcement,
# thus marking followup messages as bad).
sync_blockheight(bitcoind, nodes)

# Allow announce messages.
bitcoind.generate_block(5)
