Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Views API Prototype #596

Merged
merged 47 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7419bda
view
lzchen Apr 16, 2020
aa7562a
remove label keys
lzchen Apr 17, 2020
4a7f6dd
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Apr 20, 2020
e43a984
Remove meter ref, aggregator for
lzchen Apr 21, 2020
4d4351f
view manager as part of meter
lzchen Apr 21, 2020
e667bc8
fix tests
lzchen Apr 21, 2020
5d9ae6f
seperate aggregations from aggregators
lzchen Apr 24, 2020
554dafb
default aggregations
lzchen Apr 24, 2020
f8997ca
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen May 4, 2020
8331f1b
Add label key logic
lzchen May 4, 2020
d93f159
aggregate config and histogram
lzchen May 7, 2020
53cc0f7
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen May 7, 2020
e966f7e
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jul 1, 2020
adce43e
views
lzchen Jul 2, 2020
864b644
fix observer
lzchen Jul 2, 2020
fea636f
fix lastvalue
lzchen Jul 2, 2020
af6d003
fix tests
lzchen Jul 7, 2020
176aa55
fix tests
lzchen Jul 7, 2020
12402f7
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jul 7, 2020
8e3a903
changelog
lzchen Jul 7, 2020
9e7332f
fix tests
lzchen Jul 7, 2020
3eb7dcd
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jul 7, 2020
ac3734c
lint
lzchen Jul 7, 2020
82a154c
lint
lzchen Jul 7, 2020
55ad347
lint
lzchen Jul 7, 2020
fbeaf34
lint
lzchen Jul 7, 2020
778223e
lint
lzchen Jul 7, 2020
4875769
lint
lzchen Jul 7, 2020
f793247
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Jul 7, 2020
d7fbcef
Merge branch 'master' into views
lzchen Jul 8, 2020
45e96f3
fix state
lzchen Jul 10, 2020
0b22ef7
Merge branch 'views' of https://github.com/lzchen/opentelemetry-pytho…
lzchen Jul 10, 2020
2a55926
fix test
lzchen Jul 10, 2020
b74fb2d
Merge branch 'master' into views
lzchen Jul 10, 2020
a510eb0
Create new aggregator per ViewData, config dict
cnnradams Jul 28, 2020
54594c8
ViewData references, equality updates
cnnradams Jul 30, 2020
8baa90e
Histogram updates
cnnradams Jul 30, 2020
765e8ef
updates
cnnradams Jul 31, 2020
bba29d5
fix batcher
cnnradams Jul 31, 2020
d609602
Merge pull request #8 from cnnradams/views
lzchen Jul 31, 2020
4b49cb7
Fix View hash
c24t Aug 1, 2020
de145ce
Wrap, blacken
c24t Aug 1, 2020
93c3808
Merge branch 'master' of https://github.com/open-telemetry/openteleme…
lzchen Aug 3, 2020
2962561
Merge branch 'views' of https://github.com/lzchen/opentelemetry-pytho…
lzchen Aug 3, 2020
58212ba
fix teests
lzchen Aug 3, 2020
f0e74b7
test
lzchen Aug 3, 2020
a8ab8de
lint
lzchen Aug 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions docs/examples/basic_meter/basic_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@
- How to configure a meter passing a sateful or stateless.
- How to configure an exporter and how to create a controller.
- How to create some metrics intruments and how to capture data with them.
- How to use views to specify aggregation types for each metric instrument.
"""
import sys
import time

from opentelemetry import metrics
from opentelemetry import metrics as metrics_api
from opentelemetry.sdk import metrics
from opentelemetry.sdk.metrics import Counter, Measure, MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import (
CounterAggregator,
MinMaxSumCountAggregator
)
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.view import View

stateful = True

Expand Down Expand Up @@ -55,8 +62,8 @@ def usage(argv):
# determines whether how metrics are collected: if true, metrics accumulate
# over the process lifetime. If false, metrics are reset at the beginning of
# each collection interval.
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__, batcher_mode == "stateful")
metrics_api.set_meter_provider(MeterProvider())
meter = metrics_api.get_meter(__name__, batcher_mode == "stateful")

# Exporter to export metrics to the console
exporter = ConsoleMetricsExporter()
Expand All @@ -72,7 +79,6 @@ def usage(argv):
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

requests_size = meter.create_metric(
Expand All @@ -81,7 +87,6 @@ def usage(argv):
unit="1",
value_type=int,
metric_type=Measure,
label_keys=("environment",),
)

# Labels are used to identify key-values that are associated with a specific
Expand All @@ -90,6 +95,14 @@ def usage(argv):
staging_labels = {"environment": "staging"}
testing_labels = {"environment": "testing"}

# Views are used to define an aggregation type to use for a specific metric
counter_view = View(requests_counter, CounterAggregator)
lzchen marked this conversation as resolved.
Show resolved Hide resolved
size_view = View(requests_size, MinMaxSumCountAggregator)

# Register the views to the view manager to use the views
metrics.view_manager.register_view(counter_view)
metrics.view_manager.register_view(size_view)

# Update the metric instruments using the direct calling convention
requests_counter.add(25, staging_labels)
requests_size.record(100, staging_labels)
Expand All @@ -101,4 +114,5 @@ def usage(argv):

requests_counter.add(35, testing_labels)
requests_size.record(2, testing_labels)
time.sleep(5)

input("...\n")
32 changes: 17 additions & 15 deletions docs/examples/basic_meter/calling_conventions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
"""
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, Measure, MeterProvider
from opentelemetry import metrics as metrics_api
from opentelemetry.sdk import metrics
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.aggregate import CounterAggregator
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.metrics.view import View

