Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce db stats collector #648

Merged
merged 15 commits into from
Sep 26, 2024
Merged
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ sec: ## Run security checks

.PHONY: fmt
fmt: install-tools ## Formats all go files
$(GO) generate ./...
$(GO) run $(govulncheck) ./...
$(GO) run $(gofumpt) -l -w -extra .
find . -type f -name '*.go' -exec grep -L -E 'Code generated by .*\. DO NOT EDIT.' {} + | xargs $(GO) run $(goimports) -format-only -w -local=github.com/rudderlabs
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/actgardner/gogen-avro/v10 v10.2.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY=
github.com/DataDog/zstd v1.5.6/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
Expand Down Expand Up @@ -320,6 +322,7 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
Expand Down
77 changes: 77 additions & 0 deletions stats/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package stats

import (
"context"
"fmt"
"sync"
"time"
)

const defaultPauseDur = 10 * time.Second

type gaugeTagsFunc = func(key string, tags Tags, val uint64)

type Collector interface {
Collect(gaugeTagsFunc)
Zero(gaugeTagsFunc)
ID() string
}

type aggregatedCollector struct {
c map[string]Collector
PauseDur time.Duration
gaugeFunc gaugeTagsFunc
mu sync.Mutex
}

func (p *aggregatedCollector) Add(c Collector) error {
p.mu.Lock()
defer p.mu.Unlock()

if p.c == nil {
p.c = make(map[string]Collector)
}

if _, ok := p.c[c.ID()]; ok {
return fmt.Errorf("collector with ID %s already register", c.ID())
}

p.c[c.ID()] = c
return nil
}

func (p *aggregatedCollector) Run(ctx context.Context) {
defer p.allZero()
p.allCollect()

if p.PauseDur <= 0 {
p.PauseDur = defaultPauseDur
}

tick := time.NewTicker(p.PauseDur)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
p.allCollect()
}
}
}

func (p *aggregatedCollector) allCollect() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Collect(p.gaugeFunc)
}
}

func (p *aggregatedCollector) allZero() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Zero(p.gaugeFunc)
}
}
62 changes: 62 additions & 0 deletions stats/collectors/sqldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package collectors

import (
"database/sql"
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
uniqName = "database_sql_%s"
)

type SQLDBStats struct {
name string
db *sql.DB
}

func NewDatabaseSQLStats(name string, db *sql.DB) *SQLDBStats {
return &SQLDBStats{
name: name,
db: db,
}
}

func (s *SQLDBStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
dbStats := s.db.Stats()
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, uint64(dbStats.MaxOpenConnections))
gaugeFunc("sql_db_open_connections", tags, uint64(dbStats.OpenConnections))
gaugeFunc("sql_db_in_use_connections", tags, uint64(dbStats.InUse))
gaugeFunc("sql_db_idle_connections", tags, uint64(dbStats.Idle))

gaugeFunc("sql_db_wait_count_total", tags, uint64(dbStats.WaitCount))
gaugeFunc("sql_db_wait_duration_seconds_total", tags, uint64(dbStats.WaitDuration.Seconds()))

gaugeFunc("sql_db_max_idle_closed_total", tags, uint64(dbStats.MaxIdleClosed))
gaugeFunc("sql_db_max_idle_time_closed_total", tags, uint64(dbStats.MaxIdleTimeClosed))
gaugeFunc("sql_db_max_lifetime_closed_total", tags, uint64(dbStats.MaxLifetimeClosed))
}

func (s *SQLDBStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, 0)

gaugeFunc("sql_db_open_connections", tags, 0)
gaugeFunc("sql_db_in_use_connections", tags, 0)
gaugeFunc("sql_db_idle_connections", tags, 0)

gaugeFunc("sql_db_wait_count_total", tags, 0)
gaugeFunc("sql_db_wait_duration_seconds_total", tags, 0)

gaugeFunc("sql_db_max_idle_closed_total", tags, 0)
gaugeFunc("sql_db_max_idle_time_closed_total", tags, 0)
gaugeFunc("sql_db_max_lifetime_closed_total", tags, 0)
}

func (s *SQLDBStats) ID() string {
return fmt.Sprintf(uniqName, s.name)
}
80 changes: 80 additions & 0 deletions stats/collectors/sqldb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package collectors_test

import (
"testing"

"github.com/DATA-DOG/go-sqlmock"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestSQLDatabase(t *testing.T) {
db, _, err := sqlmock.New()
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
defer db.Close()

db.SetMaxOpenConns(5)

m, err := memstats.New()
require.NoError(t, err)

testName := "test_sqlite"
s := collectors.NewDatabaseSQLStats(testName, db)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: "sql_db_idle_connections",
Tags: stats.Tags{"name": testName},
Value: 1,
},
{
Name: "sql_db_in_use_connections",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_idle_time_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_lifetime_closed_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_max_open_connections",
Tags: stats.Tags{"name": testName},
Value: 5,
},
{
Name: "sql_db_open_connections",
Tags: stats.Tags{"name": testName},
Value: 1,
},
{
Name: "sql_db_wait_count_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
{
Name: "sql_db_wait_duration_seconds_total",
Tags: stats.Tags{"name": testName},
Value: 0,
},
}, m.GetAll())
}
39 changes: 39 additions & 0 deletions stats/collectors/static.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package collectors

import (
"fmt"

"github.com/rudderlabs/rudder-go-kit/stats"
)

const (
statsUniqName = "static_%s_%s"
)

type staticStats struct {
tags stats.Tags
key string
value uint64
}

// NewStaticMetric allows to capture a gauge metric that does not change during the lifetime of the application.
// Can be useful for capturing configuration values or application version.
func NewStaticMetric(key string, tags stats.Tags, value uint64) *staticStats {
return &staticStats{
tags: tags,
key: key,
value: value,
}
}

func (s *staticStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, s.value)
}

func (s *staticStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
gaugeFunc(s.key, s.tags, 0)
}

func (s *staticStats) ID() string {
return fmt.Sprintf(statsUniqName, s.key, s.tags.String())
}
32 changes: 32 additions & 0 deletions stats/collectors/static_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package collectors_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
)

func TestStatic(t *testing.T) {
testName := "test_sqlite"
s := collectors.NewStaticMetric(testName, stats.Tags{
"foo": "bar",
}, 2)

m, err := memstats.New()
require.NoError(t, err)

err = m.RegisterCollector(s)
require.NoError(t, err)

require.Equal(t, []memstats.Metric{
{
Name: testName,
Tags: stats.Tags{"foo": "bar"},
Value: 2,
},
}, m.GetAll())
}
7 changes: 7 additions & 0 deletions stats/memstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,13 @@ func (*Store) Start(_ context.Context, _ stats.GoRoutineFactory) error { return
// Stop implements stats.Stats
func (*Store) Stop() {}

func (s *Store) RegisterCollector(c stats.Collector) error {
c.Collect(func(key string, tags stats.Tags, val uint64) {
s.NewTaggedStat(key, stats.GaugeType, tags).Gauge(val)
})
return nil
}

// getKey maps name and tags, to a store lookup key.
func (*Store) getKey(name string, tags stats.Tags) string {
return name + tags.String()
Expand Down
14 changes: 14 additions & 0 deletions stats/mock_stats/mock_stats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions stats/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ func (*nop) NewTracer(_ string) Tracer {

func (*nop) Start(_ context.Context, _ GoRoutineFactory) error { return nil }
func (*nop) Stop() {}

func (*nop) RegisterCollector(c Collector) error { return nil }
Loading
Loading