Code refactoring
anayrat committed Apr 26, 2019
1 parent ef8d6ae commit 8a97d47
Showing 4 changed files with 85 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
/pgcheetah
cmd/pgcheetah/pgcheetah
Binary file removed cmd/pgcheetah/pgcheetah
Binary file not shown.
149 changes: 76 additions & 73 deletions cmd/pgcheetah/pgcheetah.go
@@ -18,8 +18,11 @@ import (
// Preallocate 100k transactions
var data = make(map[int][]string, 100000)

var delayXactUs int
var start time.Time
var wg sync.WaitGroup
var worker pgcheetah.Worker
var done chan bool

// Command line arguments
var clients = flag.Int("clients", 100, "number of clients")
@@ -45,12 +45,10 @@ func main() {

data[0] = []string{""}
waitEvent := make(map[string]int)
done := make(chan bool)
done = make(chan bool)
var timer *time.Timer
think := pgcheetah.ThinkTime{Distribution: "uniform", Min: 0, Max: 5}
s := pgcheetah.State{Statedesc: "init", Xact: 0, XactInProgress: false}
var start time.Time
var delayXactUs int

flag.Parse()
if *queryfile == "" {
@@ -106,81 +107,12 @@ func main() {

log.Println("Start parsing")
xact, err := pgcheetah.ParseXact(data, queryfile, &s, debug)

if err != nil {
log.Fatalf("Error during parsing %s", err)
}

log.Println("Parsing done, start workers. Transactions processed:", xact)

// Naive tps limiting/throttle
go func() {
time.Sleep(time.Duration(*delaystart) * time.Second)
var prevXactCount int64
var prevQueriesCount int64
var curtps float64
step := 10
wg.Add(1)
time.Sleep(time.Duration(1) * time.Second)

// Loop every 100ms to recalculate the throttle needed to reach the wanted tps
for i := 0; true; i++ {

curtps = float64(xactCount-prevXactCount) * 10

// Report stats at each reporting interval
if i%(*interval*10) == 0 {
if *duration == 0 {
log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Test duration: %.fs\n",
curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
time.Duration(delayXactUs)*time.Microsecond, time.Since(start).Seconds())
} else {
log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Remaining: %.fs\n",
curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
time.Duration(delayXactUs)*time.Microsecond, float64(*duration)-time.Since(start).Seconds())
}
}
if *tps != 0 {

// We change the step when we are outside +/- 1% of the wanted tps
if int64(curtps) > int64(*tps*(1+0.01)) {

// The step grows with the gap between the wanted tps and the current tps:
// the bigger the difference, the bigger the step. Conversely, the closer
// we are to the desired tps, the smaller the step.
// The empirical formula is:
// step = 10 * deltatps ^ 1.6 + 30 * deltatps
// where deltatps is the ratio between current tps and wanted tps.

step = int(10*math.Pow(curtps / *tps, 1.6) + 30*curtps / *tps)
//log.Printf("> TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1+0.1)), delayXactUs, delayXactUs+step)

} else if int64(curtps) < int64(*tps*(1-0.01)) {

// Keep the min of the calculated step and the current delayXactUs to avoid a negative delayXactUs
step = -int(math.Min(10*math.Pow(*tps/curtps, 1.6)+30**tps/curtps, float64(delayXactUs)))
//log.Printf("< TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1-0.1)), delayXactUs, delayXactUs-step)
}
delayXactUs += step
}
prevXactCount = xactCount
prevQueriesCount = queriesCount
time.Sleep(100 * time.Millisecond)
select {
case <-done:

t := time.Now()
elapsed := t.Sub(start)
log.Printf("End test - Clients: %d - Elapsed: %s - Average TPS: %.f - Average QPS: %.f\n",
*clients, elapsed.String(), float64(xactCount)/elapsed.Seconds(), float64(queriesCount)/elapsed.Seconds())
wg.Done()
return
default:

}

}

}()
go rateLimiter()

worker.ConnStr = connStr
worker.Dataset = data
@@ -213,3 +145,74 @@ func main() {
wg.Wait()

}

// Naive tps limiting/throttle
func rateLimiter() {

// Start rate limiter after workers. Keep it simple without
// synchronisation.
time.Sleep(time.Duration(*delaystart+1) * time.Second)
var prevXactCount int64
var prevQueriesCount int64
var curtps float64
step := 10 // 10µs by default
wg.Add(1)

// Loop every 100ms to recalculate the throttle needed to reach the wanted tps
for i := 0; true; i++ {
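// Multiplying the 100ms transaction delta by 10 extrapolates it to a per-second rate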

curtps = float64(xactCount-prevXactCount) * 10

// Report stats at each reporting interval
if i%(*interval*10) == 0 {
if *duration == 0 {
log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Test duration: %.fs\n",
curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
time.Duration(delayXactUs)*time.Microsecond, time.Since(start).Seconds())
} else {
log.Printf("TPS: %.f QPS: %d Xact: %d Queries: %d Delay: %s Remaining: %.fs\n",
curtps, (queriesCount-prevQueriesCount)*10, xactCount, queriesCount,
time.Duration(delayXactUs)*time.Microsecond, float64(*duration)-time.Since(start).Seconds())
}
}
if *tps != 0 {

// We change the step when we are outside +/- 1% of the wanted tps
if int64(curtps) > int64(*tps*(1+0.01)) {

// The step grows with the gap between the wanted tps and the current tps:
// the bigger the difference, the bigger the step. Conversely, the closer
// we are to the desired tps, the smaller the step.
// The empirical formula is:
// step = 10 * deltatps ^ 1.6 + 30 * deltatps
// where deltatps is the ratio between current tps and wanted tps.

step = int(10*math.Pow(curtps / *tps, 1.6) + 30*curtps / *tps)
//log.Printf("> TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1+0.1)), delayXactUs, delayXactUs+step)

} else if int64(curtps) < int64(*tps*(1-0.01)) {

// Keep the min of the calculated step and the current delayXactUs to avoid a negative delayXactUs
step = -int(math.Min(10*math.Pow(*tps/curtps, 1.6)+30**tps/curtps, float64(delayXactUs)))
//log.Printf("< TPS: %d - tps diff %d Delay: %d => %d\n", (xactCount-prevXactCount)*10, int64(*tps*(1-0.1)), delayXactUs, delayXactUs-step)
}
delayXactUs += step
}
prevXactCount = xactCount
prevQueriesCount = queriesCount
time.Sleep(100 * time.Millisecond)
select {
case <-done:

t := time.Now()
elapsed := t.Sub(start)
log.Printf("End test - Clients: %d - Elapsed: %s - Average TPS: %.f - Average QPS: %.f\n",
*clients, elapsed.String(), float64(xactCount)/elapsed.Seconds(), float64(queriesCount)/elapsed.Seconds())
wg.Done()
return
default:

}

}

}
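
To see how the empirical throttle formula behaves, here is a minimal standalone Go sketch (not part of the commit) that reproduces the step computation for a few sample rates. The step function, the 1% dead band, and the sample tps values are illustrative only; in rateLimiter the decrease is additionally capped at the current delayXactUs so the delay never goes negative.

package main

import (
	"fmt"
	"math"
)

// step mirrors the empirical formula used by rateLimiter:
// step = 10 * deltatps ^ 1.6 + 30 * deltatps
// where deltatps is the ratio between current and wanted tps.
func step(curtps, wantedTps float64) int {
	switch {
	case curtps > wantedTps*1.01:
		// Too fast: increase the per-transaction delay.
		r := curtps / wantedTps
		return int(10*math.Pow(r, 1.6) + 30*r)
	case curtps < wantedTps*0.99:
		// Too slow: decrease the delay (capped by the caller).
		r := wantedTps / curtps
		return -int(10*math.Pow(r, 1.6) + 30*r)
	}
	return 0 // within +/- 1% of the target: leave the delay unchanged
}

func main() {
	wanted := 1000.0
	for _, cur := range []float64{1005, 1100, 2000, 995, 500} {
		fmt.Printf("current TPS %5.f -> step %+d us\n", cur, step(cur, wanted))
	}
}

The superlinear exponent makes the controller react strongly to large deviations while applying only small corrections near the target rate.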
22 changes: 8 additions & 14 deletions pkg/pgcheetah/worker.go
@@ -10,22 +10,16 @@ import (
)

// The Worker type contains all information needed to start a WorkerPG.
// Each Worker has access to several shared structures through pointers:
// - The Dataset containing all transactions
// - DelayXactUs to limit global throughput
// - Two global counters for transactions and queries, XactCount and
// QueriesCount respectively
// - A Done channel used to stop worker
// - A WaitGroup to wait all workers ended
// Each Worker has access to several shared structures through pointers.
type Worker struct {
ConnStr *string
Dataset map[int][]string
DelayXactUs *int
Done chan bool
QueriesCount *int64
Think *ThinkTime
ConnStr *string // URI or a DSN connection string
Dataset map[int][]string // Dataset containing all transactions
DelayXactUs *int // Delay to limit global throughput
Done chan bool // Used to stop workers
QueriesCount *int64 // Global counter for queries
Think *ThinkTime // Used to add a random delay between queries
Wg *sync.WaitGroup
XactCount *int64
XactCount *int64 // Global counter for transactions
}

// WorkerPG executes all queries from a randomly
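
For context, here is a hypothetical wiring of a Worker, inferred solely from the struct fields shown above: the field names match the struct, but the import path, connection string, and dataset are assumptions, and starting the workers is elided since WorkerPG's signature is not visible in this diff.

package main

import (
	"sync"

	"github.com/anayrat/pgcheetah/pkg/pgcheetah" // assumed import path
)

func main() {
	// Shared state, handed to every worker by pointer.
	var (
		xactCount    int64
		queriesCount int64
		delayXactUs  int
		wg           sync.WaitGroup
	)
	done := make(chan bool)

	connStr := "host=localhost dbname=postgres"                           // illustrative DSN
	think := pgcheetah.ThinkTime{Distribution: "uniform", Min: 0, Max: 5} // as in pgcheetah.go
	data := map[int][]string{0: {"SELECT 1;"}}                            // illustrative dataset

	w := pgcheetah.Worker{
		ConnStr:      &connStr,
		Dataset:      data,
		DelayXactUs:  &delayXactUs, // tuned concurrently by rateLimiter
		Done:         done,
		QueriesCount: &queriesCount,
		Think:        &think,
		Wg:           &wg,
		XactCount:    &xactCount,
	}
	_ = w // workers would then be launched with w (e.g. go w.WorkerPG(...))
}

Sharing plain pointers keeps the workers and the rate limiter loosely coupled, at the cost of unsynchronised access, consistent with the "Keep it simple without synchronisation" comment in rateLimiter.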
