Skip to content

Commit

Permalink
metrics: use a single slice pool for all metrics tracer (#2054)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored Feb 6, 2023
1 parent 3598171 commit 31966fb
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 27 deletions.
38 changes: 32 additions & 6 deletions p2p/host/eventbus/basic_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strings"
"sync"

"github.com/libp2p/go-libp2p/p2p/metricshelper"

"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -107,23 +109,43 @@ func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
}

func (m *metricsTracer) EventEmitted(typ reflect.Type) {
eventsEmitted.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Inc()
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, strings.TrimPrefix(typ.String(), "event."))
eventsEmitted.WithLabelValues(*tags...).Inc()
}

func (m *metricsTracer) AddSubscriber(typ reflect.Type) {
totalSubscribers.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Inc()
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, strings.TrimPrefix(typ.String(), "event."))
totalSubscribers.WithLabelValues(*tags...).Inc()
}

func (m *metricsTracer) RemoveSubscriber(typ reflect.Type) {
totalSubscribers.WithLabelValues(strings.TrimPrefix(typ.String(), "event.")).Dec()
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, strings.TrimPrefix(typ.String(), "event."))
totalSubscribers.WithLabelValues(*tags...).Dec()
}

func (m *metricsTracer) SubscriberQueueLength(name string, n int) {
subscriberQueueLength.WithLabelValues(name).Set(float64(n))
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, name)
subscriberQueueLength.WithLabelValues(*tags...).Set(float64(n))
}

func (m *metricsTracer) SubscriberQueueFull(name string, isFull bool) {
observer := subscriberQueueFull.WithLabelValues(name)
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, name)
observer := subscriberQueueFull.WithLabelValues(*tags...)
if isFull {
observer.Set(1)
} else {
Expand All @@ -132,5 +154,9 @@ func (m *metricsTracer) SubscriberQueueFull(name string, isFull bool) {
}

func (m *metricsTracer) SubscriberEventQueued(name string) {
subscriberEventQueued.WithLabelValues(name).Inc()
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, name)
subscriberEventQueued.WithLabelValues(*tags...).Inc()
}
7 changes: 5 additions & 2 deletions p2p/host/eventbus/basic_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (

func BenchmarkEventEmitted(b *testing.B) {
b.ReportAllocs()
types := []reflect.Type{reflect.TypeOf(new(event.EvtLocalAddressesUpdated)), reflect.TypeOf(new(event.EvtNATDeviceTypeChanged)),
reflect.TypeOf(new(event.EvtLocalProtocolsUpdated))}
types := []reflect.Type{
reflect.TypeOf(new(event.EvtLocalAddressesUpdated)),
reflect.TypeOf(new(event.EvtNATDeviceTypeChanged)),
reflect.TypeOf(new(event.EvtLocalProtocolsUpdated)),
}
mt := NewMetricsTracer()
for i := 0; i < b.N; i++ {
mt.EventEmitted(types[i%len(types)])
Expand Down
26 changes: 26 additions & 0 deletions p2p/metricshelper/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package metricshelper

import (
"fmt"
"sync"
)

const capacity = 8

var stringPool = sync.Pool{New: func() any {
s := make([]string, 0, capacity)
return &s
}}

func GetStringSlice() *[]string {
s := stringPool.Get().(*[]string)
*s = (*s)[:0]
return s
}

func PutStringSlice(s *[]string) {
if c := cap(*s); c < capacity {
panic(fmt.Sprintf("expected a string slice with capacity 8 or greater, got %d", c))
}
stringPool.Put(s)
}
21 changes: 21 additions & 0 deletions p2p/metricshelper/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package metricshelper

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestStringSlicePool(t *testing.T) {
for i := 0; i < 1e5; i++ {
s := GetStringSlice()
require.Empty(t, *s)
require.Equal(t, 8, cap(*s))
*s = append(*s, "foo")
*s = append(*s, "bar")
if rand.Int()%3 == 0 {
PutStringSlice(s)
}
}
}
28 changes: 9 additions & 19 deletions p2p/net/swarm/swarm_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/metricshelper"

ma "github.com/multiformats/go-multiaddr"

Expand Down Expand Up @@ -93,19 +94,6 @@ func NewMetricsTracer() *metricsTracer {
return &metricsTracer{}
}

var stringPool = sync.Pool{New: func() any {
s := make([]string, 0, 8)
return &s
}}

func getStringSlice() *[]string {
s := stringPool.Get().(*[]string)
*s = (*s)[:0]
return s
}

func putStringSlice(s *[]string) { stringPool.Put(s) }

func getDirection(dir network.Direction) string {
switch dir {
case network.DirOutbound:
Expand All @@ -132,8 +120,8 @@ func appendConnectionState(tags []string, cs network.ConnectionState) []string {
}

func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
Expand All @@ -146,8 +134,9 @@ func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey,
}

func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connsClosed.WithLabelValues(*tags...).Inc()
Expand All @@ -159,8 +148,9 @@ func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Du
}

func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = appendConnectionState(*tags, cs)
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}
Expand Down

0 comments on commit 31966fb

Please sign in to comment.