Skip to content

Commit

Permalink
Merge pull request elastic#2 from monicasarbu/sigar
Browse files Browse the repository at this point in the history
Calculate %CPU per process as it's done by top
  • Loading branch information
monicasarbu committed Aug 6, 2015
2 parents 8b2dc84 + 08fb8bb commit 51d0cf5
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 73 deletions.
2 changes: 1 addition & 1 deletion etc/topbeat.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
input:
period: 1

procs: [".*"]
procs: ["topbeat"]


############################# Shipper ############################################
Expand Down
167 changes: 114 additions & 53 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
var Version = "1.0.0-beta2"
var Name = "topbeat"

type ProcsMap map[int]*Process

type Topbeat struct {
isAlive bool
period time.Duration
procs []string
isAlive bool
period time.Duration
procs []string
procsMap ProcsMap

events chan common.MapStr
}
Expand Down Expand Up @@ -58,74 +61,132 @@ func (t *Topbeat) MatchProcess(name string) bool {
return false
}

func (t *Topbeat) Run() error {
func (t *Topbeat) initProcStats() {

t.isAlive = true
t.procsMap = make(ProcsMap)

for t.isAlive {
time.Sleep(1 * time.Second)
pids, err := Pids()
if err != nil {
logp.Warn("Getting the list of pids: %v", err)
}

load_stat, err := GetSystemLoad()
for _, pid := range pids {
process, err := GetProcess(pid)
if err != nil {
logp.Warn("Getting load statistics: %v", err)
continue
}
cpu_stat, err := GetCpuTimes()
if err != nil {
logp.Warn("Getting cpu times: %v", err)
logp.Debug("topbeat", "Skip process %d: %v", pid, err)
continue
}
t.procsMap[process.Pid] = process
}
}

func (t *Topbeat) exportProcStats() error {

pids, err := Pids()
if err != nil {
logp.Warn("Getting the list of pids: %v", err)
return err
}

mem_stat, err := GetMemory()
for _, pid := range pids {
process, err := GetProcess(pid)
if err != nil {
logp.Warn("Getting memory details: %v", err)
logp.Debug("topbeat", "Skip process %d: %v", pid, err)
continue
}
swap_stat, err := GetSwap()
if err != nil {
logp.Warn("Getting swap details: %v", err)
}

pids, err := Pids()
if err != nil {
logp.Warn("Getting the list of pids: %v", err)
}
if t.MatchProcess(process.Name) {

for _, pid := range pids {
process, err := GetProcess(pid)
if err != nil {
logp.Debug("topbeat", "Skip process %d: %v", pid, err)
continue
}
process.Cpu.User_p = t.getCpuPercent(process)

t.procsMap[process.Pid] = process

if t.MatchProcess(process.Name) {

logp.Debug("topbeat", "Process: %s", process)

event := common.MapStr{
"timestamp": common.Time(time.Now()),
"type": "proc",
"proc.pid": process.Pid,
"proc.ppid": process.Ppid,
"proc.name": process.Name,
"proc.state": process.State,
"proc.mem": process.Mem,
"proc.cpu": process.Cpu,
}
t.events <- event
logp.Debug("topbeat", "Process: %s", process)

event := common.MapStr{
"timestamp": common.Time(time.Now()),
"type": "proc",
"proc.pid": process.Pid,
"proc.ppid": process.Ppid,
"proc.name": process.Name,
"proc.state": process.State,
"proc.mem": process.Mem,
"proc.cpu": process.Cpu,
}
t.events <- event
}
}
return nil
}

func (t *Topbeat) exportSystemStats() error {

event := common.MapStr{
"timestamp": common.Time(time.Now()),
"type": "system",
"load": load_stat,
"cpu": cpu_stat,
"mem": mem_stat,
"swap": swap_stat,
load_stat, err := GetSystemLoad()
if err != nil {
logp.Warn("Getting load statistics: %v", err)
return err
}
cpu_stat, err := GetCpuTimes()
if err != nil {
logp.Warn("Getting cpu times: %v", err)
return err
}

mem_stat, err := GetMemory()
if err != nil {
logp.Warn("Getting memory details: %v", err)
return err
}
swap_stat, err := GetSwap()
if err != nil {
logp.Warn("Getting swap details: %v", err)
return err
}

event := common.MapStr{
"timestamp": common.Time(time.Now()),
"type": "system",
"load": load_stat,
"cpu": cpu_stat,
"mem": mem_stat,
"swap": swap_stat,
}

t.events <- event

return nil
}

func (t *Topbeat) getCpuPercent(proc *Process) float64 {

oproc, ok := t.procsMap[proc.Pid]
if ok {

elapsed := proc.lastCPUTime.Sub(oproc.lastCPUTime).Seconds()
if elapsed <= 0 {
elapsed = 1
}
ret := float64(proc.Cpu.User-oproc.Cpu.User) / float64(elapsed)
if ret < 0.0001 {
ret = 0
}

return ret
}
return 0
}

func (t *Topbeat) Run() error {

t.isAlive = true

t.initProcStats()

for t.isAlive {
time.Sleep(1 * time.Second)

t.events <- event
t.exportSystemStats()
t.exportProcStats()
}
return nil
}
Expand Down
44 changes: 25 additions & 19 deletions sigar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"time"

"github.com/cloudfoundry/gosigar"
"github.com/elastic/libbeat/logp"
Expand Down Expand Up @@ -39,19 +40,21 @@ type ProcMemStat struct {
}

type ProcCpuTime struct {
User uint64 `json:"user"`
System uint64 `json:"system"`
Total uint64 `json:"total"`
Start string `json:"start_time"`
User uint64 `json:"user"`
User_p float64 `json:"user_p"`
System uint64 `json:"system"`
Total uint64 `json:"total"`
Start string `json:"start_time"`
}

type Process struct {
Pid int `json:"pid"`
Ppid int `json:"ppid"`
Name string `json:"name"`
State string `json:"state"`
Mem ProcMemStat `json:"mem"`
Cpu ProcCpuTime `json:"cpu"`
Pid int `json:"pid"`
Ppid int `json:"ppid"`
Name string `json:"name"`
State string `json:"state"`
Mem ProcMemStat `json:"mem"`
Cpu ProcCpuTime `json:"cpu"`
lastCPUTime time.Time
}

func (p *Process) String() string {
Expand All @@ -66,7 +69,7 @@ func (m *ProcMemStat) String() string {
}

func (t *ProcCpuTime) String() string {
return fmt.Sprintf("started at %s, %d total, %d us, %d sys", t.Start, t.Total, t.User, t.System)
return fmt.Sprintf("started at %s, %d total, %d us (%.2f), %d sys", t.Start, t.Total, t.User, t.User_p, t.System)

}

Expand Down Expand Up @@ -187,7 +190,7 @@ func GetProcess(pid int) (*Process, error) {

state := sigar.ProcState{}
mem := sigar.ProcMem{}
time := sigar.ProcTime{}
cpu := sigar.ProcTime{}

err := state.Get(pid)
if err != nil {
Expand All @@ -199,12 +202,12 @@ func GetProcess(pid int) (*Process, error) {
return nil, err
}

err = time.Get(pid)
err = cpu.Get(pid)
if err != nil {
return nil, err
}

return &Process{
proc := Process{
Pid: pid,
Ppid: state.Ppid,
Name: state.Name,
Expand All @@ -215,10 +218,13 @@ func GetProcess(pid int) (*Process, error) {
Share: mem.Share / 1024,
},
Cpu: ProcCpuTime{
Start: time.FormatStartTime(),
Total: time.Total,
User: time.User,
System: time.Sys,
Start: cpu.FormatStartTime(),
Total: cpu.Total,
User: cpu.User,
System: cpu.Sys,
},
}, nil
}

proc.lastCPUTime = time.Now()
return &proc, nil
}

0 comments on commit 51d0cf5

Please sign in to comment.