Skip to content

Commit

Permalink
Make Counter and MinMaxSumCount aggregators thread safe (#439)
Browse files Browse the repository at this point in the history
  • Loading branch information
mauriciovasquezbernal authored Feb 27, 2020
1 parent 5b2e693 commit 2578b37
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 44 deletions.
77 changes: 41 additions & 36 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
import threading
from collections import namedtuple


Expand Down Expand Up @@ -47,62 +48,66 @@ def __init__(self):
super().__init__()
self.current = 0
self.checkpoint = 0
self._lock = threading.Lock()

def update(self, value):
self.current += value
with self._lock:
self.current += value

def take_checkpoint(self):
self.checkpoint = self.current
self.current = 0
with self._lock:
self.checkpoint = self.current
self.current = 0

def merge(self, other):
self.checkpoint += other.checkpoint
with self._lock:
self.checkpoint += other.checkpoint


class MinMaxSumCountAggregator(Aggregator):
"""Agregator for Measure metrics that keeps min, max, sum and count."""

_TYPE = namedtuple("minmaxsumcount", "min max sum count")
_EMPTY = _TYPE(None, None, None, 0)

@classmethod
def _min(cls, val1, val2):
if val1 is None and val2 is None:
return None
return min(val1 or val2, val2 or val1)

@classmethod
def _max(cls, val1, val2):
if val1 is None and val2 is None:
return None
return max(val1 or val2, val2 or val1)

@classmethod
def _sum(cls, val1, val2):
if val1 is None and val2 is None:
return None
return (val1 or 0) + (val2 or 0)
def _merge_checkpoint(cls, val1, val2):
if val1 is cls._EMPTY:
return val2
if val2 is cls._EMPTY:
return val1
return cls._TYPE(
min(val1.min, val2.min),
max(val1.max, val2.max),
val1.sum + val2.sum,
val1.count + val2.count,
)

def __init__(self):
super().__init__()
self.current = self._TYPE(None, None, None, 0)
self.checkpoint = self._TYPE(None, None, None, 0)
self.current = self._EMPTY
self.checkpoint = self._EMPTY
self._lock = threading.Lock()

def update(self, value):
self.current = self._TYPE(
self._min(self.current.min, value),
self._max(self.current.max, value),
self._sum(self.current.sum, value),
self.current.count + 1,
)
with self._lock:
if self.current is self._EMPTY:
self.current = self._TYPE(value, value, value, 1)
else:
self.current = self._TYPE(
min(self.current.min, value),
max(self.current.max, value),
self.current.sum + value,
self.current.count + 1,
)

def take_checkpoint(self):
self.checkpoint = self.current
self.current = self._TYPE(None, None, None, 0)
with self._lock:
self.checkpoint = self.current
self.current = self._EMPTY

def merge(self, other):
self.checkpoint = self._TYPE(
self._min(self.checkpoint.min, other.checkpoint.min),
self._max(self.checkpoint.max, other.checkpoint.max),
self._sum(self.checkpoint.sum, other.checkpoint.sum),
self.checkpoint.count + other.checkpoint.count,
)
with self._lock:
self.checkpoint = self._merge_checkpoint(
self.checkpoint, other.checkpoint
)
128 changes: 120 additions & 8 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import random
import unittest
from unittest import mock

Expand Down Expand Up @@ -222,6 +224,15 @@ def test_ungrouped_batcher_process_not_stateful(self):


class TestCounterAggregator(unittest.TestCase):
@staticmethod
def call_update(counter):
update_total = 0
for _ in range(0, 100000):
val = random.getrandbits(32)
counter.update(val)
update_total += val
return update_total

def test_update(self):
counter = CounterAggregator()
counter.update(1.0)
Expand All @@ -243,13 +254,58 @@ def test_merge(self):
counter.merge(counter2)
self.assertEqual(counter.checkpoint, 4.0)

def test_concurrent_update(self):
counter = CounterAggregator()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
fut1 = executor.submit(self.call_update, counter)
fut2 = executor.submit(self.call_update, counter)

updapte_total = fut1.result() + fut2.result()

counter.take_checkpoint()
self.assertEqual(updapte_total, counter.checkpoint)

def test_concurrent_update_and_checkpoint(self):
counter = CounterAggregator()
checkpoint_total = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
fut = executor.submit(self.call_update, counter)

while not fut.done():
counter.take_checkpoint()
checkpoint_total += counter.checkpoint

counter.take_checkpoint()
checkpoint_total += counter.checkpoint

self.assertEqual(fut.result(), checkpoint_total)


class TestMinMaxSumCountAggregator(unittest.TestCase):
@staticmethod
def call_update(mmsc):
min_ = float("inf")
max_ = float("-inf")
sum_ = 0
count_ = 0
for _ in range(0, 100000):
val = random.getrandbits(32)
mmsc.update(val)
if val < min_:
min_ = val
if val > max_:
max_ = val
sum_ += val
count_ += 1
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)

