Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Counter and MinMaxSumCount aggregators thread safe #439

Merged
merged 6 commits into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 37 additions & 34 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
import threading
from collections import namedtuple


Expand Down Expand Up @@ -47,13 +48,16 @@ def __init__(self):
super().__init__()
self.current = 0
self.checkpoint = 0
self._lock = threading.Lock()

def update(self, value):
self.current += value
with self._lock:
self.current += value

def take_checkpoint(self):
self.checkpoint = self.current
self.current = 0
with self._lock:
self.checkpoint = self.current
self.current = 0

def merge(self, other):
self.checkpoint += other.checkpoint
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to lock here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to my understanding merge() and take_checkpoint() will be never called concurrently. Those methods are invoked in Batcher::process() that is invoked from Meter::collect() that is never concurrent. @lzchen could you please check this reasoning?

def process(self, record):
# Checkpoints the current aggregator value to be collected for export
record.aggregator.take_checkpoint()
batch_key = (record.metric, record.label_set)
batch_value = self._batch_map.get(batch_key)
aggregator = record.aggregator
if batch_value:
# Update the stored checkpointed value if exists. The call to merge
# here combines only identical records (same key).
batch_value.merge(aggregator)
return
if self.stateful:
# if stateful batcher, create a copy of the aggregator and update
# it with the current checkpointed value for long-term storage
aggregator = self.aggregator_for(record.metric.__class__)
aggregator.merge(record.aggregator)
self._batch_map[batch_key] = aggregator

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even still, I think it's better not to rely on the behavior of the caller here. Even better: if they're never called concurrently, there will never be lock contention to slow this down.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I added locks to merge too but I didn't added tests for that. We can revisit this later on when considering the use of atomics libraries here.

Expand All @@ -63,46 +67,45 @@ class MinMaxSumCountAggregator(Aggregator):
"""Agregator for Measure metrics that keeps min, max, sum and count."""

