Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #812 from mozilla-services/feat/802
Browse files Browse the repository at this point in the history
add a /_memusage API on a separate (internal port)
  • Loading branch information
jrconlin authored Feb 10, 2017
2 parents 53630ff + 6a9336c commit fe90445
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
omit =
*noseplugin*
autopush/tests/certs/makecerts.py
autopush/gcdump.py
exclude_lines = def dump_rpy_heap
show_missing = true
113 changes: 113 additions & 0 deletions autopush/gcdump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#! /usr/bin/env python
"""
Prints a human-readable total out of a dumpfile produced
by gc.dump_rpy_heap(), and optionally a typeids.txt.
Syntax: dump.py <dumpfile> [<typeids.txt>]
By default, typeids.txt is loaded from the same dir as dumpfile.
"""
import array
import os
import struct
import sys


class Stat(object):
summary = {}
typeids = {0: '<GCROOT>'}

def summarize(self, filename, stream=None):
a = self.load_dump_file(filename)
self.summary = {} # {typenum: [count, totalsize]}
for obj in self.walk(a, stream=stream):
self.add_object_summary(obj[2], obj[3])

def load_typeids(self, filename_or_iter):
self.typeids = Stat.typeids.copy()
if isinstance(filename_or_iter, str):
iter = open(filename_or_iter)
else:
iter = filename_or_iter
for num, line in enumerate(iter):
if num == 0:
continue
if not line:
continue
words = line.split()
if words[0].startswith('member'):
del words[0]
if words[0] == 'GcStruct':
del words[0]
self.typeids[num] = ' '.join(words)

def get_type_name(self, num):
return self.typeids.get(num, '<typenum %d>' % num)

def print_summary(self, stream):
items = self.summary.items()
items.sort(key=lambda (typenum, stat): stat[1]) # sort by totalsize
totalsize = 0
for typenum, stat in items:
totalsize += stat[1]
stream.write('%8d %8.2fM %s\n' %
(stat[0],
stat[1] / (1024.0*1024.0),
self.get_type_name(typenum)))
stream.write('total %.1fM\n' % (totalsize / (1024.0*1024.0)))

def load_dump_file(self, filename):
f = open(filename, 'rb')
f.seek(0, 2)
end = f.tell()
f.seek(0)
a = array.array('l')
a.fromfile(f, end / struct.calcsize('l'))
f.close()
return a

def add_object_summary(self, typenum, sizeobj):
try:
stat = self.summary[typenum]
except KeyError:
stat = self.summary[typenum] = [0, 0]
stat[0] += 1
stat[1] += sizeobj

def walk(self, a, start=0, stop=None, stream=None):
assert a[-1] == -1, "invalid or truncated dump file (or 32/64-bit mix)"
assert a[-2] != -1, "invalid or truncated dump file (or 32/64-bit mix)"
if stream:
stream.write('walking...')
i = start
if stop is None:
stop = len(a)
while i < stop:
j = i + 3
while a[j] != -1:
j += 1
yield (i, a[i], a[i+1], a[i+2], a[i+3:j])
i = j + 1
if stream:
stream.write('done\n')


if __name__ == '__main__':
if len(sys.argv) <= 1:
print >> sys.stderr, __doc__
sys.exit(2)
stat = Stat()
stat.summarize(sys.argv[1], stream=sys.stderr)
#
if len(sys.argv) > 2:
typeid_name = sys.argv[2]
else:
typeid_name = os.path.join(os.path.dirname(sys.argv[1]), 'typeids.txt')
if os.path.isfile(typeid_name):
stat.load_typeids(typeid_name)
else:
import gc
import zlib
stat.load_typeids(zlib.decompress(gc.get_typeids_z()).split("\n"))
#
stat.print_summary(sys.stdout)
29 changes: 28 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from autobahn.twisted.resource import WebSocketResource
from twisted.internet import reactor, task
from twisted.internet.endpoints import SSL4ServerEndpoint, TCP4ServerEndpoint
from twisted.internet.tcp import Port # noqa
from twisted.logger import Logger
from twisted.web.server import Site
from typing import Any, Callable # noqa
Expand All @@ -16,7 +17,11 @@
from autopush.logging import PushLogger
from autopush.settings import AutopushSettings
from autopush.ssl import AutopushSSLContextFactory
from autopush.web.health import HealthHandler, StatusHandler
from autopush.web.health import (
HealthHandler,
MemUsageHandler,
StatusHandler
)
from autopush.web.limitedhttpconnection import LimitedHTTPConnection
from autopush.web.log_check import LogCheckHandler
from autopush.web.message import MessageHandler
Expand Down Expand Up @@ -54,6 +59,7 @@
'logcheck': r"/v1/err(?:/(?P<err_type>[^\/]+))?",
'status': r"^/status",
'health': r"^/health",
'memusage': r"^/_memusage",
}


