Skip to content

Commit

Permalink
use net pprof, avoid ThinkTime call, fix stats output
Browse files Browse the repository at this point in the history
  • Loading branch information
anayrat committed Apr 22, 2019
1 parent f4c906e commit 7889752
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 45 deletions.
65 changes: 23 additions & 42 deletions cmd/pgcheetah/pgcheetah.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"github.com/anayrat/pgcheetah/pkg/pgcheetah"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime/pprof"
"runtime/trace"
"sync"
"sync/atomic"
"time"
Expand All @@ -24,7 +24,6 @@ var worker pgcheetah.Worker
var clients = flag.Int("clients", 100, "number of client")
var configFile = flag.String("configfile", "", "configfile")
var connStr = flag.String("constr", "user=postgres dbname=postgres", "pg connstring")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
var debug = flag.Bool("debug", false, "debug mode")
var delaystart = flag.Int("delaystart", 0, "spread client start among seconds")
var delayxact = flag.Float64("delayxact", 5, "millisecond between each transaction")
Expand All @@ -34,7 +33,7 @@ var queryfile = flag.String("queryfile", "", "file containing queries to play")
var thinktimemax = flag.Int("thinktimemax", 5, "millisecond thinktime")
var thinktimemin = flag.Int("thinktimemin", 5, "millisecond thinktime")
var tps = flag.Float64("tps", 0, "expected tps")
var traceprofile = flag.String("traceprofile", "", "write trace to file")
var netpprof = flag.Bool("netpprof", false, "enable internal pprof web server")

// Global counters
var (
Expand All @@ -53,17 +52,23 @@ func main() {
var start time.Time

flag.Parse()
if *queryfile == "" {
log.Println("Provide queryfile with -queryfile")
os.Exit(1)
}

if *netpprof {
go func() {
log.Println("Start pprof http server on http://localhost:6060/debug/pprof/")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}
think.Min = *thinktimemin
think.Max = *thinktimemax

// Initiate timer, will be reseted later
if *duration != 0 {
timer = time.NewTimer(time.Duration(*duration) * time.Second)
}

// Start profiling if enabled
pproofing()
timer = time.NewTimer(time.Duration(*duration) * time.Second)
timer.Stop()

// capture ctrl+c or end of timer to stop workers and display wait_event counters
c := make(chan os.Signal, 1)
Expand All @@ -72,7 +77,6 @@ func main() {
select {
case <-c:
log.Print("Stop requested, stop clients\n")
cleanup()
for i := 0; i < (*clients + 1); i++ {
done <- true
}
Expand All @@ -83,7 +87,6 @@ func main() {
case <-timer.C:

log.Print("Test finished, stop clients\n")
cleanup()
for i := 0; i < (*clients + 1); i++ {
done <- true
}
Expand All @@ -108,11 +111,17 @@ func main() {
var prevXactCount int64 = 0
var prevQueriesCount int64 = 0
wg.Add(1)
time.Sleep(time.Duration(1) * time.Second)
for i := 0; true; i++ {

if i%(*interval*10) == 0 {
log.Printf("TPS: %d QPS: %d Xact: %d Queries: %d Delay: %.1fms Remaining: %.fs\n",
(xactCount-prevXactCount)*10, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount, *delayxact, float64(*duration)-time.Since(start).Seconds())
if *duration == 0 {
log.Printf("TPS: %d QPS: %d Xact: %d Queries: %d Delay: %.1fms Test duration: %.fs\n",
(xactCount-prevXactCount)*10, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount, *delayxact, time.Since(start).Seconds())
} else {
log.Printf("TPS: %d QPS: %d Xact: %d Queries: %d Delay: %.1fms Remaining: %.fs\n",
(xactCount-prevXactCount)*10, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount, *delayxact, float64(*duration)-time.Since(start).Seconds())
}
}
if *tps != 0 {
if (xactCount-prevXactCount)*10 > int64(*tps*(1+0.01)) {
Expand Down Expand Up @@ -174,33 +183,5 @@ func main() {
go pgcheetah.WaitEventCollector(wait_event, connStr)

wg.Wait()
}

func pproofing() {
switch {
case *cpuprofile != "":
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
log.Println("Start cpuprofiler")
case *traceprofile != "":
f, err := os.Create(*traceprofile)
if err != nil {
log.Fatal(err)
}
log.Println("Start trace")
trace.Start(f)
}
}

func cleanup() {
switch {
case *cpuprofile != "":
pprof.StopCPUProfile()
case *traceprofile != "":
trace.Stop()
}

}
11 changes: 8 additions & 3 deletions pkg/pgcheetah/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ func WorkerPGv2(w Worker) {

var randxact, i int
cfg, err := pgx.ParseConnectionString(*w.ConnStr)
if err != nil {
log.Fatal(err)
}
// use simple protocol in order to work with pgbouncer
cfg.PreferSimpleProtocol = true
db, err := pgx.Connect(cfg)

if err != nil {
log.Fatal(err)
log.Fatal(err, " Connection params : ", string(*w.ConnStr))
}

func() {
Expand All @@ -87,7 +90,9 @@ func WorkerPGv2(w Worker) {

atomic.AddInt64(w.QueriesCount, 1)

time.Sleep(time.Duration(ThinkTime(*w.Think)) * time.Millisecond)
if (*w.Think).Min != 0 && (*w.Think).Max != 0 {
time.Sleep(time.Duration(ThinkTime(*w.Think)) * time.Millisecond)
}
select {
case <-w.Done:
return
Expand All @@ -114,7 +119,7 @@ func WaitEventCollector(we map[string]int, connStr *string) {
db, err := pgx.Connect(cfg)

if err != nil {
log.Fatal(err)
log.Fatal(err, " Connection params : ", string(*connStr))
}

// Wait event query for postgres 9.6
Expand Down

0 comments on commit 7889752

Please sign in to comment.