Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inline the large-weight heap to avoid interface conversions #11

Merged
merged 3 commits into from
Nov 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions internal/sampleheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019, LightStep Inc.

package internal

// Vsample pairs a sampled item with the weight it was given.
type Vsample struct {
// Sample is the caller-supplied item, stored as an opaque value.
Sample interface{}
// Weight is the key that SampleHeap orders its elements by.
Weight float64
}

// SampleHeap is a slice of Vsample that its Push and Pop methods keep
// arranged as a binary min-heap keyed on Weight, so the element at index 0
// has the smallest weight.
type SampleHeap []Vsample

// Push adds v to the heap and restores the min-heap ordering on Weight.
//
// The new element is appended at the tail and then sifted up towards the
// root; this is the body of container/heap's up() written out inline.
func (sh *SampleHeap) Push(v Vsample) {
	items := append(*sh, v)

	// The root (index 0) has no parent, so the walk ends there at the latest.
	for child := len(items) - 1; child > 0; {
		parent := (child - 1) / 2
		if items[child].Weight >= items[parent].Weight {
			// Heap order already holds between child and parent.
			break
		}
		items[parent], items[child] = items[child], items[parent]
		child = parent
	}

	*sh = items
}

// Pop removes and returns the element with the smallest Weight.
//
// The last element is moved to the root and sifted down; this is the body
// of container/heap's down() written out inline. The heap must be
// non-empty: calling Pop on an empty heap panics with an index-out-of-range
// error, so callers are expected to check len first.
func (sh *SampleHeap) Pop() Vsample {
	l := *sh
	n := len(l) - 1
	result := l[0]
	l[0] = l[n]
	// Zero the vacated tail slot before shrinking. Otherwise the backing
	// array keeps a copy of the moved element, and its Sample value stays
	// reachable (and cannot be garbage collected) after it leaves the heap.
	l[n] = Vsample{}
	l = l[:n]

	// Sift the new root down until both children are no lighter than it.
	i := 0
	for {
		j1 := 2*i + 1
		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
			break
		}
		j := j1 // left child
		if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight {
			j = j2 // = 2*i + 2 // right child
		}
		if l[j].Weight >= l[i].Weight {
			break
		}
		l[i], l[j] = l[j], l[i]
		i = j
	}

	*sh = l
	return result
}
58 changes: 58 additions & 0 deletions internal/sampleheap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019, LightStep Inc.

package internal_test

import (
"container/heap"
"math/rand"
"testing"

"github.com/lightstep/varopt/internal"
"github.com/stretchr/testify/require"
)

// simpleHeap is a float64 min-heap driven through container/heap. The test
// uses it as the reference implementation to compare SampleHeap against.
type simpleHeap []float64

// Len reports how many values the heap currently holds.
func (s *simpleHeap) Len() int {
	values := *s
	return len(values)
}

// Swap exchanges the values stored at indexes i and j.
func (s *simpleHeap) Swap(i, j int) {
	values := *s
	values[i], values[j] = values[j], values[i]
}

// Less reports whether the value at index i is strictly smaller than the
// value at index j, which makes the heap a min-heap.
func (s *simpleHeap) Less(i, j int) bool {
	values := *s
	return values[i] < values[j]
}

// Push appends x to the end of the slice. x must hold a float64; any other
// dynamic type makes the type assertion panic.
func (s *simpleHeap) Push(x interface{}) {
	value := x.(float64)
	*s = append(*s, value)
}

// Pop removes the last element of the slice and returns it. container/heap
// moves the minimum to the end before calling this.
func (s *simpleHeap) Pop() interface{} {
	values := *s
	last := len(values) - 1
	tail := values[last]
	*s = values[:last]
	return tail
}

// TestLargeHeap cross-checks internal.SampleHeap against a container/heap
// reference: both receive the same random weights and must hand them back
// in the same order, leaving the SampleHeap empty at the end.
func TestLargeHeap(t *testing.T) {
	const count = 1e6

	var (
		sampleHeap internal.SampleHeap
		reference  simpleHeap
	)

	for n := 0; n < count; n++ {
		weight := rand.NormFloat64()
		sampleHeap.Push(internal.Vsample{Weight: weight})
		heap.Push(&reference, weight)
	}

	for reference.Len() > 0 {
		want := heap.Pop(&reference).(float64)
		got := sampleHeap.Pop().Weight

		require.Equal(t, want, got)
	}

	require.Equal(t, 0, len(sampleHeap))
}
68 changes: 19 additions & 49 deletions varopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
package varopt