Expand Down Expand Up @@ -147,6 +153,10 @@ def add_shared_args(parser):
parser.add_argument('--msg_limit', help="Max limit for messages per uaid "
"before reset", type=int, default="100",
env_var="MSG_LIMIT")
parser.add_argument('--memusage_port',
help="Enable the debug _memusage API on Port",
type=int, default=None,
env_var='MEMUSAGE_PORT')
# No ENV because this is for humans
add_external_router_args(parser)
obsolete_args(parser)
Expand Down Expand Up @@ -547,6 +557,8 @@ def connection_main(sysargs=None, use_files=True):
start_looping_call(1.0, periodic_reporter, settings, factory)
# Start the table rotation checker/updater
start_looping_call(60, settings.update_rotating_tables)
if args.memusage_port:
create_memusage_site(settings, args.memusage_port, args.debug)
reactor.run()


Expand Down Expand Up @@ -618,6 +630,8 @@ def create_endpoint(port):
reactor.suggestThreadPoolSize(50)
# Start the table rotation checker/updater
start_looping_call(60, settings.update_rotating_tables)
if args.memusage_port:
create_memusage_site(settings, args.memusage_port, args.debug)
reactor.run()


Expand All @@ -629,3 +643,16 @@ def start_looping_call(interval, func, *args, **kwargs):
lambda failure: log.failure(
"Error in LoopingCall {name}", name=func.__name__, failure=failure)
)


def create_memusage_site(settings, port, debug):
"""Setup MemUsageHandler on a specific port"""
# type: (AutopushSettings, int, bool) -> Port
h_kwargs = dict(ap_settings=settings)
site = cyclone.web.Application(
[(endpoint_paths['memusage'], MemUsageHandler, h_kwargs)],
default_host=settings.hostname,
debug=debug,
log_function=skip_request_logging
)
return reactor.listenTCP(port, site)
49 changes: 49 additions & 0 deletions autopush/memusage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Produces memory usage information"""
import gc
import objgraph
import os
import resource
import subprocess
import tempfile
import zlib
from StringIO import StringIO

from autopush.gcdump import Stat


def memusage():
"""Returning a str of memory usage stats"""
# type() -> str
def trap_err(func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# include both __str/repr__, sometimes one's useless
buf.writelines([func.__name__, ': ', repr(e), ': ', str(e)])

buf = StringIO()
rusage = trap_err(resource.getrusage, resource.RUSAGE_SELF)
buf.writelines([repr(rusage), '\n\n'])
trap_err(objgraph.show_most_common_types, limit=0, file=buf)
buf.write('\n\n')
pmap = trap_err(subprocess.check_output, ['pmap', '-x', str(os.getpid())],
stderr=subprocess.STDOUT)
buf.writelines([pmap, '\n\n'])
trap_err(dump_rpy_heap, buf)
return buf.getvalue()


def dump_rpy_heap(stream):
"""Write PyPy's gcdump to the specified stream"""
if not hasattr(gc, '_dump_rpy_heap'):
# not PyPy
return

with tempfile.NamedTemporaryFile('wb') as fp:
gc._dump_rpy_heap(fp.fileno())
stream.write("{} size: {}\n".format(fp.name, os.stat(fp.name).st_size))
stat = Stat()
stat.summarize(fp.name, stream=None)
stat.load_typeids(zlib.decompress(gc.get_typeids_z()).split("\n"))
stream.write('\n\n')
stat.print_summary(stream)
36 changes: 34 additions & 2 deletions autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import urlparse
import uuid
from contextlib import contextmanager
from distutils.spawn import find_executable
from StringIO import StringIO
from httplib import HTTPResponse # noqa
from unittest.case import SkipTest
Expand Down Expand Up @@ -364,7 +365,11 @@ def setUp(self):
from twisted.web.server import Site

