Skip to content

Commit

Permalink
eliminate global tracer, add a helper function to grab a per-package …
Browse files Browse the repository at this point in the history
…tracer (creating if needed)
  • Loading branch information
finn-block committed Mar 6, 2024
1 parent eb19ea6 commit aa366f8
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 82 deletions.
2 changes: 1 addition & 1 deletion impl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require (
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.23.1
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/sdk/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
Expand Down Expand Up @@ -130,6 +129,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
6 changes: 3 additions & 3 deletions impl/pkg/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewTestDHT(t *testing.T, bootstrapPeers ...dht.Addr) *DHT {

// Put puts the given BEP-44 value into the DHT and returns its z32-encoded key.
func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) {
ctx, span := telemetry.Tracer.Start(ctx, "DHT.Put")
ctx, span := telemetry.GetTracer("pkg/dht").Start(ctx, "DHT.Put")
defer span.End()

t, err := getput.Put(ctx, request.Target(), d.Server, nil, func(int64) bep44.Put {
Expand All @@ -78,7 +78,7 @@ func (d *DHT) Put(ctx context.Context, request bep44.Put) (string, error) {
// Get returns the BEP-44 result for the given key from the DHT.
// The key is a z32-encoded string, such as "yj47pezutnpw9pyudeeai8cx8z8d6wg35genrkoqf9k3rmfzy58o".
func (d *DHT) Get(ctx context.Context, key string) (*getput.GetResult, error) {
ctx, span := telemetry.Tracer.Start(ctx, "DHT.Get")
ctx, span := telemetry.GetTracer("pkg/dht").Start(ctx, "DHT.Get")
defer span.End()

z32Decoded, err := util.Z32Decode(key)
Expand All @@ -96,7 +96,7 @@ func (d *DHT) Get(ctx context.Context, key string) (*getput.GetResult, error) {
// implementation of getput.Get. It should ONLY be used when it's needed to get the signature
// data for a record.
func (d *DHT) GetFull(ctx context.Context, key string) (*dhtint.FullGetResult, error) {
ctx, span := telemetry.Tracer.Start(ctx, "DHT.GetFull")
ctx, span := telemetry.GetTracer("pkg/dht").Start(ctx, "DHT.GetFull")
defer span.End()

z32Decoded, err := util.Z32Decode(key)
Expand Down
12 changes: 0 additions & 12 deletions impl/pkg/dht/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dht_test
import (
"context"
"encoding/hex"
"os"
"testing"
"time"

Expand All @@ -14,19 +13,8 @@ import (

"github.com/TBD54566975/did-dht-method/internal/util"
dhtclient "github.com/TBD54566975/did-dht-method/pkg/dht"
"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

func TestMain(m *testing.M) {
// telemetry.Tracer will be nil if this isn't called before the test
if err := telemetry.SetupTelemetry(context.Background()); err != nil {
panic(err)
}
defer telemetry.Shutdown(context.Background())

os.Exit(m.Run())
}

func TestGetPutDHT(t *testing.T) {
ctx := context.Background()

Expand Down
12 changes: 0 additions & 12 deletions impl/pkg/server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package server

import (
"context"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -14,23 +13,12 @@ import (

"github.com/TBD54566975/did-dht-method/config"
"github.com/TBD54566975/did-dht-method/pkg/dht"
"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

const (
testServerURL = "https://diddht-service.com"
)

func TestMain(m *testing.M) {
// telemetry.Tracer will be nil if this isn't called before the test
if err := telemetry.SetupTelemetry(context.Background()); err != nil {
panic(err)
}
defer telemetry.Shutdown(context.Background())

os.Exit(m.Run())
}

func TestHealthCheckAPI(t *testing.T) {
shutdown := make(chan os.Signal, 1)
serviceConfig, err := config.LoadConfig("")
Expand Down
6 changes: 3 additions & 3 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewPkarrService(cfg *config.Config, db storage.Storage, d *dht.DHT) (*Pkarr

// PublishPkarr stores the record in the db, publishes the given Pkarr record to the DHT, and returns the z-base-32 encoded ID
func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr.Record) error {
ctx, span := telemetry.Tracer.Start(ctx, "PublishPkarr")
ctx, span := telemetry.GetTracer("pkg/service").Start(ctx, "PublishPkarr")
defer span.End()

if err := record.IsValid(); err != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr

// GetPkarr returns the full Pkarr record (including sig data) for the given z-base-32 encoded ID
func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response, error) {
ctx, span := telemetry.Tracer.Start(ctx, "GetPkarr")
ctx, span := telemetry.GetTracer("pkg/service").Start(ctx, "GetPkarr")
defer span.End()

// first do a cache lookup
Expand Down Expand Up @@ -173,7 +173,7 @@ func (s *PkarrService) addRecordToCache(id string, resp pkarr.Response) error {

// TODO(gabe) make this more efficient. create a publish schedule based on each individual record, not all records
func (s *PkarrService) republish() {
ctx, span := telemetry.Tracer.Start(context.Background(), "republish")
ctx, span := telemetry.GetTracer("pkg/service").Start(context.Background(), "republish")
defer span.End()

var nextPageToken []byte
Expand Down
10 changes: 0 additions & 10 deletions impl/pkg/service/pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,8 @@ import (
"github.com/TBD54566975/did-dht-method/pkg/dht"
"github.com/TBD54566975/did-dht-method/pkg/pkarr"
"github.com/TBD54566975/did-dht-method/pkg/storage"
"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

func TestMain(m *testing.M) {
// telemetry.Tracer will be nil if this isn't called before the test
if err := telemetry.SetupTelemetry(context.Background()); err != nil {
panic(err)
}
defer telemetry.Shutdown(context.Background())

os.Exit(m.Run())
}
func TestPKARRService(t *testing.T) {
svc := newPKARRService(t, "a")

Expand Down
12 changes: 6 additions & 6 deletions impl/pkg/storage/db/bolt/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewBolt(path string) (*BoltDB, error) {
// WriteRecord writes the given record to the storage
// TODO: don't overwrite existing records, store unique seq numbers
func (s *BoltDB) WriteRecord(ctx context.Context, record pkarr.Record) error {
ctx, span := telemetry.Tracer.Start(ctx, "bolt.WriteRecord")
ctx, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "WriteRecord")
defer span.End()

encoded := encodeRecord(record)
Expand All @@ -55,7 +55,7 @@ func (s *BoltDB) WriteRecord(ctx context.Context, record pkarr.Record) error {

// ReadRecord reads the record with the given id from the storage
func (s *BoltDB) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) {
ctx, span := telemetry.Tracer.Start(ctx, "bolt.ReadRecord")
ctx, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "ReadRecord")
defer span.End()

recordBytes, err := s.read(ctx, pkarrNamespace, encoding.EncodeToString(id))
Expand All @@ -81,7 +81,7 @@ func (s *BoltDB) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, erro

// ListRecords lists all records in the storage
func (s *BoltDB) ListRecords(ctx context.Context, nextPageToken []byte, pagesize int) ([]pkarr.Record, []byte, error) {
ctx, span := telemetry.Tracer.Start(ctx, "bold.ListRecords")
ctx, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "ListRecords")
defer span.End()

boltRecords, err := s.readSeveral(ctx, pkarrNamespace, nextPageToken, pagesize)
Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *BoltDB) Close() error {
}

func (s *BoltDB) write(ctx context.Context, namespace string, key string, value []byte) error {
_, span := telemetry.Tracer.Start(ctx, "bolt.write")
_, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "write")
defer span.End()

return s.db.Update(func(tx *bolt.Tx) error {
Expand All @@ -134,7 +134,7 @@ func (s *BoltDB) write(ctx context.Context, namespace string, key string, value
}

func (s *BoltDB) read(ctx context.Context, namespace, key string) ([]byte, error) {
_, span := telemetry.Tracer.Start(ctx, "bolt.read")
_, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "read")
defer span.End()

var result []byte
Expand Down Expand Up @@ -168,7 +168,7 @@ func (s *BoltDB) readAll(namespace string) (map[string][]byte, error) {
}

func (s *BoltDB) readSeveral(ctx context.Context, namespace string, after []byte, count int) ([]boltRecord, error) {
_, span := telemetry.Tracer.Start(ctx, "bolt.readSeveral")
_, span := telemetry.GetTracer("pkg/storage/bolt").Start(ctx, "readSeveral")
defer span.End()

var result []boltRecord
Expand Down
11 changes: 0 additions & 11 deletions impl/pkg/storage/db/bolt/bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,12 @@ import (
"github.com/TBD54566975/did-dht-method/internal/did"
"github.com/TBD54566975/did-dht-method/pkg/dht"
"github.com/TBD54566975/did-dht-method/pkg/pkarr"
"github.com/TBD54566975/did-dht-method/pkg/telemetry"
"github.com/goccy/go-json"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
// telemetry.Tracer will be nil if this isn't called before the test
if err := telemetry.SetupTelemetry(context.Background()); err != nil {
panic(err)
}
defer telemetry.Shutdown(context.Background())

os.Exit(m.Run())
}

func TestBoltDB_ReadWrite(t *testing.T) {
ctx := context.Background()

Expand Down
8 changes: 4 additions & 4 deletions impl/pkg/storage/db/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p Postgres) migrate() error {
}

func (p Postgres) connect(ctx context.Context) (*Queries, *pgx.Conn, error) {
ctx, span := telemetry.Tracer.Start(ctx, "postgres.connect")
ctx, span := telemetry.GetTracer("pkg/storage/postgres").Start(ctx, "connect")
defer span.End()

conn, err := pgx.Connect(ctx, string(p))
Expand All @@ -62,7 +62,7 @@ func (p Postgres) connect(ctx context.Context) (*Queries, *pgx.Conn, error) {
}

func (p Postgres) WriteRecord(ctx context.Context, record pkarr.Record) error {
ctx, span := telemetry.Tracer.Start(ctx, "postgres.WriteRecord")
ctx, span := telemetry.GetTracer("pkg/storage/postgres").Start(ctx, "WriteRecord")
defer span.End()

queries, db, err := p.connect(ctx)
Expand All @@ -85,7 +85,7 @@ func (p Postgres) WriteRecord(ctx context.Context, record pkarr.Record) error {
}

func (p Postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, error) {
ctx, span := telemetry.Tracer.Start(ctx, "postgres.ReadRecord")
ctx, span := telemetry.GetTracer("pkg/storage/postgres").Start(ctx, "ReadRecord")
defer span.End()

queries, db, err := p.connect(ctx)
Expand All @@ -108,7 +108,7 @@ func (p Postgres) ReadRecord(ctx context.Context, id []byte) (*pkarr.Record, err
}

func (p Postgres) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]pkarr.Record, []byte, error) {
ctx, span := telemetry.Tracer.Start(ctx, "postgres.ListRecords")
ctx, span := telemetry.GetTracer("pkg/storage/postgres").Start(ctx, "ListRecords")
defer span.End()

queries, db, err := p.connect(ctx)
Expand Down
11 changes: 0 additions & 11 deletions impl/pkg/storage/db/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,8 @@ import (
"github.com/TBD54566975/did-dht-method/pkg/pkarr"
"github.com/TBD54566975/did-dht-method/pkg/storage"
"github.com/TBD54566975/did-dht-method/pkg/storage/db/postgres"
"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

func TestMain(m *testing.M) {
// telemetry.Tracer will be nil if this isn't called before the test
if err := telemetry.SetupTelemetry(context.Background()); err != nil {
panic(err)
}
defer telemetry.Shutdown(context.Background())

os.Exit(m.Run())
}

func getTestDB(t *testing.T) storage.Storage {
uri := os.Getenv("TEST_DB")
if uri == "" {
Expand Down
35 changes: 26 additions & 9 deletions impl/pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package telemetry

import (
"context"
"fmt"
"sync"
"time"

"github.com/TBD54566975/did-dht-method/config"
Expand All @@ -10,7 +12,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -21,10 +22,10 @@ import (
const scopeName = "github.com/TBD54566975/did-dht-method"

var (
Tracer trace.Tracer
traceProvider *sdktrace.TracerProvider
tracers = make(map[string]trace.Tracer)
tracersLock sync.Mutex

Meter metric.Meter
meterProvider *sdkmetric.MeterProvider
)

Expand All @@ -44,7 +45,6 @@ func SetupTelemetry(ctx context.Context) error {
}
traceProvider = sdktrace.NewTracerProvider(sdktrace.WithBatcher(traceExporter), sdktrace.WithResource(r))
otel.SetTracerProvider(traceProvider)
Tracer = traceProvider.Tracer(scopeName, trace.WithInstrumentationVersion(config.Version))

// setup metrics
metricExporter, err := otlpmetrichttp.New(ctx)
Expand All @@ -53,7 +53,6 @@ func SetupTelemetry(ctx context.Context) error {
}
meterProvider = sdkmetric.NewMeterProvider(sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)))
otel.SetMeterProvider(meterProvider)
Meter = meterProvider.Meter(scopeName, metric.WithInstrumentationVersion(config.Version))

// setup memory metrics
err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 30))
Expand All @@ -65,11 +64,29 @@ func SetupTelemetry(ctx context.Context) error {
}

func Shutdown(ctx context.Context) {
if err := traceProvider.Shutdown(ctx); err != nil {
logrus.WithError(err).Error("error shutting down trace provider")
if traceProvider != nil {
if err := traceProvider.Shutdown(ctx); err != nil {
logrus.WithError(err).Error("error shutting down trace provider")
}
}

if err := meterProvider.Shutdown(ctx); err != nil {
logrus.WithError(err).Error("error shutting down meter provider")
if meterProvider != nil {
if err := meterProvider.Shutdown(ctx); err != nil {
logrus.WithError(err).Error("error shutting down meter provider")
}
}
}

func GetTracer(subpackage string) trace.Tracer {
tracersLock.Lock()
defer tracersLock.Unlock()

tracer, ok := tracers[subpackage]
if !ok {
name := fmt.Sprintf("%s/%s", scopeName, subpackage)
tracer = otel.GetTracerProvider().Tracer(name, trace.WithInstrumentationVersion(config.Version))
tracers[subpackage] = tracer
}

return tracer
}

0 comments on commit aa366f8

Please sign in to comment.