# Use the meter type provided by the SDK package
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
metrics_api.set_meter_provider(MeterProvider())
meter = metrics_api.get_meter(__name__)
exporter = ConsoleMetricsExporter()
controller = PushController(meter=meter, exporter=exporter, interval=5)

Expand All @@ -35,16 +38,6 @@
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

requests_size = meter.create_metric(
name="requests_size",
description="size of requests",
unit="1",
value_type=int,
metric_type=Measure,
label_keys=("environment",),
)

clicks_counter = meter.create_metric(
Expand All @@ -53,11 +46,18 @@
unit="1",
value_type=int,
metric_type=Counter,
label_keys=("environment",),
)

labels = {"environment": "staging"}

# Views are used to define an aggregation type to use for a specific metric
counter_view = View(requests_counter, CounterAggregator)
clicks_view = View(clicks_counter, CounterAggregator)

# Register the views to the view manager to use the views
metrics.view_manager.register_view(counter_view)
metrics.view_manager.register_view(clicks_view)

print("Updating using direct calling convention...")
# You can record metrics directly using the metric instrument. You pass in
# labels that you would like to record for.
Expand All @@ -80,3 +80,5 @@
# specified labels for each.
meter.record_batch(labels, ((requests_counter, 50), (clicks_counter, 70)))
time.sleep(5)

input("...\n")
5 changes: 0 additions & 5 deletions docs/examples/basic_meter/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController

# Configure a stateful batcher
batcher = UngroupedBatcher(stateful=True)
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
exporter = ConsoleMetricsExporter()
Expand All @@ -45,7 +42,6 @@ def get_cpu_usage_callback(observer):
description="per-cpu usage",
unit="1",
value_type=float,
label_keys=("cpu_number",),
)


Expand All @@ -61,7 +57,6 @@ def get_ram_usage_callback(observer):
description="RAM memory usage",
unit="1",
value_type=float,
label_keys=(),
)

input("Metrics will be printed soon. Press a key to finish...\n")
87 changes: 23 additions & 64 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.batcher import Batcher
from opentelemetry.sdk.metrics.view import view_manager, ViewData
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

Expand Down Expand Up @@ -48,47 +49,27 @@ def __init__(
self,
value_type: Type[metrics_api.ValueT],
enabled: bool,
aggregator: Aggregator,
labels: Tuple[Tuple[str, str]],
metric: metrics_api.MetricT,
):
self.value_type = value_type
self.enabled = enabled
self.aggregator = aggregator
self._ref_count = 0
self._ref_count_lock = threading.Lock()
self._value_type = value_type
c24t marked this conversation as resolved.
Show resolved Hide resolved
self._enabled = enabled
self._labels = labels
self._metric = metric
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we avoid using the term "metric" to describe an instrument? The API specs very carefully avoid using metric as a noun, the only place we use that is to describe the result in a protocol such as OTLP. "Metric" is being used to describe the output, in other words, and "Instrument" is the thing we use for input.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A naming pass will be done outside of this PR to update all the names to the specs.


def _validate_update(self, value: metrics_api.ValueT) -> bool:
if not self.enabled:
if not self._enabled:
return False
if not isinstance(value, self.value_type):
if not isinstance(value, self._value_type):
logger.warning(
"Invalid value passed for %s.", self.value_type.__name__
"Invalid value passed for %s.", self._value_type.__name__
)
return False
return True

def update(self, value: metrics_api.ValueT):
self.aggregator.update(value)

def release(self):
lzchen marked this conversation as resolved.
Show resolved Hide resolved
self.decrease_ref_count()

def decrease_ref_count(self):
with self._ref_count_lock:
self._ref_count -= 1

def increase_ref_count(self):
with self._ref_count_lock:
self._ref_count += 1

def ref_count(self):
with self._ref_count_lock:
return self._ref_count

def __repr__(self):
return '{}(data="{}")'.format(
type(self).__name__, self.aggregator.current
)

