Skip to content

Commit

Permalink
Merge #31902
Browse files Browse the repository at this point in the history
31902: workload: add zipfian distribution option to kv r=ridwanmsharif a=ridwanmsharif

Adds a workload distribution that spreads load according
to a Zipfian distribution. This is part of the experimentation
with load based splitting.

Release note: None

Co-authored-by: Ridwan Sharif <[email protected]>
  • Loading branch information
craig[bot] and Ridwan Sharif committed Oct 28, 2018
2 parents 7fdb1ad + 77618f3 commit e9e3ff2
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 0 deletions.
53 changes: 53 additions & 0 deletions pkg/workload/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type kv struct {
seed int64
writeSeq string
sequential bool
zipfian bool
splits int
secondaryIndex bool
useOpt bool
Expand Down Expand Up @@ -97,6 +98,8 @@ var kvMeta = workload.Meta{
g.flags.IntVar(&g.readPercent, `read-percent`, 0,
`Percent (0-100) of operations that are reads of existing keys.`)
g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.BoolVar(&g.zipfian, `zipfian`, false,
`Pick keys in a zipfian distribution instead of randomly.`)
g.flags.BoolVar(&g.sequential, `sequential`, false,
`Pick keys sequentially instead of randomly.`)
g.flags.StringVar(&g.writeSeq, `write-seq`, "",
Expand Down Expand Up @@ -130,6 +133,9 @@ func (w *kv) Hooks() workload.Hooks {
if w.sequential && w.splits > 0 {
return errors.New("'sequential' and 'splits' cannot both be enabled")
}
if w.sequential && w.zipfian {
return errors.New("'sequential' and 'zipfian' cannot both be enabled")
}
return nil
},
}
Expand Down Expand Up @@ -235,6 +241,8 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
}
if w.sequential {
op.g = newSequentialGenerator(seq)
} else if w.zipfian {
op.g = newZipfianGenerator(seq)
} else {
op.g = newHashGenerator(seq)
}
Expand Down Expand Up @@ -406,6 +414,51 @@ func (g *sequentialGenerator) sequence() int64 {
return atomic.LoadInt64(&g.seq.val)
}

type zipfGenerator struct {
seq *sequence
random *rand.Rand
zipf *zipf
}

// Creates a new zipfian generator.
func newZipfianGenerator(seq *sequence) *zipfGenerator {
random := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
return &zipfGenerator{
seq: seq,
random: random,
zipf: newZipf(1.1, 1, uint64(math.MaxInt64)),
}
}

// Get a random number seeded by v that follows the
// zipfian distribution.
func (g *zipfGenerator) zipfian(seed int64) int64 {
randomWithSeed := rand.New(rand.NewSource(seed))
return int64(g.zipf.Uint64(randomWithSeed))
}

// Get a zipf write key appropriately.
func (g *zipfGenerator) writeKey() int64 {
return g.zipfian(g.seq.write())
}

// Get a zipf read key appropriately.
func (g *zipfGenerator) readKey() int64 {
v := g.seq.read()
if v == 0 {
return 0
}
return g.zipfian(g.random.Int63n(v))
}

func (g *zipfGenerator) rand() *rand.Rand {
return g.random
}

func (g *zipfGenerator) sequence() int64 {
return atomic.LoadInt64(&g.seq.val)
}

func randomBlock(config *kv, r *rand.Rand) []byte {
blockSize := r.Intn(config.maxBlockSizeBytes-config.minBlockSizeBytes) + config.minBlockSizeBytes
blockData := make([]byte, blockSize)
Expand Down
86 changes: 86 additions & 0 deletions pkg/workload/kv/zipfian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// W.Hormann, G.Derflinger:
// "Rejection-Inversion to Generate Variates
// from Monotone Discrete Distributions"
// http://eeyore.wu-wien.ac.at/papers/96-04-04.wh-der.ps.gz

package kv

import (
"math"
"math/rand"
)

// A zipf generates Zipf distributed variates.
// This was added here from math/rand so we could
// have the Zipfian distribution give us a deterministic
// result based on the read key so we don't read missing
// entries.
//
// Our changes involve being supplied with
// a seeded rand object during the time of retrieval instead
// of a rand object during creation.
type zipf struct {
imax float64
v float64
q float64
s float64
oneminusQ float64
oneminusQinv float64
hxm float64
hx0minusHxm float64
}

func (z *zipf) h(x float64) float64 {
return math.Exp(z.oneminusQ*math.Log(z.v+x)) * z.oneminusQinv
}

func (z *zipf) hinv(x float64) float64 {
return math.Exp(z.oneminusQinv*math.Log(z.oneminusQ*x)) - z.v
}

// newZipf returns a Zipf variate generator.
// The generator generates values k ∈ [0, imax]
// such that P(k) is proportional to (v + k) ** (-s).
// Requirements: s > 1 and v >= 1.
func newZipf(s float64, v float64, imax uint64) *zipf {
z := new(zipf)
if s <= 1.0 || v < 1 {
return nil
}
z.imax = float64(imax)
z.v = v
z.q = s
z.oneminusQ = 1.0 - z.q
z.oneminusQinv = 1.0 / z.oneminusQ
z.hxm = z.h(z.imax + 0.5)
z.hx0minusHxm = z.h(0.5) - math.Exp(math.Log(z.v)*(-z.q)) - z.hxm
z.s = 1 - z.hinv(z.h(1.5)-math.Exp(-z.q*math.Log(z.v+1.0)))
return z
}

// Uint64 returns a value drawn from the Zipf distribution described
// by the Zipf object.
func (z *zipf) Uint64(random *rand.Rand) uint64 {
if z == nil {
panic("rand: nil Zipf")
}
k := 0.0

for {
r := random.Float64() // r in [0.0, 1.0]
ur := z.hxm + r*z.hx0minusHxm
x := z.hinv(ur)
k = math.Floor(x + 0.5)
if k-x <= z.s {
break
}
if ur >= z.h(k+0.5)-math.Exp(-math.Log(k+z.v)*z.q) {
break
}
}
return uint64(int64(k))
}

0 comments on commit e9e3ff2

Please sign in to comment.