_TYPE = namedtuple("minmaxsumcount", "min max sum count")
_EMPTY = _TYPE(None, None, None, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't sum be 0 too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I don't have a strong feeling about that, if you have please let me know and I'll change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be because of https://en.wikipedia.org/wiki/Identity_element, also not a strong opinion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that, min should be inf and max -inf. The point is, do we want to have a special value to indicate that it hasn't been computed?. i.e, None. I don't have a strong opinion, the go implementation returns an error in such case, even if I don't understand the details of the race condition that is mentioned there.

https://github.com/open-telemetry/opentelemetry-go/blob/29cd0c08b70052860d9a6ce0510e102537ff7fa7/sdk/metric/aggregator/minmaxsumcount/mmsc.go#L81-L86

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way's fine with me, but I'd expect sum and count to either both be None or both be 0. As for +/-inf I think the min/max of an empty set is literally undefined, but the sum and count are defined and 0.

But this is getting dangerously close to philosophy. LGTM as is.


@classmethod
def _min(cls, val1, val2):
if val1 is None and val2 is None:
return None
return min(val1 or val2, val2 or val1)

@classmethod
def _max(cls, val1, val2):
if val1 is None and val2 is None:
return None
return max(val1 or val2, val2 or val1)

@classmethod
def _sum(cls, val1, val2):
if val1 is None and val2 is None:
return None
return (val1 or 0) + (val2 or 0)
def _merge_checkpoint(cls, val1, val2):
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
if val1 is cls._EMPTY:
return val2
if val2 is cls._EMPTY:
return val1
return cls._TYPE(
min(val1.min, val2.min),
max(val1.max, val2.max),
val1.sum + val2.sum,
val1.count + val2.count,
)

def __init__(self):
super().__init__()
self.current = self._TYPE(None, None, None, 0)
self.checkpoint = self._TYPE(None, None, None, 0)
self.current = self._EMPTY
self.checkpoint = self._EMPTY
self._lock = threading.Lock()

def update(self, value):
self.current = self._TYPE(
self._min(self.current.min, value),
self._max(self.current.max, value),
self._sum(self.current.sum, value),
self.current.count + 1,
)
with self._lock:
if self.current is self._EMPTY:
self.current = self._TYPE(value, value, value, 1)
else:
self.current = self._TYPE(
min(self.current.min, value),
max(self.current.max, value),
self.current.sum + value,
self.current.count + 1,
)

def take_checkpoint(self):
self.checkpoint = self.current
self.current = self._TYPE(None, None, None, 0)
with self._lock:
self.checkpoint = self.current
self.current = self._EMPTY

def merge(self, other):
self.checkpoint = self._TYPE(
self._min(self.checkpoint.min, other.checkpoint.min),
self._max(self.checkpoint.max, other.checkpoint.max),
self._sum(self.checkpoint.sum, other.checkpoint.sum),
self.checkpoint.count + other.checkpoint.count,
self.checkpoint = self._merge_checkpoint(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you still need to lock here:

  1. thread 1 calls merge, loads self.checkpoint
  2. thread 2 calls take_checkpoint, updates self.checkpoint
  3. thread 1 updates self.checkpoint by merging the value it loaded in step 1, undoing step 2

This is based on:

In [9]: dis.dis(MinMaxSumCountAggregator.merge)
109           0 LOAD_FAST                0 (self)
              2 LOAD_METHOD              0 (_merge_checkpoint)

110           4 LOAD_FAST                0 (self)
              6 LOAD_ATTR                1 (checkpoint)
              8 LOAD_FAST                1 (other)      << thread 2 start
             10 LOAD_ATTR                1 (checkpoint)
             12 CALL_METHOD              2
             14 LOAD_FAST                0 (self)
             16 STORE_ATTR               1 (checkpoint)
             18 LOAD_CONST               0 (None)
             20 RETURN_VALUE

In [10]: dis.dis(MinMaxSumCountAggregator.take_checkpoint)
104           0 LOAD_FAST                0 (self)
              2 LOAD_ATTR                0 (_lock)
              4 SETUP_WITH              22 (to 28)
              6 POP_TOP

105           8 LOAD_FAST                0 (self)
             10 LOAD_ATTR                1 (current)
             12 LOAD_FAST                0 (self)
             14 STORE_ATTR               2 (checkpoint)

106          16 LOAD_FAST                0 (self)   << thread 1 overwrite
             18 LOAD_ATTR                3 (_EMPTY)
             20 LOAD_FAST                0 (self)
             22 STORE_ATTR               1 (current)
             24 POP_BLOCK
             26 LOAD_CONST               0 (None)
        >>   28 WITH_CLEANUP_START
             30 WITH_CLEANUP_FINISH
             32 END_FINALLY
             34 LOAD_CONST               0 (None)
             36 RETURN_VALUE

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #439 (comment).

self.checkpoint, other.checkpoint
)
128 changes: 120 additions & 8 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import random
import unittest
from unittest import mock

Expand Down Expand Up @@ -222,6 +224,15 @@ def test_ungrouped_batcher_process_not_stateful(self):


class TestCounterAggregator(unittest.TestCase):
@classmethod
mauriciovasquezbernal marked this conversation as resolved.
Show resolved Hide resolved
def call_update(cls, counter):
update_total = 0
for _ in range(0, 100000):
val = random.getrandbits(32)
counter.update(val)
update_total += val
return update_total

def test_update(self):
counter = CounterAggregator()
counter.update(1.0)
Expand All @@ -243,13 +254,58 @@ def test_merge(self):
counter.merge(counter2)
self.assertEqual(counter.checkpoint, 4.0)

def test_concurrent_update(self):
counter = CounterAggregator()

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
fut1 = executor.submit(self.call_update, counter)
fut2 = executor.submit(self.call_update, counter)

updapte_total = fut1.result() + fut2.result()

counter.take_checkpoint()
self.assertEqual(updapte_total, counter.checkpoint)

def test_concurrent_update_and_checkpoint(self):
counter = CounterAggregator()
checkpoint_total = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
fut = executor.submit(self.call_update, counter)

while not fut.done():
counter.take_checkpoint()
checkpoint_total += counter.checkpoint

counter.take_checkpoint()
checkpoint_total += counter.checkpoint

self.assertEqual(fut.result(), checkpoint_total)


class TestMinMaxSumCountAggregator(unittest.TestCase):
@classmethod
def call_update(cls, mmsc):
min_ = 2 ** 32
mauriciovasquezbernal marked this conversation as resolved.
Show resolved Hide resolved
max_ = 0
sum_ = 0
count_ = 0
for _ in range(0, 100000):
val = random.getrandbits(32)
mmsc.update(val)
if val < min_:
min_ = val
if val > max_:
max_ = val
sum_ += val
count_ += 1
return MinMaxSumCountAggregator._TYPE(min_, max_, sum_, count_)

def test_update(self):
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(
mmsc.current, (None, None, None, 0),
mmsc.current, MinMaxSumCountAggregator._EMPTY,
)

# call update with some values
Expand All @@ -267,7 +323,7 @@ def test_checkpoint(self):
# take checkpoint wihtout any update
mmsc.take_checkpoint()
self.assertEqual(
mmsc.checkpoint, (None, None, None, 0),
mmsc.checkpoint, MinMaxSumCountAggregator._EMPTY,
)

# call update with some values
Expand All @@ -282,7 +338,7 @@ def test_checkpoint(self):
)

self.assertEqual(
mmsc.current, (None, None, None, 0),
mmsc.current, MinMaxSumCountAggregator._EMPTY,
)

def test_merge(self):
Expand All @@ -299,14 +355,34 @@ def test_merge(self):

self.assertEqual(
mmsc1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
MinMaxSumCountAggregator._merge_checkpoint(
checkpoint1, checkpoint2
),
)

def test_merge_checkpoint(self):
func = MinMaxSumCountAggregator._merge_checkpoint
_type = MinMaxSumCountAggregator._TYPE
empty = MinMaxSumCountAggregator._EMPTY

ret = func(empty, empty)
self.assertEqual(ret, empty)

ret = func(empty, _type(0, 0, 0, 0))
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(0, 0, 0, 0), empty)
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(0, 0, 0, 0), _type(0, 0, 0, 0))
self.assertEqual(ret, _type(0, 0, 0, 0))

