diff --git a/cmd/pgcheetah/.gitignore b/cmd/pgcheetah/.gitignore new file mode 100644 index 0000000..bf3f8a2 --- /dev/null +++ b/cmd/pgcheetah/.gitignore @@ -0,0 +1 @@ +pgcheetah diff --git a/cmd/pgcheetah/pgcheetah.go b/cmd/pgcheetah/pgcheetah.go index 36bf2d4..0737d8d 100644 --- a/cmd/pgcheetah/pgcheetah.go +++ b/cmd/pgcheetah/pgcheetah.go @@ -10,6 +10,7 @@ import ( "runtime/pprof" "runtime/trace" "sync" + "sync/atomic" "time" ) @@ -26,7 +27,7 @@ var connStr = flag.String("constr", "user=postgres dbname=postgres", "pg connstr var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file") var debug = flag.Bool("debug", false, "debug mode") var delaystart = flag.Int("delaystart", 0, "spread client start among seconds") -var delayxact = flag.Int("delayxact", 5, "millisecond between each transaction") +var delayxact = flag.Float64("delayxact", 5, "millisecond between each transaction") var duration = flag.Int("duration", 0, "Test duration") var queryfile = flag.String("queryfile", "", "file containing queries to play") var thinktimemax = flag.Int("thinktimemax", 5, "millisecond thinktime") @@ -48,6 +49,7 @@ func main() { var timer *time.Timer think := pgcheetah.Thinktime{"uniform", 0, 5} s := pgcheetah.State{"init", 0, false} + var start time.Time flag.Parse() think.Min = *thinktimemin @@ -69,7 +71,7 @@ func main() { case <-c: log.Print("Stop requested, stop clients\n") cleanup() - for i := 0; i < *clients; i++ { + for i := 0; i < (*clients + 1); i++ { done <- true } log.Print("Wait_event count:\n") @@ -80,7 +82,7 @@ func main() { log.Print("Test finished, stop clients\n") cleanup() - for i := 0; i < *clients; i++ { + for i := 0; i < (*clients + 1); i++ { done <- true } log.Print("Wait_event count:\n") @@ -106,19 +108,32 @@ func main() { for i := 0; true; i++ { if i%10 == 0 { - log.Printf("TPS: %d QPS: %d Xact: %d Queries: %d Delay: %dms\n", (xactCount-prevXactCount)*10, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount, *delayxact) + log.Printf("TPS: %d QPS: %d Xact: %d Queries: %d Delay: %.1fms\n", (xactCount-prevXactCount)*10, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount, *delayxact) } if *tps != 0 { - if (xactCount-prevXactCount)*10 > int64(*tps*1.01) { - *delayxact += 1 + if (xactCount-prevXactCount)*10 > int64(*tps*(1+0.1)) { + //log.Printf("> TPS: %d - tps diff %d Delay: %.2fms\n", (xactCount-prevXactCount)*10, int64(*tps*(1+0.1)), *delayxact) + *delayxact += 0.1 - } else if *delayxact > 0 && (xactCount-prevXactCount)*10 < int64(*tps*1.01) { - *delayxact -= 1 + } else if *delayxact > 0.0 && (xactCount-prevXactCount)*10 < int64(*tps*(1-0.1)) { + //log.Printf("< TPS: %d - tps diff %d Delay: %.2fms\n", (xactCount-prevXactCount)*10, int64(*tps*(1-0.1)), *delayxact) + *delayxact -= 0.1 } } prevXactCount = xactCount prevQueriesCount = queriesCount time.Sleep(100 * time.Millisecond) + select { + case <-done: + + t := time.Now() + elapsed := t.Sub(start) + log.Printf("End test - Elapsed %s - Average TPS: %.f - Average QPS: %.f\n", elapsed.String(), float64(xactCount)/elapsed.Seconds(), float64(queriesCount)/elapsed.Seconds()) + return + default: + + } + } }() @@ -140,6 +155,11 @@ func main() { go pgcheetah.WorkerPGv2(worker) } log.Println("All workers launched") + // Workers had already processed transactions before all worker are started. + //Reset counter in order to have accurate stats at the end of the test. + atomic.StoreInt64(&queriesCount, 0) + atomic.StoreInt64(&xactCount, 0) + start = time.Now() wg.Wait() }