from autopush.web.log_check import LogCheckHandler
from autopush.main import mount_health_handlers, skip_request_logging
from autopush.main import (
create_memusage_site,
mount_health_handlers,
skip_request_logging
)
from autopush.settings import AutopushSettings
from autopush.web.message import MessageHandler
from autopush.web.registration import RegistrationHandler
Expand All @@ -383,6 +388,7 @@ def setUp(self):

self.endpoint_port = 9020
self.router_port = 9030
self.memusage_port = 9040
settings = AutopushSettings(
hostname="localhost",
statsd_host=None,
Expand Down Expand Up @@ -449,6 +455,11 @@ def setUp(self):
)
ep.listen(site).addCallback(self._endpoint_listening)

self.memusage_site = create_memusage_site(
settings,
self.memusage_port,
False)

def _create_endpoint(self, port, is_https):
if not is_https:
return TCP4ServerEndpoint(reactor, port)
Expand All @@ -465,7 +476,9 @@ def endpoint_SSLCF(self):

@inlineCallbacks
def tearDown(self):
sites = [self.websocket, self.ws_website] + self.endpoints
sites = [self.websocket,
self.ws_website,
self.memusage_site] + self.endpoints
for d in filter(None, (site.stopListening() for site in sites)):
yield d

Expand Down Expand Up @@ -2032,6 +2045,25 @@ def test_no_proxy_protocol(self):
eq_(payload['error'], "Test Error")


class TestMemUsage(IntegrationBase):

@inlineCallbacks
def test_memusage(self):
response, body = yield _agent(
'GET',
"http://localhost:{}/_memusage".format(self.memusage_port),
)
eq_(response.code, 200)
ok_('rusage' in body)
ok_('Logger' in body)
if find_executable('pmap'):
assert 'RSS' in body, body
ok_('RSS' in body) # pmap -X output
if hasattr(sys, 'pypy_version_info'):
ok_('size: ' in body)
ok_('rpy_unicode' in body)


@inlineCallbacks
def _agent(method, url, contextFactory=None, headers=None, body=None):
kwargs = {}
Expand Down
10 changes: 10 additions & 0 deletions autopush/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ def test_ssl(self):
"--router_ssl_key=keys/server.key",
], False)

def test_memusage(self):
connection_main([
"--memusage_port=8083",
], False)

def test_skip_logging(self):
# Should skip setting up logging on the handler
mock_handler = Mock()
Expand Down Expand Up @@ -305,6 +310,11 @@ def test_proxy_protocol_port(self):
"--proxy_protocol_port=8081",
], False)

def test_memusage(self):
endpoint_main([
"--memusage_port=8083",
], False)

@patch('hyper.tls', spec=hyper.tls)
def test_client_certs_parse(self, mock):
ap = make_settings(self.TestArg)
Expand Down
25 changes: 25 additions & 0 deletions autopush/web/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,28 @@ def get(self, *args, **kwargs):
"status": "OK",
"version": __version__
})


class MemUsageHandler(BaseWebHandler):
"""Spits out a tarball of some memory stats.
Should be ran on its own port, not accessible externally.
"""

def authenticate_peer_cert(self):
"""skip authentication checks"""
pass

def get(self, *args, **kwargs):
"""HTTP Get
Returns that this node is alive, and the version.
"""
from autopush.memusage import memusage
d = deferToThread(memusage)
d.addCallback(self.write)
d.addCallback(self.finish)
d.addErrback(self._response_err)
return d
1 change: 1 addition & 0 deletions base-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ipaddress==1.0.16
jmespath==0.9.0
marshmallow==2.10.2
marshmallow_polyfield==3.1
objgraph==3.1.0
pyOpenSSL==16.1.0
pyasn1==0.1.9
pyasn1-modules==0.0.8
Expand Down

0 comments on commit fe90445

Please sign in to comment.