From aaee11ed43f097de8cd78dc4fd6bad3e26be9b93 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Thu, 9 Feb 2017 15:32:08 -0800 Subject: [PATCH] feat: add a /_memusage API on a separate (internal port) for debugging our memory usage issue #802 --- autopush/gcdump.py | 34 +++++++++++++-------- autopush/main.py | 29 +++++++++++++++++- autopush/memusage.py | 48 ++++++++++++++++++++++++++++++ autopush/tests/test_integration.py | 35 ++++++++++++++++++++-- autopush/tests/test_main.py | 5 ++++ autopush/web/health.py | 25 ++++++++++++++++ base-requirements.txt | 1 + 7 files changed, 161 insertions(+), 16 deletions(-) create mode 100644 autopush/memusage.py diff --git a/autopush/gcdump.py b/autopush/gcdump.py index 8f88f484..05eaeed9 100644 --- a/autopush/gcdump.py +++ b/autopush/gcdump.py @@ -7,17 +7,20 @@ By default, typeids.txt is loaded from the same dir as dumpfile. """ -import sys, array, struct, os +import array +import os +import struct +import sys class Stat(object): summary = {} typeids = {0: ''} - def summarize(self, filename): + def summarize(self, filename, stream=None): a = self.load_dump_file(filename) self.summary = {} # {typenum: [count, totalsize]} - for obj in self.walk(a): + for obj in self.walk(a, stream=stream): self.add_object_summary(obj[2], obj[3]) def load_typeids(self, filename_or_iter): @@ -41,15 +44,17 @@ def load_typeids(self, filename_or_iter): def get_type_name(self, num): return self.typeids.get(num, '' % num) - def print_summary(self): + def print_summary(self, stream): items = self.summary.items() items.sort(key=lambda (typenum, stat): stat[1]) # sort by totalsize totalsize = 0 for typenum, stat in items: totalsize += stat[1] - print '%8d %8.2fM %s' % (stat[0], stat[1] / (1024.0*1024.0), - self.get_type_name(typenum)) - print 'total %.1fM' % (totalsize / (1024.0*1024.0),) + stream.write('%8d %8.2fM %s\n' % + (stat[0], + stat[1] / (1024.0*1024.0), + self.get_type_name(typenum))) + stream.write('total %.1fM\n' % (totalsize / (1024.0*1024.0))) def load_dump_file(self, filename): f = open(filename, 'rb') @@ -69,10 +74,11 @@ def add_object_summary(self, typenum, sizeobj): stat[0] += 1 stat[1] += sizeobj - def walk(self, a, start=0, stop=None): + def walk(self, a, start=0, stop=None, stream=None): assert a[-1] == -1, "invalid or truncated dump file (or 32/64-bit mix)" assert a[-2] != -1, "invalid or truncated dump file (or 32/64-bit mix)" - print >> sys.stderr, 'walking...', + if stream: + stream.write('walking...') i = start if stop is None: stop = len(a) @@ -82,7 +88,8 @@ def walk(self, a, start=0, stop=None): j += 1 yield (i, a[i], a[i+1], a[i+2], a[i+3:j]) i = j + 1 - print >> sys.stderr, 'done' + if stream: + stream.write('done\n') if __name__ == '__main__': @@ -90,7 +97,7 @@ def walk(self, a, start=0, stop=None): print >> sys.stderr, __doc__ sys.exit(2) stat = Stat() - stat.summarize(sys.argv[1]) + stat.summarize(sys.argv[1], stream=sys.stderr) # if len(sys.argv) > 2: typeid_name = sys.argv[2] @@ -99,7 +106,8 @@ def walk(self, a, start=0, stop=None): if os.path.isfile(typeid_name): stat.load_typeids(typeid_name) else: - import zlib, gc + import gc + import zlib stat.load_typeids(zlib.decompress(gc.get_typeids_z()).split("\n")) # - stat.print_summary() + stat.print_summary(sys.stdout) diff --git a/autopush/main.py b/autopush/main.py index 374d358c..94103664 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -7,6 +7,7 @@ from autobahn.twisted.resource import WebSocketResource from twisted.internet import reactor, task from twisted.internet.endpoints import SSL4ServerEndpoint, TCP4ServerEndpoint +from twisted.internet.tcp import Port # noqa from twisted.logger import Logger from twisted.web.server import Site from typing import Any, Callable # noqa @@ -16,7 +17,11 @@ from autopush.logging import PushLogger from autopush.settings import AutopushSettings from autopush.ssl import AutopushSSLContextFactory -from autopush.web.health import HealthHandler, StatusHandler +from autopush.web.health import ( + HealthHandler, + MemUsageHandler, + StatusHandler +) from autopush.web.limitedhttpconnection import LimitedHTTPConnection from autopush.web.log_check import LogCheckHandler from autopush.web.message import MessageHandler @@ -54,6 +59,7 @@ 'logcheck': r"/v1/err(?:/(?P[^\/]+))?", 'status': r"^/status", 'health': r"^/health", + 'memusage': r"^/_memusage", } @@ -147,6 +153,10 @@ def add_shared_args(parser): parser.add_argument('--msg_limit', help="Max limit for messages per uaid " "before reset", type=int, default="100", env_var="MSG_LIMIT") + parser.add_argument('--memusage_port', + help="Enable the debug _memusage API on Port", + type=int, default=None, + env_var='MEMUSAGE_PORT') # No ENV because this is for humans add_external_router_args(parser) obsolete_args(parser) @@ -547,6 +557,8 @@ def connection_main(sysargs=None, use_files=True): start_looping_call(1.0, periodic_reporter, settings, factory) # Start the table rotation checker/updater start_looping_call(60, settings.update_rotating_tables) + if args.memusage_port: + create_memusage_site(settings, args.memusage_port, args.debug) reactor.run() @@ -618,6 +630,8 @@ def create_endpoint(port): reactor.suggestThreadPoolSize(50) # Start the table rotation checker/updater start_looping_call(60, settings.update_rotating_tables) + if args.memusage_port: + create_memusage_site(settings, args.memusage_port, args.debug) reactor.run() @@ -629,3 +643,16 @@ def start_looping_call(interval, func, *args, **kwargs): lambda failure: log.failure( "Error in LoopingCall {name}", name=func.__name__, failure=failure) ) + + +def create_memusage_site(settings, port, debug): + """Setup MemUsageHandler on a specific port""" + # type: (AutopushSettings, int, bool) -> Port + h_kwargs = dict(ap_settings=settings) + site = cyclone.web.Application( + [(endpoint_paths['memusage'], MemUsageHandler, h_kwargs)], + default_host=settings.hostname, + debug=debug, + log_function=skip_request_logging + ) + return reactor.listenTCP(port, site) diff --git a/autopush/memusage.py b/autopush/memusage.py new file mode 100644 index 00000000..1a837ef2 --- /dev/null +++ b/autopush/memusage.py @@ -0,0 +1,48 @@ +"""Produces memory usage information""" +import gc +import objgraph +import os +import resource +import subprocess +import tempfile +import zlib +from StringIO import StringIO + +from autopush.gcdump import Stat + + +def memusage(): + """Returning a str of memory usage stats""" + # type() -> str + def trap_err(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + buf.writelines([func.__name__, ': ', repr(e)]) + + buf = StringIO() + rusage = trap_err(resource.getrusage, resource.RUSAGE_SELF) + buf.writelines([repr(rusage), '\n\n']) + trap_err(objgraph.show_most_common_types, limit=0, file=buf) + buf.write('\n\n') + pmap = trap_err(subprocess.check_output, ['pmap', '-X', str(os.getpid())], + stderr=subprocess.STDOUT) + buf.writelines([pmap, '\n\n']) + trap_err(dump_rpy_heap, buf) + return buf.getvalue() + + +def dump_rpy_heap(stream): + """Write PyPy's gcdump to the specified stream""" + if not hasattr(gc, '_dump_rpy_heap'): + # not PyPy + return + + with tempfile.NamedTemporaryFile('wb') as fp: + gc._dump_rpy_heap(fp.fileno()) + stream.write("{} size: {}\n".format(fp.name, os.stat(fp.name).st_size)) + stat = Stat() + stat.summarize(fp.name, stream=None) + stat.load_typeids(zlib.decompress(gc.get_typeids_z()).split("\n")) + stream.write('\n\n') + stat.print_summary(stream) diff --git a/autopush/tests/test_integration.py b/autopush/tests/test_integration.py index 8b173fe5..c169ff18 100644 --- a/autopush/tests/test_integration.py +++ b/autopush/tests/test_integration.py @@ -10,6 +10,7 @@ import urlparse import uuid from contextlib import contextmanager +from distutils.spawn import find_executable from StringIO import StringIO from httplib import HTTPResponse # noqa from unittest.case import SkipTest @@ -364,7 +365,11 @@ def setUp(self): from twisted.web.server import Site from autopush.web.log_check import LogCheckHandler - from autopush.main import mount_health_handlers, skip_request_logging + from autopush.main import ( + create_memusage_site, + mount_health_handlers, + skip_request_logging + ) from autopush.settings import AutopushSettings from autopush.web.message import MessageHandler from autopush.web.registration import RegistrationHandler @@ -383,6 +388,7 @@ def setUp(self): self.endpoint_port = 9020 self.router_port = 9030 + self.memusage_port = 9040 settings = AutopushSettings( hostname="localhost", statsd_host=None, @@ -449,6 +455,11 @@ def setUp(self): ) ep.listen(site).addCallback(self._endpoint_listening) + self.memusage_site = create_memusage_site( + settings, + self.memusage_port, + False) + def _create_endpoint(self, port, is_https): if not is_https: return TCP4ServerEndpoint(reactor, port) @@ -465,7 +476,9 @@ def endpoint_SSLCF(self): @inlineCallbacks def tearDown(self): - sites = [self.websocket, self.ws_website] + self.endpoints + sites = [self.websocket, + self.ws_website, + self.memusage_site] + self.endpoints for d in filter(None, (site.stopListening() for site in sites)): yield d @@ -2032,6 +2045,24 @@ def test_no_proxy_protocol(self): eq_(payload['error'], "Test Error") +class TestMemUsage(IntegrationBase): + + @inlineCallbacks + def test_memusage(self): + response, body = yield _agent( + 'GET', + "http://localhost:{}/_memusage".format(self.memusage_port), + ) + eq_(response.code, 200) + ok_('rusage' in body) + ok_('Logger' in body) + if find_executable('pmap'): + ok_('Rss' in body) # pmap -X output + if hasattr(sys, 'pypy_version_info'): + ok_('size: ' in body) + ok_('rpy_unicode' in body) + + @inlineCallbacks def _agent(method, url, contextFactory=None, headers=None, body=None): kwargs = {} diff --git a/autopush/tests/test_main.py b/autopush/tests/test_main.py index 1aae04b3..8c1efe11 100644 --- a/autopush/tests/test_main.py +++ b/autopush/tests/test_main.py @@ -305,6 +305,11 @@ def test_proxy_protocol_port(self): "--proxy_protocol_port=8081", ], False) + def test_memusage(self): + endpoint_main([ + "--memusage_port=8083", + ], False) + @patch('hyper.tls', spec=hyper.tls) def test_client_certs_parse(self, mock): ap = make_settings(self.TestArg) diff --git a/autopush/web/health.py b/autopush/web/health.py index b29ba7f5..657cb645 100644 --- a/autopush/web/health.py +++ b/autopush/web/health.py @@ -93,3 +93,28 @@ def get(self, *args, **kwargs): "status": "OK", "version": __version__ }) + + +class MemUsageHandler(BaseWebHandler): + """Spits out a tarball of some memory stats. + + Should be ran on its own port, not accessible externally. + + """ + + def authenticate_peer_cert(self): + """skip authentication checks""" + pass + + def get(self, *args, **kwargs): + """HTTP Get + + Returns that this node is alive, and the version. + + """ + from autopush.memusage import memusage + d = deferToThread(memusage) + d.addCallback(self.write) + d.addCallback(self.finish) + d.addErrback(self._response_err) + return d diff --git a/base-requirements.txt b/base-requirements.txt index 5e3cf025..8a96b1cd 100644 --- a/base-requirements.txt +++ b/base-requirements.txt @@ -24,6 +24,7 @@ ipaddress==1.0.16 jmespath==0.9.0 marshmallow==2.10.2 marshmallow_polyfield==3.1 +objgraph==3.1.0 pyOpenSSL==16.1.0 pyasn1==0.1.9 pyasn1-modules==0.0.8