ret = func(_type(44, 23, 55, 86), empty)
self.assertEqual(ret, _type(44, 23, 55, 86))

ret = func(_type(3, 150, 101, 3), _type(1, 33, 44, 2))
self.assertEqual(ret, _type(1, 150, 101 + 44, 2 + 3))

def test_merge_with_empty(self):
mmsc1 = MinMaxSumCountAggregator()
mmsc2 = MinMaxSumCountAggregator()
Expand All @@ -318,6 +394,42 @@ def test_merge_with_empty(self):

self.assertEqual(mmsc1.checkpoint, checkpoint1)

def test_concurrent_update(self):
mmsc = MinMaxSumCountAggregator()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
fut1 = ex.submit(self.call_update, mmsc)
fut2 = ex.submit(self.call_update, mmsc)

ret1 = fut1.result()
ret2 = fut2.result()

update_total = MinMaxSumCountAggregator._merge_checkpoint(
ret1, ret2
)
mmsc.take_checkpoint()

self.assertEqual(update_total, mmsc.checkpoint)

def test_concurrent_update_and_checkpoint(self):
mmsc = MinMaxSumCountAggregator()
checkpoint_total = MinMaxSumCountAggregator._TYPE(2 ** 32, 0, 0, 0)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
fut = ex.submit(self.call_update, mmsc)

while not fut.done():
mmsc.take_checkpoint()
checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint(
checkpoint_total, mmsc.checkpoint
)

mmsc.take_checkpoint()
checkpoint_total = MinMaxSumCountAggregator._merge_checkpoint(
checkpoint_total, mmsc.checkpoint
)

self.assertEqual(checkpoint_total, fut.result())


class TestController(unittest.TestCase):
def test_push_controller(self):
Expand Down