def test_update(self):
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(
mmsc.current, (None, None, None, 0),
mmsc.current, MinMaxSumCountAggregator._EMPTY,
)

# call update with some values
Expand All @@ -267,7 +323,7 @@ def test_checkpoint(self):
# take checkpoint wihtout any update
mmsc.take_checkpoint()
self.assertEqual(
mmsc.checkpoint, (None, None, None, 0),
mmsc.checkpoint, MinMaxSumCountAggregator._EMPTY,
)

# call update with some values
Expand All @@ -282,7 +338,7 @@ def test_checkpoint(self):
)

self.assertEqual(
mmsc.current, (None, None, None, 0),
mmsc.current, MinMaxSumCountAggregator._EMPTY,
)

def test_merge(self):
Expand All @@ -299,14 +355,34 @@ def test_merge(self):

self.assertEqual(
mmsc1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
MinMaxSumCountAggregator._merge_checkpoint(
checkpoint1, checkpoint2
),
)

def test_merge_checkpoint(self):
func = MinMaxSumCountAggregator._merge_checkpoint
_type = MinMaxSumCountAggregator._TYPE
empty = MinMaxSumCountAggregator._EMPTY

ret = func(empty, empty)
self.assertEqual(ret, empty)

ret = func(empty, _type(0, 0, 0, 0))
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(0, 0, 0, 0), empty)
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(0, 0, 0, 0), _type(0, 0, 0, 0))
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(44, 23, 55, 86), empty)
self.assertEqual(ret, _type(44, 23, 55, 86))

ret = func(_type(3, 150, 101, 3), _type(1, 33, 44, 2))
self.assertEqual(ret, _type(1, 150, 101 + 44, 2 + 3))

def test_merge_with_empty(self):
mmsc1 = MinMaxSumCountAggregator()
mmsc2 = MinMaxSumCountAggregator()
Expand All @@ -318,6 +394,42 @@ def test_merge_with_empty(self):

self.assertEqual(mmsc1.checkpoint, checkpoint1)

def test_concurrent_update(self):
mmsc = MinMaxSumCountAggregator()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
fut1 = ex.submit(self.call_update, mmsc)
fut2 = ex.submit(self.call_update, mmsc)

ret1 = fut1.result()
ret2 = fut2.result()

update_total = MinMaxSumCountAggregator._merge_checkpoint(
ret1, ret2
)
mmsc.take_checkpoint()

self.assertEqual(update_total, mmsc.checkpoint)

def test_concurrent_update_and_checkpoint(self):
mmsc = MinMaxSumCountAggregator()
checkpoint_total = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
fut = ex.submit(self.call_update, mmsc)

while not fut.done():
mmsc.take_checkpoint()
checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint(
checkpoint_total, mmsc.checkpoint
)

mmsc.take_checkpoint()
checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint(
checkpoint_total, mmsc.checkpoint
)

self.assertEqual(checkpoint_total, fut.result())


class TestController(unittest.TestCase):
def test_push_controller(self):
Expand Down

0 comments on commit 2578b37

Please sign in to comment.