import (
"container/heap"
"fmt"
"math"
"math/rand"

"github.com/lightstep/varopt/internal"
)

// Varopt implements the algorithm from Stream sampling for
Expand All @@ -18,14 +19,14 @@ type Varopt struct {
// Random number generator
rnd *rand.Rand

// Large-weight items
L largeHeap
// Large-weight items stored in a min-heap.
L internal.SampleHeap

// Light-weight items.
T []vsample
T []internal.Vsample

// Temporary buffer.
X []vsample
X []internal.Vsample

// Current threshold
tau float64
Expand All @@ -42,13 +43,6 @@ type Varopt struct {
// passed in separately.
type Sample interface{}

// vsample pairs a sampled item with the weight it was added with.
type vsample struct {
// sample is the caller-supplied item.
sample Sample
// weight is the value largeHeap orders its elements by.
weight float64
}

// largeHeap is a slice of vsample implementing heap.Interface; its Less
// method orders elements by ascending weight.
type largeHeap []vsample

// ErrInvalidWeight is the error value for a weight that is negative, zero,
// or NaN. NOTE(review): presumably returned by Add when its weight check
// fails; the return statement is outside the visible hunk, so confirm.
var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight")

// New returns a new Varopt sampler with given capacity (i.e.,
Expand All @@ -64,9 +58,9 @@ func New(capacity int, rnd *rand.Rand) *Varopt {
//
// An error will be returned if the weight is negative, zero, or NaN.
func (s *Varopt) Add(sample Sample, weight float64) error {
individual := vsample{
sample: sample,
weight: weight,
individual := internal.Vsample{
Sample: sample,
Weight: weight,
}

if weight <= 0 || math.IsNaN(weight) {
Expand All @@ -77,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error {
s.totalWeight += weight

if s.Size() < s.capacity {
heap.Push(&s.L, individual)
s.L.Push(individual)
return nil
}

Expand All @@ -87,24 +81,24 @@ func (s *Varopt) Add(sample Sample, weight float64) error {
W := s.tau * float64(len(s.T))

if weight > s.tau {
heap.Push(&s.L, individual)
s.L.Push(individual)
} else {
s.X = append(s.X, individual)
W += weight
}

for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight {
h := heap.Pop(&s.L).(vsample)
for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight {
h := s.L.Pop()
s.X = append(s.X, h)
W += h.weight
W += h.Weight
}

s.tau = W / float64(len(s.T)+len(s.X)-1)
r := s.uniform()
d := 0

for d < len(s.X) && r >= 0 {
wxd := s.X[d].weight
wxd := s.X[d].Weight
r -= (1 - wxd/s.tau)
d++
}
Expand Down Expand Up @@ -137,21 +131,21 @@ func (s *Varopt) uniform() float64 {
// GetOriginalWeight(i).
func (s *Varopt) Get(i int) (Sample, float64) {
if i < len(s.L) {
return s.L[i].sample, s.L[i].weight
return s.L[i].Sample, s.L[i].Weight
}

return s.T[i-len(s.L)].sample, s.tau
return s.T[i-len(s.L)].Sample, s.tau
}

// GetOriginalWeight returns the original input weight of the sample
// item that was passed to Add(). This can be useful for computing a
// frequency from the adjusted sample weight.
func (s *Varopt) GetOriginalWeight(i int) float64 {
if i < len(s.L) {
return s.L[i].weight
return s.L[i].Weight
}

return s.T[i-len(s.L)].weight
return s.T[i-len(s.L)].Weight
}

// Capacity returns the size of the reservoir. This is the maximum
Expand Down Expand Up @@ -182,27 +176,3 @@ func (s *Varopt) TotalCount() int {
func (s *Varopt) Tau() float64 {
return s.tau
}

// Len returns the number of elements in the heap.
func (b largeHeap) Len() int {
return len(b)
}

// Swap exchanges the elements at indexes i and j.
func (b largeHeap) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

// Less reports whether element i has a strictly smaller weight than
// element j, which makes this a min-heap on weight.
func (b largeHeap) Less(i, j int) bool {
return b[i].weight < b[j].weight
}

// Push appends x to the end of the slice. x must hold a vsample; any other
// dynamic type makes the type assertion panic.
func (b *largeHeap) Push(x interface{}) {
*b = append(*b, x.(vsample))
}

// Pop removes the last element of the slice and returns it boxed in an
// interface value.
func (b *largeHeap) Pop() interface{} {
old := *b
n := len(old)
// The last element is the one handed back to the caller.
x := old[n-1]
*b = old[0 : n-1]
return x
}