diff --git a/.gitignore b/.gitignore
index affec96..783e50d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 /pgcheetah
+cmd/pgcheetah/pgcheetah
diff --git a/cmd/pgcheetah/pgcheetah b/cmd/pgcheetah/pgcheetah
deleted file mode 100755
index a41a12d..0000000
Binary files a/cmd/pgcheetah/pgcheetah and /dev/null differ
diff --git a/cmd/pgcheetah/pgcheetah.go b/cmd/pgcheetah/pgcheetah.go
index 2301acb..1142f9e 100644
--- a/cmd/pgcheetah/pgcheetah.go
+++ b/cmd/pgcheetah/pgcheetah.go
@@ -18,8 +18,11 @@ import (
 
 // Preallocate 100k transactions
 var data = make(map[int][]string, 100000)
+var delayXactUs int
+var start time.Time
 var wg sync.WaitGroup
 var worker pgcheetah.Worker
+var done chan bool
 
 // Command line arguments
 var clients = flag.Int("clients", 100, "number of client")
@@ -45,12 +48,10 @@ func main() {
 
 	data[0] = []string{""}
 	waitEvent := make(map[string]int)
-	done := make(chan bool)
+	done = make(chan bool)
 	var timer *time.Timer
 	think := pgcheetah.ThinkTime{Distribution: "uniform", Min: 0, Max: 5}
 	s := pgcheetah.State{Statedesc: "init", Xact: 0, XactInProgress: false}
-	var start time.Time
-	var delayXactUs int
 
 	flag.Parse()
 	if *queryfile == "" {
@@ -106,81 +107,12 @@ func main() {
 
 	log.Println("Start parsing")
 	xact, err := pgcheetah.ParseXact(data, queryfile, &s, debug)
-
 	if err != nil {
 		log.Fatalf("Error during parsing %s", err)
 	}
-
 	log.Println("Parsing done, start workers. Transactions processed:", xact)
 
-	// Naive tps limiting/throttle
-	go func() {
-		time.Sleep(time.Duration(*delaystart) * time.Second)
-		var prevXactCount int64
-		var prevQueriesCount int64
-		var curtps float64
-		step := 10
-		wg.Add(1)
-		time.Sleep(time.Duration(1) * time.Second)
-
-		// Loop every 100ms to calculate throttle to reach wanted tps
-		for i := 0; true; i++ {
-
-			curtps = float64(xactCount-prevXactCount) * 10
-
-			// Reports stats for each inverval
-			if i%(*interval*10) == 0 {
-				if *duration == 0 {
-					log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Test duration: %.fs\n",
-						curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
-						time.Duration(delayXactUs)*time.Microsecond, time.Since(start).Seconds())
-				} else {
-					log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Remaining: %.fs\n",
-						curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
-						time.Duration(delayXactUs)*time.Microsecond, float64(*duration)-time.Since(start).Seconds())
-				}
-			}
-			if *tps != 0 {
-
-				// We change the step if we are above +/- 1% of wanted tps
-				if int64(curtps) > int64(*tps*(1+0.01)) {
-
-					// step is calculated in order to, the more we have a difference between wanted tps and current tps
-					// bigger the step is. Inversely, the more we are close to desirated tps, smaller is the step.
-					// The empirical formula is:
-					// step = 10 * deltatps ^ 1.6 + 20 * deltatps
-					// where delta tps is a ratio between wanted tps and current tps.
-
-					step = int(10*math.Pow(curtps / *tps, 1.6) + 30*curtps / *tps)
-					//log.Printf("> TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1+0.1)), delayXactUs, delayXactUs+step)
-
-				} else if int64(curtps) < int64(*tps*(1-0.01)) {
-
-					// We keep the min between calculated step and current delayXactUs to avoid negative delayXactUs
-					step = -int(math.Min(10*math.Pow(*tps/curtps, 1.6)+30**tps/curtps, float64(delayXactUs)))
-					//log.Printf("< TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1-0.1)), delayXactUs, delayXactUs-step)
-				}
-				delayXactUs += step
-			}
-			prevXactCount = xactCount
-			prevQueriesCount = queriesCount
-			time.Sleep(100 * time.Millisecond)
-			select {
-			case <-done:
-
-				t := time.Now()
-				elapsed := t.Sub(start)
-				log.Printf("End test - Clients: %d - Elapsed: %s - Average TPS: %.f - Average QPS: %.f\n",
-					*clients, elapsed.String(), float64(xactCount)/elapsed.Seconds(), float64(queriesCount)/elapsed.Seconds())
-				wg.Done()
-				return
-			default:
-
-			}
-
-		}
-
-	}()
+	go rateLimiter()
 
 	worker.ConnStr = connStr
 	worker.Dataset = data
@@ -213,3 +145,74 @@ func main() {
 
 	wg.Wait()
 }
+
+// Naive tps limiting/throttle
+func rateLimiter() {
+
+	// Start rate limiter after workers. Keep it simple without
+	// synchronisation.
+	time.Sleep(time.Duration(*delaystart+1) * time.Second)
+	var prevXactCount int64
+	var prevQueriesCount int64
+	var curtps float64
+	step := 10 // 10µs by default
+	wg.Add(1)
+
+	// Loop every 100ms to calculate throttle to reach wanted tps
+	for i := 0; true; i++ {
+
+		curtps = float64(xactCount-prevXactCount) * 10
+
+		// Report stats for each interval
+		if i%(*interval*10) == 0 {
+			if *duration == 0 {
+				log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Test duration: %.fs\n",
+					curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
+					time.Duration(delayXactUs)*time.Microsecond, time.Since(start).Seconds())
+			} else {
+				log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Remaining: %.fs\n",
+					curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
+					time.Duration(delayXactUs)*time.Microsecond, float64(*duration)-time.Since(start).Seconds())
+			}
+		}
+		if *tps != 0 {
+
+			// We change the step if we are above +/- 1% of wanted tps
+			if int64(curtps) > int64(*tps*(1+0.01)) {
+
+				// step is calculated so that the bigger the gap between the wanted tps
+				// and the current tps, the bigger the step. Conversely, the closer we
+				// are to the desired tps, the smaller the step. The empirical formula is:
+				// step = 10 * deltatps ^ 1.6 + 30 * deltatps
+				// where deltatps is the ratio between wanted tps and current tps.
+
+				step = int(10*math.Pow(curtps / *tps, 1.6) + 30*curtps / *tps)
+				//log.Printf("> TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1+0.1)), delayXactUs, delayXactUs+step)
+
+			} else if int64(curtps) < int64(*tps*(1-0.01)) {
+
+				// We keep the min between calculated step and current delayXactUs to avoid negative delayXactUs
+				step = -int(math.Min(10*math.Pow(*tps/curtps, 1.6)+30**tps/curtps, float64(delayXactUs)))
+				//log.Printf("< TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1-0.1)), delayXactUs, delayXactUs-step)
+			}
+			delayXactUs += step
+		}
+		prevXactCount = xactCount
+		prevQueriesCount = queriesCount
+		time.Sleep(100 * time.Millisecond)
+		select {
+		case <-done:
+
+			t := time.Now()
+			elapsed := t.Sub(start)
+			log.Printf("End test - Clients: %d - Elapsed: %s - Average TPS: %.f - Average QPS: %.f\n",
+				*clients, elapsed.String(), float64(xactCount)/elapsed.Seconds(), float64(queriesCount)/elapsed.Seconds())
+			wg.Done()
+			return
+		default:
+
+		}
+
+	}
+
+}
diff --git a/pkg/pgcheetah/worker.go b/pkg/pgcheetah/worker.go
index 919ba55..3a8b112 100644
--- a/pkg/pgcheetah/worker.go
+++ b/pkg/pgcheetah/worker.go
@@ -10,22 +10,16 @@ import (
 )
 
 // The Worker type contains all informations needed to start a WorkerPG.
-// Earch Worker has access to several shared structures through pointers:
-// - The Dataset containing all transactions
-// - DelayXactUs to limit global throughput
-// - Two global counters for transactions and queries, XactCount and
-// QueriesCount respectively
-// - A Done channel used to stop worker
-// - A WaitGroup to wait all workers ended
+// Each Worker has access to several shared structures through pointers.
 type Worker struct {
-	ConnStr      *string
-	Dataset      map[int][]string
-	DelayXactUs  *int
-	Done         chan bool
-	QueriesCount *int64
-	Think        *ThinkTime
+	ConnStr      *string          // URI or a DSN connection string
+	Dataset      map[int][]string // Dataset containing all transactions
+	DelayXactUs  *int             // Delay to limit global throughput
+	Done         chan bool        // Used to stop workers
+	QueriesCount *int64           // Global counter for queries
+	Think        *ThinkTime       // Used to add random delay between each query
 	Wg           *sync.WaitGroup
-	XactCount    *int64           // Global counter for transactions
+	XactCount    *int64           // Global counter for transactions
 }
 
 // WorkerPG execute all queries from a randomly
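Note (not part of the patch): the throttle rule that rateLimiter applies every 100ms can be read in isolation. The sketch below is illustrative only, throttleStep and the numbers in main are invented for the example, but it uses the same empirical formula, step = 10 * deltatps ^ 1.6 + 30 * deltatps, and the same clamp that keeps the delay from going negative.

package main

import (
	"fmt"
	"math"
)

// throttleStep returns the new per-transaction delay in microseconds for one
// 100ms tick, given the measured tps, the wanted tps and the current delay.
func throttleStep(curtps, wanted float64, delayUs int) int {
	switch {
	case curtps > wanted*(1+0.01):
		// Too fast: grow the delay, more aggressively the further we overshoot.
		ratio := curtps / wanted
		delayUs += int(10*math.Pow(ratio, 1.6) + 30*ratio)
	case curtps < wanted*(1-0.01):
		// Too slow: shrink the delay, but never let it go below zero.
		ratio := wanted / curtps
		delayUs -= int(math.Min(10*math.Pow(ratio, 1.6)+30*ratio, float64(delayUs)))
	}
	return delayUs
}

func main() {
	delay := 0
	// Feed the rule a constant 1200 tps measurement against a 1000 tps target:
	// the delay keeps growing here; in the real loop the measured tps would
	// drop in response and the adjustments would shrink accordingly.
	for i := 0; i < 5; i++ {
		delay = throttleStep(1200, 1000, delay)
		fmt.Println("delay (µs):", delay)
	}
}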