
Commit

Merge pull request #421 from valohai/multiproc-expose-speed
Multiprocess exposition speed boost
brian-brazil authored Jun 6, 2019
2 parents 5132fd2 + 0f544eb commit df024e0
Showing 2 changed files with 84 additions and 50 deletions.
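For context: the code path this commit speeds up runs on every scrape in multiprocess mode, where the exposition process re-reads and merges every per-worker .db file. A minimal sketch of that setup, assuming the library's standard multiprocess usage (the directory path is illustrative, and prometheus_multiproc_dir normally has to be set before worker processes start writing):

import os

from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.multiprocess import MultiProcessCollector

# Workers write mmap-backed .db files into this directory; the exposition
# process merges them on every call to collect().
os.environ.setdefault('prometheus_multiproc_dir', '/tmp/prom_mp')  # assumed path

registry = CollectorRegistry()
MultiProcessCollector(registry)

# Each scrape re-parses every file, which is why file parsing speed matters.
print(generate_latest(registry).decode('utf-8'))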
65 changes: 40 additions & 25 deletions prometheus_client/mmap_dict.py
@@ -22,6 +22,29 @@ def _pack_integer(data, pos, value):
data[pos:pos + 4] = _pack_integer_func(value)


def _read_all_values(data, used=0):
"""Yield (key, value, pos). No locking is performed."""

if used <= 0:
# If no valid `used` value is passed in, read it from the file.
used = _unpack_integer(data, 0)[0]

pos = 8

while pos < used:
encoded_len = _unpack_integer(data, pos)[0]
# check we are not reading beyond bounds
if encoded_len + pos > used:
raise RuntimeError('Read beyond file size detected, file is corrupted.')
pos += 4
encoded_key = data[pos : pos + encoded_len]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded_key.decode('utf-8'), value, pos
pos += 8


class MmapedDict(object):
"""A dict of doubles, backed by an mmapped file.
@@ -37,9 +60,11 @@ class MmapedDict(object):
def __init__(self, filename, read_mode=False):
self._f = open(filename, 'rb' if read_mode else 'a+b')
self._fname = filename
if os.fstat(self._f.fileno()).st_size == 0:
capacity = os.fstat(self._f.fileno()).st_size
if capacity == 0:
self._f.truncate(_INITIAL_MMAP_SIZE)
self._capacity = os.fstat(self._f.fileno()).st_size
capacity = _INITIAL_MMAP_SIZE
self._capacity = capacity
self._m = mmap.mmap(self._f.fileno(), self._capacity,
access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)

@@ -53,6 +78,17 @@ def __init__(self, filename, read_mode=False):
for key, _, pos in self._read_all_values():
self._positions[key] = pos

@staticmethod
def read_all_values_from_file(filename):
with open(filename, 'rb') as infp:
# Read the first block of data, including the first 4 bytes which tell us
# how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
data = infp.read(65535)
used = _unpack_integer(data, 0)[0]
if used > len(data): # Then read in the rest, if needed.
data += infp.read(used - len(data))
return _read_all_values(data, used)

def _init_value(self, key):
"""Initialize a value. Lock must be held by caller."""
encoded = key.encode('utf-8')
@@ -72,31 +108,10 @@ def _init_value(self, key):

def _read_all_values(self):
"""Yield (key, value, pos). No locking is performed."""

pos = 8

# cache variables to local ones and prevent attributes lookup
# on every loop iteration
used = self._used
data = self._m
unpack_from = struct.unpack_from

while pos < used:
encoded_len = _unpack_integer(data, pos)[0]
# check we are not reading beyond bounds
if encoded_len + pos > used:
msg = 'Read beyond file size detected, %s is corrupted.'
raise RuntimeError(msg % self._fname)
pos += 4
encoded = unpack_from(('%ss' % encoded_len).encode(), data, pos)[0]
padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
pos += padded_len
value = _unpack_double(data, pos)[0]
yield encoded.decode('utf-8'), value, pos
pos += 8
return _read_all_values(data=self._m, used=self._used)

def read_all_values(self):
"""Yield (key, value, pos). No locking is performed."""
"""Yield (key, value). No locking is performed."""
for k, v, _ in self._read_all_values():
yield k, v

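Both the new module-level _read_all_values and MmapedDict.read_all_values_from_file walk the same on-disk layout that _init_value writes: a 4-byte used-bytes counter at offset 0, entries starting at offset 8, and each entry consisting of a 4-byte key length, the UTF-8 key padded so that length plus key stays 8-byte aligned, then an 8-byte double. A small writer-side sketch (not part of the diff; the example key and padding byte are illustrative, since only the key length is decoded on read) to make the padding arithmetic concrete:

import struct

def pack_entry(key, value):
    # 4-byte length, padded key to keep the following double 8-byte aligned,
    # then the 8-byte double itself.
    encoded = key.encode('utf-8')
    padded = encoded + b' ' * (8 - (len(encoded) + 4) % 8)
    return struct.pack('i', len(encoded)) + padded + struct.pack('d', value)

# _read_all_values advances pos by 4 plus the padded key length to reach the
# value, then by 8 to the next entry.
entry = pack_entry('["requests_total", "requests_total", {}]', 1.0)
assert len(entry) % 8 == 0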
69 changes: 44 additions & 25 deletions prometheus_client/multiprocess.py
@@ -12,6 +12,8 @@
from .samples import Sample
from .utils import floatToGoString

MP_METRIC_HELP = 'Multiprocess metric'


class MultiProcessCollector(object):
"""Collector for files for multi-process mode."""
@@ -33,18 +35,31 @@ def merge(files, accumulate=True):
But if writing the merged data back to mmap files, use
accumulate=False to avoid compound accumulation.
"""
metrics = MultiProcessCollector._read_metrics(files)
return MultiProcessCollector._accumulate_metrics(metrics, accumulate)

@staticmethod
def _read_metrics(files):
metrics = {}
key_cache = {}

def _parse_key(key):
val = key_cache.get(key)
if not val:
metric_name, name, labels = json.loads(key)
labels_key = tuple(sorted(labels.items()))
val = key_cache[key] = (metric_name, name, labels, labels_key)
return val

for f in files:
parts = os.path.basename(f).split('_')
typ = parts[0]
d = MmapedDict(f, read_mode=True)
for key, value in d.read_all_values():
metric_name, name, labels = json.loads(key)
labels_key = tuple(sorted(labels.items()))
for key, value, pos in MmapedDict.read_all_values_from_file(f):
metric_name, name, labels, labels_key = _parse_key(key)

metric = metrics.get(metric_name)
if metric is None:
metric = Metric(metric_name, 'Multiprocess metric', typ)
metric = Metric(metric_name, MP_METRIC_HELP, typ)
metrics[metric_name] = metric

if typ == 'gauge':
@@ -54,43 +69,47 @@ def merge(files, accumulate=True):
else:
# The duplicates and labels are fixed in the next for.
metric.add_sample(name, labels_key, value)
d.close()
return metrics

@staticmethod
def _accumulate_metrics(metrics, accumulate):
for metric in metrics.values():
samples = defaultdict(float)
buckets = {}
buckets = defaultdict(lambda: defaultdict(float))
samples_setdefault = samples.setdefault
for s in metric.samples:
name, labels, value = s.name, s.labels, s.value
name, labels, value, timestamp, exemplar = s
if metric.type == 'gauge':
without_pid = tuple(l for l in labels if l[0] != 'pid')
without_pid_key = (name, tuple([l for l in labels if l[0] != 'pid']))
if metric._multiprocess_mode == 'min':
current = samples.setdefault((name, without_pid), value)
current = samples_setdefault(without_pid_key, value)
if value < current:
samples[(s.name, without_pid)] = value
samples[without_pid_key] = value
elif metric._multiprocess_mode == 'max':
current = samples.setdefault((name, without_pid), value)
current = samples_setdefault(without_pid_key, value)
if value > current:
samples[(s.name, without_pid)] = value
samples[without_pid_key] = value
elif metric._multiprocess_mode == 'livesum':
samples[(name, without_pid)] += value
samples[without_pid_key] += value
else: # all/liveall
samples[(name, labels)] = value

elif metric.type == 'histogram':
bucket = tuple(float(l[1]) for l in labels if l[0] == 'le')
if bucket:
# _bucket
without_le = tuple(l for l in labels if l[0] != 'le')
buckets.setdefault(without_le, {})
buckets[without_le].setdefault(bucket[0], 0.0)
buckets[without_le][bucket[0]] += value
else:
# A for loop with early exit is faster than a genexpr
# or a listcomp that ends up building unnecessary things
for l in labels:
if l[0] == 'le':
bucket_value = float(l[1])
# _bucket
without_le = tuple(l for l in labels if l[0] != 'le')
buckets[without_le][bucket_value] += value
break
else: # did not find the `le` key
# _sum/_count
samples[(s.name, labels)] += value

samples[(name, labels)] += value
else:
# Counter and Summary.
samples[(s.name, labels)] += value
samples[(name, labels)] += value

# Accumulate bucket values.
if metric.type == 'histogram':
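As the merge() docstring above notes, accumulate=False is meant for writing merged data back to mmap files rather than exposing them, so histogram buckets keep their raw per-bucket values instead of being folded into cumulative counts and _sum/_count samples. A short usage sketch, with an assumed directory path, globbing the per-process *.db files:

import glob
import os

from prometheus_client.multiprocess import MultiProcessCollector

path = '/tmp/prom_mp'  # assumed multiprocess directory
files = glob.glob(os.path.join(path, '*.db'))

# With accumulate=False the returned metrics are safe to write back to
# mmap files; with the default accumulate=True they are ready for exposition.
metrics = MultiProcessCollector.merge(files, accumulate=False)
for metric in metrics:
    print(metric.name, metric.type, len(metric.samples))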