# The view manager handles all updates to aggregators
view_manager.update_view(self._metric, self._labels, value)
lzchen marked this conversation as resolved.
Show resolved Hide resolved

class BoundCounter(metrics_api.BoundCounter, BaseBoundInstrument):
def add(self, value: metrics_api.ValueT) -> None:
Expand Down Expand Up @@ -122,15 +103,13 @@ def __init__(
unit: str,
value_type: Type[metrics_api.ValueT],
meter: "Meter",
label_keys: Sequence[str] = (),
enabled: bool = True,
):
self.name = name
self.description = description
self.unit = unit
self.value_type = value_type
self.meter = meter
self.label_keys = label_keys
self.enabled = enabled
self.bound_instruments = {}
self.bound_instruments_lock = threading.Lock()
Expand All @@ -144,11 +123,10 @@ def bind(self, labels: Dict[str, str]) -> BaseBoundInstrument:
bound_instrument = self.BOUND_INSTR_TYPE(
self.value_type,
self.enabled,
# Aggregator will be created based off type of metric
self.meter.batcher.aggregator_for(self.__class__),
get_labels_as_key(labels),
self,
)
self.bound_instruments[key] = bound_instrument
bound_instrument.increase_ref_count()
return bound_instrument

def __repr__(self):
Expand All @@ -169,7 +147,6 @@ def add(self, value: metrics_api.ValueT, labels: Dict[str, str]) -> None:
"""See `opentelemetry.metrics.Counter.add`."""
bound_intrument = self.bind(labels)
bound_intrument.add(value)
bound_intrument.release()

UPDATE_FUNCTION = add

Expand All @@ -185,7 +162,6 @@ def record(
"""See `opentelemetry.metrics.Measure.record`."""
bound_intrument = self.bind(labels)
bound_intrument.record(value)
bound_intrument.release()

UPDATE_FUNCTION = record

Expand All @@ -201,7 +177,6 @@ def __init__(
unit: str,
value_type: Type[metrics_api.ValueT],
meter: "Meter",
label_keys: Sequence[str] = (),
enabled: bool = True,
):
self.callback = callback
Expand All @@ -210,7 +185,6 @@ def __init__(
self.unit = unit
self.value_type = value_type
self.meter = meter
self.label_keys = label_keys
self.enabled = enabled

self.aggregators = {}
Expand Down Expand Up @@ -258,7 +232,7 @@ class Record:
def __init__(
self,
metric: metrics_api.MetricT,
labels: Dict[str, str],
labels: Tuple[Tuple[str, str]],
aggregator: Aggregator,
):
self.metric = metric
Expand All @@ -283,7 +257,7 @@ def __init__(
self.instrumentation_info = instrumentation_info
self.metrics = set()
self.observers = set()
self.batcher = UngroupedBatcher(stateful)
self.batcher = Batcher(stateful)
self.observers_lock = threading.Lock()
self.resource = resource

Expand All @@ -302,23 +276,12 @@ def _collect_metrics(self) -> None:
for metric in self.metrics:
if not metric.enabled:
continue

to_remove = []

with metric.bound_instruments_lock:
for labels, bound_instr in metric.bound_instruments.items():
# TODO: Consider storing records in memory?
record = Record(metric, labels, bound_instr.aggregator)
# Checkpoints the current aggregators
# Applies different batching logic based on type of batcher
self.batcher.process(record)

if bound_instr.ref_count() == 0:
to_remove.append(labels)

# Remove handles that were released
for labels in to_remove:
del metric.bound_instruments[labels]

lzchen marked this conversation as resolved.
Show resolved Hide resolved
for view_data in view_manager.get_all_view_data_for_metric(metric):
record = Record(metric, view_data.labels, view_data.aggregator)
# Checkpoints the current aggregators
# Applies different logic for stateful
self.batcher.process(record)

def _collect_observers(self) -> None:
with self.observers_lock:
Expand Down Expand Up @@ -351,7 +314,6 @@ def create_metric(
unit: str,
value_type: Type[metrics_api.ValueT],
metric_type: Type[metrics_api.MetricT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> metrics_api.MetricT:
"""See `opentelemetry.metrics.Meter.create_metric`."""
Expand All @@ -362,7 +324,6 @@ def create_metric(
unit,
value_type,
self,
label_keys=label_keys,
enabled=enabled,
)
self.metrics.add(metric)
Expand All @@ -375,7 +336,6 @@ def register_observer(
description: str,
unit: str,
value_type: Type[metrics_api.ValueT],
label_keys: Sequence[str] = (),
enabled: bool = True,
) -> metrics_api.Observer:
ob = Observer(
Expand All @@ -385,7 +345,6 @@ def register_observer(
unit,
value_type,
self,
label_keys,
enabled,
)
with self.observers_lock:
Expand Down
Loading