Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: Make Counter & Gauge atomic #415

Merged
merged 6 commits into from
Jan 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linkerd/app/core/src/handle_time.rs
Original file line number Diff line number Diff line change
@@ -259,7 +259,7 @@ impl Shared {
if counter.clones.fetch_sub(1, Ordering::Release) == 1 {
let elapsed = t0.elapsed();

let mut hist = match self.histogram.lock() {
let hist = match self.histogram.lock() {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
Ok(lock) => lock,
// Avoid double panicking in drop.
Err(_) if panicking => return,
22 changes: 12 additions & 10 deletions linkerd/app/core/src/telemetry/process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use self::system::System;
use linkerd2_metrics::{metrics, FmtMetrics, Gauge};
use std::fmt;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::debug;

@@ -12,7 +13,7 @@ metrics! {

#[derive(Clone, Debug, Default)]
pub struct Report {
start_time: Gauge,
start_time: Arc<Gauge>,
system: Option<System>,
}

@@ -31,7 +32,7 @@ impl Report {
}
};
Self {
start_time: t0.into(),
start_time: Arc::new(t0.into()),
system,
}
}
@@ -40,7 +41,7 @@ impl Report {
impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
process_start_time_seconds.fmt_help(f)?;
process_start_time_seconds.fmt_metric(f, self.start_time)?;
process_start_time_seconds.fmt_metric(f, self.start_time.as_ref())?;

if let Some(ref sys) = self.system {
sys.fmt_metrics(f)?;
@@ -129,14 +130,14 @@ mod system {
};

let clock_ticks = stat.utime as u64 + stat.stime as u64;
let cpu = Counter::from(clock_ticks / self.clock_ticks_per_sec);
process_cpu_seconds_total.fmt_help(f)?;
process_cpu_seconds_total
.fmt_metric(f, Counter::from(clock_ticks / self.clock_ticks_per_sec))?;
process_cpu_seconds_total.fmt_metric(f, &cpu)?;

match Self::open_fds(stat.pid) {
Ok(open_fds) => {
process_open_fds.fmt_help(f)?;
process_open_fds.fmt_metric(f, open_fds)?;
process_open_fds.fmt_metric(f, &open_fds)?;
}
Err(err) => {
warn!("could not determine process_open_fds: {}", err);
@@ -148,7 +149,7 @@ mod system {
Ok(None) => {}
Ok(Some(ref max_fds)) => {
process_max_fds.fmt_help(f)?;
process_max_fds.fmt_metric(f, *max_fds)?;
process_max_fds.fmt_metric(f, max_fds)?;
}
Err(err) => {
warn!("could not determine process_max_fds: {}", err);
@@ -157,11 +158,12 @@ mod system {
}

process_virtual_memory_bytes.fmt_help(f)?;
process_virtual_memory_bytes.fmt_metric(f, Gauge::from(stat.vsize as u64))?;
let vsz = Gauge::from(stat.vsize as u64);
process_virtual_memory_bytes.fmt_metric(f, &vsz)?;

process_resident_memory_bytes.fmt_help(f)?;
process_resident_memory_bytes
.fmt_metric(f, Gauge::from(stat.rss as u64 * self.page_size))
let rss = Gauge::from(stat.rss as u64 * self.page_size);
process_resident_memory_bytes.fmt_metric(f, &rss)
}
}
}
86 changes: 28 additions & 58 deletions linkerd/metrics/src/counter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::prom::{FmtLabels, FmtMetric, MAX_PRECISE_VALUE};
use std::fmt::{self, Display};
use std::ops;

use super::prom::{FmtLabels, FmtMetric};
use std::sync::atomic::{AtomicU64, Ordering};

/// A Prometheus counter is represented by a `Wrapping` unsigned 52-bit integer.
///
@@ -15,75 +14,45 @@ use super::prom::{FmtLabels, FmtMetric};
/// [`rate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#rate()
/// [`irate()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#irate()
/// [`resets()`]: https://prometheus.io/docs/prometheus/latest/querying/functions/#resets
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct Counter(u64);

/// Largest `u64` that can fit without loss of precision in `f64` (2^53).
olix0r marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) const MAX_PRECISE_COUNTER: u64 = 0x20_0000_0000_0000;
#[derive(Debug, Default)]
pub struct Counter(AtomicU64);

// ===== impl Counter =====

impl Counter {
/// Increment the counter by one.
///
/// This function wraps on 52-bit overflows.
pub fn incr(&mut self) {
*self += 1;
pub fn incr(&self) {
self.add(1)
}

/// Return current counter value.
pub fn add(&self, n: u64) {
self.0.fetch_add(n, Ordering::Release);
}

/// Return current counter value, wrapped to be safe for use with Prometheus.
pub fn value(&self) -> u64 {
self.0
.load(Ordering::Acquire)
.wrapping_rem(MAX_PRECISE_VALUE + 1)
olix0r marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl Into<u64> for Counter {
fn into(self) -> u64 {
self.0
self.value()
}
}

impl From<u64> for Counter {
fn from(value: u64) -> Self {
Counter(0) + value
}
}

impl ops::Add<u64> for Counter {
type Output = Self;
fn add(self, rhs: u64) -> Self::Output {
let wrapped = self
.0
.wrapping_add(rhs)
.wrapping_rem(MAX_PRECISE_COUNTER + 1);
Counter(wrapped)
}
}

impl ops::Add<Self> for Counter {
type Output = Self;
fn add(self, Counter(rhs): Self) -> Self::Output {
self + rhs
}
}

impl ops::AddAssign<u64> for Counter {
fn add_assign(&mut self, rhs: u64) {
*self = *self + rhs
}
}

impl ops::AddAssign<Self> for Counter {
fn add_assign(&mut self, Counter(rhs): Self) {
*self += rhs
Counter(value.into())
}
}

impl FmtMetric for Counter {
const KIND: &'static str = "counter";

fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter<'_>, name: N) -> fmt::Result {
writeln!(f, "{} {}", name, self.0)
writeln!(f, "{} {}", name, self.value())
}

fn fmt_metric_labeled<N, L>(
@@ -98,7 +67,7 @@ impl FmtMetric for Counter {
{
write!(f, "{}{{", name)?;
labels.fmt_labels(f)?;
writeln!(f, "}} {}", self.0)
writeln!(f, "}} {}", self.value())
}
}

@@ -108,30 +77,31 @@ mod tests {

#[test]
fn count_simple() {
let mut cnt = Counter::from(0);
let cnt = Counter::from(0);
assert_eq!(cnt.value(), 0);
cnt.incr();
assert_eq!(cnt.value(), 1);
cnt += 41;
cnt.add(41);
assert_eq!(cnt.value(), 42);
cnt += 0;
cnt.add(0);
assert_eq!(cnt.value(), 42);
}

#[test]
fn count_wrapping() {
let mut cnt = Counter::from(MAX_PRECISE_COUNTER - 1);
assert_eq!(cnt.value(), MAX_PRECISE_COUNTER - 1);
let cnt = Counter::from(MAX_PRECISE_VALUE - 1);
assert_eq!(cnt.value(), MAX_PRECISE_VALUE - 1);
cnt.incr();
assert_eq!(cnt.value(), MAX_PRECISE_COUNTER);
assert_eq!(cnt + 1, Counter::from(0));
assert_eq!(cnt.value(), MAX_PRECISE_VALUE);
cnt.incr();
assert_eq!(cnt.value(), 0);
cnt.incr();
assert_eq!(cnt.value(), 1);

let max = Counter::from(MAX_PRECISE_COUNTER);
assert_eq!(max.value(), MAX_PRECISE_COUNTER);
let max = Counter::from(MAX_PRECISE_VALUE);
assert_eq!(max.value(), MAX_PRECISE_VALUE);

let over = Counter::from(MAX_PRECISE_COUNTER + 1);
let over = Counter::from(MAX_PRECISE_VALUE + 1);
assert_eq!(over.value(), 0);
}
}
40 changes: 18 additions & 22 deletions linkerd/metrics/src/gauge.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,46 @@
use super::prom::{FmtLabels, FmtMetric, MAX_PRECISE_VALUE};
use std::fmt::{self, Display};

use tracing::warn;

use super::{FmtLabels, FmtMetric};
use std::sync::atomic::{AtomicU64, Ordering};

/// An instaneous metric value.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
pub struct Gauge(u64);
#[derive(Debug, Default)]
pub struct Gauge(AtomicU64);
olix0r marked this conversation as resolved.
Show resolved Hide resolved

impl Gauge {
/// Increment the gauge by one.
pub fn incr(&mut self) {
if let Some(new_value) = self.0.checked_add(1) {
(*self).0 = new_value;
} else {
warn!("Gauge overflow");
}
pub fn incr(&self) {
self.0.fetch_add(1, Ordering::Release);
}

/// Decrement the gauge by one.
pub fn decr(&mut self) {
if let Some(new_value) = self.0.checked_sub(1) {
(*self).0 = new_value;
} else {
warn!("Gauge underflow");
}
pub fn decr(&self) {
self.0.fetch_sub(1, Ordering::Release);
}
olix0r marked this conversation as resolved.
Show resolved Hide resolved

pub fn value(&self) -> u64 {
self.0
.load(Ordering::Acquire)
.wrapping_rem(MAX_PRECISE_VALUE + 1)
}
}

impl From<u64> for Gauge {
fn from(n: u64) -> Self {
Gauge(n)
Gauge(n.into())
}
}

impl Into<u64> for Gauge {
fn into(self) -> u64 {
self.0
self.value()
}
}

impl FmtMetric for Gauge {
const KIND: &'static str = "gauge";

fn fmt_metric<N: Display>(&self, f: &mut fmt::Formatter<'_>, name: N) -> fmt::Result {
writeln!(f, "{} {}", name, self.0)
writeln!(f, "{} {}", name, self.value())
}

fn fmt_metric_labeled<N, L>(
@@ -59,6 +55,6 @@ impl FmtMetric for Gauge {
{
write!(f, "{}{{", name)?;
labels.fmt_labels(f)?;
writeln!(f, "}} {}", self.0)
writeln!(f, "}} {}", self.value())
}
}
58 changes: 30 additions & 28 deletions linkerd/metrics/src/histogram.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use std::{cmp, iter, slice};
use super::{Counter, FmtLabels, FmtMetric};

/// A series of latency values and counts.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct Histogram<V: Into<u64>> {
bounds: &'static Bounds,
buckets: Box<[Counter]>,
@@ -71,7 +71,7 @@ impl<V: Into<u64>> Histogram<V> {
}
}

pub fn add<U: Into<V>>(&mut self, u: U) {
pub fn add<U: Into<V>>(&self, u: U) {
let v: V = u.into();
let value: u64 = v.into();

@@ -86,17 +86,17 @@ impl<V: Into<u64>> Histogram<V> {
.expect("all values must fit into a bucket");

self.buckets[idx].incr();
self.sum += value;
self.sum.add(value);
}
}

#[cfg(any(test, feature = "test_util"))]
impl<V: Into<u64>> Histogram<V> {
/// Assert the bucket containing `le` has a count of at least `at_least`.
pub fn assert_bucket_at_least(&self, le: u64, at_least: u64) {
for (&bucket, &count) in self {
for (&bucket, ref count) in self {
if bucket >= le {
let count: u64 = count.into();
let count = count.value();
assert!(count >= at_least, "le={:?}; bucket={:?};", le, bucket);
break;
}
@@ -105,9 +105,9 @@ impl<V: Into<u64>> Histogram<V> {

/// Assert the bucket containing `le` has a count of exactly `exactly`.
pub fn assert_bucket_exactly(&self, le: u64, exactly: u64) -> &Self {
for (&bucket, &count) in self {
for (&bucket, ref count) in self {
if bucket >= le {
let count: u64 = count.into();
let count = count.value();
assert_eq!(
count, exactly,
"le={:?}; bucket={:?}; buckets={:#?};",
@@ -137,7 +137,7 @@ impl<V: Into<u64>> Histogram<V> {
break;
}

let count: u64 = self.buckets[i].into();
let count: u64 = self.buckets[i].value();
assert_eq!(count, exactly, "bucket={:?}; value={:?};", bucket, value,);
}
self
@@ -149,7 +149,7 @@ impl<V: Into<u64>> Histogram<V> {
// We set this to true after we've iterated past the first bucket
// whose upper bound is >= `value`.
let mut past_le = false;
for (&bucket, &count) in self {
for (&bucket, ref count) in self {
if bucket < value {
continue;
}
@@ -160,8 +160,13 @@ impl<V: Into<u64>> Histogram<V> {
}

if past_le {
let count: u64 = count.into();
assert_eq!(count, exactly, "bucket={:?}; value={:?};", bucket, value,);
assert_eq!(
count.value(),
exactly,
"bucket={:?}; value={:?};",
bucket,
value,
);
}
}
self
@@ -181,9 +186,9 @@ impl<V: Into<u64>> FmtMetric for Histogram<V> {
const KIND: &'static str = "histogram";

fn fmt_metric<N: fmt::Display>(&self, f: &mut fmt::Formatter<'_>, name: N) -> fmt::Result {
let mut total = Counter::default();
let total = Counter::default();
for (le, count) in self {
total += *count;
total.add(count.value());
total.fmt_metric_labeled(f, Key(&name, "bucket"), Label("le", le))?;
}
total.fmt_metric(f, Key(&name, "count"))?;
@@ -202,9 +207,9 @@ impl<V: Into<u64>> FmtMetric for Histogram<V> {
N: fmt::Display,
L: FmtLabels,
{
let mut total = Counter::default();
let total = Counter::default();
for (le, count) in self {
total += *count;
total.add(count.value());
total.fmt_metric_labeled(f, Key(&name, "bucket"), (&labels, Label("le", le)))?;
}
total.fmt_metric_labeled(f, Key(&name, "count"), &labels)?;
@@ -340,7 +345,7 @@ mod tests {

quickcheck! {
fn bucket_incremented(obs: u64) -> bool {
let mut hist = Histogram::<u64>::new(&BOUNDS);
let hist = Histogram::<u64>::new(&BOUNDS);
hist.add(obs);
// The bucket containing `obs` must have count 1.
hist.assert_bucket_exactly(obs, 1)
@@ -354,34 +359,31 @@ mod tests {
}

fn sum_equals_total_of_observations(observations: Vec<u64>) -> bool {
let mut hist = Histogram::<u64>::new(&BOUNDS);
let hist = Histogram::<u64>::new(&BOUNDS);

let mut expected_sum = Counter::default();
let expected_sum = Counter::default();
for obs in observations {
expected_sum += obs;
expected_sum.add(obs);
hist.add(obs);
}

hist.sum == expected_sum
hist.sum.value() == expected_sum.value()
}

fn count_equals_number_of_observations(observations: Vec<u64>) -> bool {
let mut hist = Histogram::<u64>::new(&BOUNDS);
let hist = Histogram::<u64>::new(&BOUNDS);

for obs in &observations {
hist.add(*obs);
}

let count: u64 = hist.buckets.iter().map(|&c| {
let count: u64 = c.into();
count
}).sum();
count as usize == observations.len()
let count = hist.buckets.iter().map(|ref c| c.value()).sum::<u64>() as usize;
count == observations.len()
}

fn multiple_observations_increment_buckets(observations: Vec<u64>) -> bool {
let mut buckets_and_counts: HashMap<usize, u64> = HashMap::new();
let mut hist = Histogram::<u64>::new(&BOUNDS);
let hist = Histogram::<u64>::new(&BOUNDS);

for obs in observations {
let incremented_bucket = &BOUNDS.0.iter()
@@ -398,7 +400,7 @@ mod tests {
}

for (i, count) in hist.buckets.iter().enumerate() {
let count: u64 = (*count).into();
let count = count.value();
assert_eq!(buckets_and_counts.get(&i).unwrap_or(&0), &count);
}
true
9 changes: 8 additions & 1 deletion linkerd/metrics/src/prom.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
use std::fmt;
use std::marker::{PhantomData, Sized};

/// Largest `u64` that can fit without loss of precision in `f64` (2^53).
///
/// Wrapping is based on the fact that Prometheus models values as f64 (52-bits
/// mantissa), thus integer values over 2^53 are not guaranteed to be correctly
/// exposed.
pub(crate) const MAX_PRECISE_VALUE: u64 = 0x20_0000_0000_0000;

/// Writes a block of metrics in prometheus-formatted output.
pub trait FmtMetrics {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result;
@@ -90,7 +97,7 @@ impl<'a, N: fmt::Display, M: FmtMetric> Metric<'a, N, M> {
}

/// Formats a single metric without labels.
pub fn fmt_metric(&self, f: &mut fmt::Formatter<'_>, metric: M) -> fmt::Result {
pub fn fmt_metric(&self, f: &mut fmt::Formatter<'_>, metric: &M) -> fmt::Result {
metric.fmt_metric(f, &self.name)
}

34 changes: 10 additions & 24 deletions linkerd/opencensus/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use linkerd2_metrics::{metrics, Counter, FmtMetrics};
use std::fmt;
use std::sync::{Arc, Mutex};
use tracing::error;
use std::sync::Arc;

metrics! {
opencensus_span_export_streams: Counter { "Total count of opened span export streams" },
@@ -16,55 +15,42 @@ struct Metrics {
}

#[derive(Clone)]
pub struct Registry(Arc<Mutex<Metrics>>);
pub struct Registry(Arc<Metrics>);

#[derive(Clone)]
pub struct Report(Arc<Mutex<Metrics>>);
pub struct Report(Arc<Metrics>);

pub fn new() -> (Registry, Report) {
let metrics = Metrics {
streams: Counter::default(),
requests: Counter::default(),
spans: Counter::default(),
};
let shared = Arc::new(Mutex::new(metrics));
let shared = Arc::new(metrics);
(Registry(shared.clone()), Report(shared))
}

impl Registry {
pub fn start_stream(&mut self) {
match self.0.lock() {
Ok(mut metrics) => metrics.streams.incr(),
Err(e) => error!(message="failed to lock metrics", %e),
}
self.0.streams.incr()
}

pub fn send(&mut self, spans: u64) {
match self.0.lock() {
Ok(mut metrics) => {
metrics.requests.incr();
metrics.spans += spans;
}
Err(e) => error!(message="failed to lock metrics", %e),
}
self.0.requests.incr();
self.0.spans.add(spans);
}
}

impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let metrics = match self.0.lock() {
Err(_) => return Ok(()),
Ok(lock) => lock,
};

opencensus_span_export_streams.fmt_help(f)?;
opencensus_span_export_streams.fmt_metric(f, metrics.streams)?;
opencensus_span_export_streams.fmt_metric(f, &self.0.streams)?;

opencensus_span_export_requests.fmt_help(f)?;
opencensus_span_export_requests.fmt_metric(f, metrics.requests)?;
opencensus_span_export_requests.fmt_metric(f, &self.0.requests)?;

opencensus_span_exports.fmt_help(f)?;
opencensus_span_exports.fmt_metric(f, metrics.spans)?;
opencensus_span_exports.fmt_metric(f, &self.0.spans)?;

Ok(())
}
43 changes: 19 additions & 24 deletions linkerd/proxy/transport/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use linkerd2_metrics::{
use std::fmt;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::debug;
@@ -76,7 +76,7 @@ struct Metrics {
write_bytes_total: Counter,
read_bytes_total: Counter,

by_eos: IndexMap<Eos, EosMetrics>,
by_eos: Arc<Mutex<IndexMap<Eos, EosMetrics>>>,
}

/// Describes a classtransport end.
@@ -97,17 +97,17 @@ struct EosMetrics {
/// Tracks the state of a single instance of `Io` throughout its lifetime.
#[derive(Debug)]
struct Sensor {
metrics: Option<Arc<Mutex<Metrics>>>,
metrics: Option<Arc<Metrics>>,
opened_at: Instant,
}

/// Lazily builds instances of `Sensor`.
#[derive(Clone, Debug)]
struct NewSensor(Arc<Mutex<Metrics>>);
struct NewSensor(Arc<Metrics>);

/// Shares state between `Report` and `Registry`.
#[derive(Debug)]
struct Inner<K: Eq + Hash + FmtLabels>(IndexMap<K, Arc<Mutex<Metrics>>>);
struct Inner<K: Eq + Hash + FmtLabels>(IndexMap<K, Arc<Metrics>>);

// ===== impl Inner =====

@@ -122,10 +122,8 @@ impl<K: Eq + Hash + FmtLabels> Inner<K> {
self.0.is_empty()
}

fn iter(&self) -> impl Iterator<Item = (&K, MutexGuard<'_, Metrics>)> {
self.0
.iter()
.filter_map(|(k, l)| l.lock().ok().map(move |m| (k, m)))
fn iter(&self) -> impl Iterator<Item = (&K, &Arc<Metrics>)> {
self.0.iter()
}

/// Formats a metric across all instances of `Metrics` in the registry.
@@ -160,15 +158,17 @@ impl<K: Eq + Hash + FmtLabels> Inner<K> {
M: FmtMetric,
{
for (key, metrics) in self.iter() {
for (eos, m) in (*metrics).by_eos.iter() {
get_metric(&*m).fmt_metric_labeled(f, &metric.name, (key, eos))?;
if let Ok(by_eos) = (*metrics).by_eos.lock() {
for (eos, m) in by_eos.iter() {
get_metric(&*m).fmt_metric_labeled(f, &metric.name, (key, eos))?;
}
}
}

Ok(())
}

fn get_or_default(&mut self, k: K) -> &Arc<Mutex<Metrics>> {
fn get_or_default(&mut self, k: K) -> &Arc<Metrics> {
self.0.entry(k).or_insert_with(|| Default::default())
}
}
@@ -337,12 +337,9 @@ impl<K: Eq + Hash + FmtLabels> FmtMetrics for Report<K> {
// ===== impl Sensor =====

impl Sensor {
pub fn open(metrics: Arc<Mutex<Metrics>>) -> Self {
{
let mut m = metrics.lock().expect("metrics registry poisoned");
m.open_total.incr();
m.open_connections.incr();
}
pub fn open(metrics: Arc<Metrics>) -> Self {
metrics.open_total.incr();
metrics.open_connections.incr();
Self {
metrics: Some(metrics),
opened_at: Instant::now(),
@@ -351,15 +348,13 @@ impl Sensor {

pub fn record_read(&mut self, sz: usize) {
if let Some(ref m) = self.metrics {
let mut m = m.lock().expect("metrics registry poisoned");
m.read_bytes_total += sz as u64;
m.read_bytes_total.add(sz as u64);
}
}

pub fn record_write(&mut self, sz: usize) {
if let Some(ref m) = self.metrics {
let mut m = m.lock().expect("metrics registry poisoned");
m.write_bytes_total += sz as u64;
m.write_bytes_total.add(sz as u64);
}
}

@@ -369,10 +364,10 @@ impl Sensor {
// updates can occur (i.e. so that an additional close won't be recorded
// on Drop).
if let Some(m) = self.metrics.take() {
let mut m = m.lock().expect("metrics registry poisoned");
m.open_connections.decr();

let class = m.by_eos.entry(Eos(eos)).or_insert_with(EosMetrics::default);
let mut by_eos = m.by_eos.lock().expect("transport eos metrics lock");
let class = by_eos.entry(Eos(eos)).or_insert_with(EosMetrics::default);
class.close_total.incr();
class.connection_duration.add(duration);
}