Skip to content

Commit

Permalink
feat: introduce db stats collector
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Sep 20, 2024
1 parent 1a4a26b commit 393b4f1
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 1 deletion.
64 changes: 64 additions & 0 deletions stats/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package stats

import (
"context"
"sync"
"time"
)

type gaugeTagsFunc func(key string, tags Tags, val uint64)

type collector interface {
Collect(gaugeTagsFunc)
Zero(gaugeTagsFunc)
}

type aggregatedCollector struct {
c []collector
PauseDur time.Duration
gaugeFunc gaugeTagsFunc
mu sync.Mutex
}

func (p *aggregatedCollector) Add(c collector) {
p.mu.Lock()
defer p.mu.Unlock()
p.c = append(p.c, c)
}

func (p *aggregatedCollector) Run(ctx context.Context) {
defer p.allZero()
p.allCollect()

if p.PauseDur <= 0 {
p.PauseDur = 10 * time.Second
return
}

tick := time.NewTicker(p.PauseDur)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
p.allCollect()
}
}
}

func (p *aggregatedCollector) allCollect() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Collect(p.gaugeFunc)
}
}

func (p *aggregatedCollector) allZero() {
p.mu.Lock()
defer p.mu.Unlock()
for _, c := range p.c {
c.Zero(p.gaugeFunc)
}
}
54 changes: 54 additions & 0 deletions stats/collectors/sqldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package collectors

import (
"database/sql"

"github.com/rudderlabs/rudder-go-kit/stats"
)

type sqlDBStats struct {
name string
db *sql.DB
}

func NewDatabaseSQLStats(name string, db *sql.DB) *sqlDBStats {
return &sqlDBStats{
name: name,
db: db,
}
}

func (s *sqlDBStats) Collect(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
dbStats := s.db.Stats()
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, uint64(dbStats.MaxOpenConnections))
gaugeFunc("sql_db_open_connections", tags, uint64(dbStats.OpenConnections))
gaugeFunc("sql_db_in_use_connections", tags, uint64(dbStats.InUse))
gaugeFunc("sql_db_idle_connections", tags, uint64(dbStats.Idle))

gaugeFunc("sql_db_wait_count_total", tags, uint64(dbStats.WaitCount))
gaugeFunc("sql_db_wait_duration_seconds_total", tags, uint64(dbStats.WaitDuration.Seconds()))

gaugeFunc("sql_db_max_idle_closed_total", tags, uint64(dbStats.MaxIdleClosed))
gaugeFunc("sql_db_max_idle_time_closed_total", tags, uint64(dbStats.MaxIdleTimeClosed))
gaugeFunc("sql_db_max_lifetime_closed_total", tags, uint64(dbStats.MaxLifetimeClosed))
}

func (s *sqlDBStats) Zero(gaugeFunc func(key string, tag stats.Tags, val uint64)) {
tags := stats.Tags{"name": s.name}

gaugeFunc("sql_db_max_open_connections", tags, 0)

gaugeFunc("sql_db_open_connections", tags, 0)
gaugeFunc("sql_db_in_use_connections", tags, 0)
gaugeFunc("sql_db_idle_connections", tags, 0)

gaugeFunc("sql_db_wait_count_total", tags, 0)
gaugeFunc("sql_db_wait_duration_seconds_total", tags, 0)

gaugeFunc("sql_db_max_idle_closed_total", tags, 0)
gaugeFunc("sql_db_max_idle_time_closed_total", tags, 0)
gaugeFunc("sql_db_max_lifetime_closed_total", tags, 0)

}
17 changes: 16 additions & 1 deletion stats/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) {
s.state.rc.EnableCPU = s.config.periodicStatsConfig.enableCPUStats
s.state.rc.EnableMem = s.config.periodicStatsConfig.enableMemStats
s.state.rc.EnableGC = s.config.periodicStatsConfig.enableGCStats
gaugeTagsFunc := func(key string, tags Tags, val uint64) {
s.NewTaggedStat(key, GaugeType, tags).Gauge(val)
}
s.state.ac = aggregatedCollector{
gaugeFunc: gaugeTagsFunc,
PauseDur: time.Duration(s.config.periodicStatsConfig.statsCollectionInterval) * time.Second,
}

s.state.mc = newMetricStatsCollector(s, s.config.periodicStatsConfig.metricManager)
if s.config.periodicStatsConfig.enabled {
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -135,10 +141,18 @@ func (s *statsdStats) collectPeriodicStats(goFactory GoRoutineFactory) {
defer wg.Done()
s.state.mc.run(s.backgroundCollectionCtx)
})
// goFactory.Go(func() {
// defer wg.Done()
// s.state.ac.Run(s.backgroundCollectionCtx)
// })
wg.Wait()
}
}

func (s *statsdStats) RegisterCollector(c collector) {
s.state.ac.Add(c)
}

// Stop stops periodic collection of stats.
func (s *statsdStats) Stop() {
s.state.clientsLock.RLock()
Expand Down Expand Up @@ -292,6 +306,7 @@ type statsdState struct {
conn statsd.Option
client *statsdClient
rc runtimeStatsCollector
ac aggregatedCollector
mc metricStatsCollector

clientsLock sync.RWMutex // protects the following
Expand Down

0 comments on commit 393b4f1

Please sign in to comment.