From b92f0877e9a51c422d2e715ff97a550fd5f70a68 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 15 Nov 2019 12:20:01 -0800 Subject: [PATCH 1/2] Inline the large-weight heap to avoid interface conversions --- varopt.go | 66 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/varopt.go b/varopt.go index 5ab1ee5..7e4449c 100644 --- a/varopt.go +++ b/varopt.go @@ -3,7 +3,6 @@ package varopt import ( - "container/heap" "fmt" "math/rand" ) @@ -17,7 +16,7 @@ type Varopt struct { // Random number generator rnd *rand.Rand - // Large-weight items + // Large-weight items stored in a min-heap. L largeHeap // Light-weight items. @@ -72,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) { s.totalWeight += weight if s.Size() < s.capacity { - heap.Push(&s.L, individual) + s.L.push(individual) return } @@ -82,14 +81,14 @@ func (s *Varopt) Add(sample Sample, weight float64) { W := s.tau * float64(len(s.T)) if weight > s.tau { - heap.Push(&s.L, individual) + s.L.push(individual) } else { s.X = append(s.X, individual) W += weight } for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := heap.Pop(&s.L).(vsample) + h := s.L.pop() s.X = append(s.X, h) W += h.weight } @@ -177,26 +176,49 @@ func (s *Varopt) Tau() float64 { return s.tau } -func (b largeHeap) Len() int { - return len(b) -} +func (lp *largeHeap) push(v vsample) { + l := append(*lp, v) + n := len(l) - 1 -func (b largeHeap) Swap(i, j int) { - b[i], b[j] = b[j], b[i] -} + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].weight >= l[i].weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } -func (b largeHeap) Less(i, j int) bool { - return b[i].weight < b[j].weight + *lp = l } -func (b *largeHeap) Push(x interface{}) { - *b = append(*b, x.(vsample)) -} +func (lp *largeHeap) pop() vsample { + l := *lp + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].weight < l[j1].weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].weight >= l[i].weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } -func (b *largeHeap) Pop() interface{} { - old := *b - n := len(old) - x := old[n-1] - *b = old[0 : n-1] - return x + *lp = l + return result } From f53b1a85587a206e19f8837dc367a7ff3f87acd9 Mon Sep 17 00:00:00 2001 From: jmacd Date: Fri, 15 Nov 2019 21:51:31 -0800 Subject: [PATCH 2/2] Move heap code to internal for better testing --- internal/sampleheap.go | 57 ++++++++++++++++++++++++ internal/sampleheap_test.go | 58 ++++++++++++++++++++++++ varopt.go | 88 ++++++++----------------------------- 3 files changed, 133 insertions(+), 70 deletions(-) create mode 100644 internal/sampleheap.go create mode 100644 internal/sampleheap_test.go diff --git a/internal/sampleheap.go b/internal/sampleheap.go new file mode 100644 index 0000000..c3d82a5 --- /dev/null +++ b/internal/sampleheap.go @@ -0,0 +1,57 @@ +// Copyright 2019, LightStep Inc. + +package internal + +type Vsample struct { + Sample interface{} + Weight float64 +} + +type SampleHeap []Vsample + +func (sh *SampleHeap) Push(v Vsample) { + l := append(*sh, v) + n := len(l) - 1 + + // This copies the body of heap.up(). + j := n + for { + i := (j - 1) / 2 // parent + if i == j || l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + j = i + } + + *sh = l +} + +func (sh *SampleHeap) Pop() Vsample { + l := *sh + n := len(l) - 1 + result := l[0] + l[0] = l[n] + l = l[:n] + + // This copies the body of heap.down(). + i := 0 + for { + j1 := 2*i + 1 + if j1 >= n || j1 < 0 { // j1 < 0 after int overflow + break + } + j := j1 // left child + if j2 := j1 + 1; j2 < n && l[j2].Weight < l[j1].Weight { + j = j2 // = 2*i + 2 // right child + } + if l[j].Weight >= l[i].Weight { + break + } + l[i], l[j] = l[j], l[i] + i = j + } + + *sh = l + return result +} diff --git a/internal/sampleheap_test.go b/internal/sampleheap_test.go new file mode 100644 index 0000000..50615c0 --- /dev/null +++ b/internal/sampleheap_test.go @@ -0,0 +1,58 @@ +// Copyright 2019, LightStep Inc. + +package internal_test + +import ( + "container/heap" + "math/rand" + "testing" + + "github.com/lightstep/varopt/internal" + "github.com/stretchr/testify/require" +) + +type simpleHeap []float64 + +func (s *simpleHeap) Len() int { + return len(*s) +} + +func (s *simpleHeap) Swap(i, j int) { + (*s)[i], (*s)[j] = (*s)[j], (*s)[i] +} + +func (s *simpleHeap) Less(i, j int) bool { + return (*s)[i] < (*s)[j] +} + +func (s *simpleHeap) Push(x interface{}) { + *s = append(*s, x.(float64)) +} + +func (s *simpleHeap) Pop() interface{} { + old := *s + n := len(old) + x := old[n-1] + *s = old[0 : n-1] + return x +} + +func TestLargeHeap(t *testing.T) { + var L internal.SampleHeap + var S simpleHeap + + for i := 0; i < 1e6; i++ { + v := rand.NormFloat64() + L.Push(internal.Vsample{Weight: v}) + heap.Push(&S, v) + } + + for len(S) > 0 { + v1 := heap.Pop(&S).(float64) + v2 := L.Pop().Weight + + require.Equal(t, v1, v2) + } + + require.Equal(t, 0, len(L)) +} diff --git a/varopt.go b/varopt.go index a535d35..44b5843 100644 --- a/varopt.go +++ b/varopt.go @@ -6,6 +6,8 @@ import ( "fmt" "math" "math/rand" + + "github.com/lightstep/varopt/internal" ) // Varopt implements the algorithm from Stream sampling for @@ -18,13 +20,13 @@ type Varopt struct { rnd *rand.Rand // Large-weight items stored in a min-heap. - L largeHeap + L internal.SampleHeap // Light-weight items. - T []vsample + T []internal.Vsample // Temporary buffer. - X []vsample + X []internal.Vsample // Current threshold tau float64 @@ -41,13 +43,6 @@ type Varopt struct { // passed in separately. type Sample interface{} -type vsample struct { - sample Sample - weight float64 -} - -type largeHeap []vsample - var ErrInvalidWeight = fmt.Errorf("Negative, zero, or NaN weight") // New returns a new Varopt sampler with given capacity (i.e., @@ -63,9 +58,9 @@ func New(capacity int, rnd *rand.Rand) *Varopt { // // An error will be returned if the weight is either negative or NaN. func (s *Varopt) Add(sample Sample, weight float64) error { - individual := vsample{ - sample: sample, - weight: weight, + individual := internal.Vsample{ + Sample: sample, + Weight: weight, } if weight <= 0 || math.IsNaN(weight) { @@ -76,7 +71,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error { s.totalWeight += weight if s.Size() < s.capacity { - s.L.push(individual) + s.L.Push(individual) return nil } @@ -86,16 +81,16 @@ func (s *Varopt) Add(sample Sample, weight float64) error { W := s.tau * float64(len(s.T)) if weight > s.tau { - s.L.push(individual) + s.L.Push(individual) } else { s.X = append(s.X, individual) W += weight } - for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].weight { - h := s.L.pop() + for len(s.L) > 0 && W >= float64(len(s.T)+len(s.X)-1)*s.L[0].Weight { + h := s.L.Pop() s.X = append(s.X, h) - W += h.weight + W += h.Weight } s.tau = W / float64(len(s.T)+len(s.X)-1) @@ -103,7 +98,7 @@ func (s *Varopt) Add(sample Sample, weight float64) error { d := 0 for d < len(s.X) && r >= 0 { - wxd := s.X[d].weight + wxd := s.X[d].Weight r -= (1 - wxd/s.tau) d++ } @@ -136,10 +131,10 @@ func (s *Varopt) uniform() float64 { // GetOriginalWeight(i). func (s *Varopt) Get(i int) (Sample, float64) { if i < len(s.L) { - return s.L[i].sample, s.L[i].weight + return s.L[i].Sample, s.L[i].Weight } - return s.T[i-len(s.L)].sample, s.tau + return s.T[i-len(s.L)].Sample, s.tau } // GetOriginalWeight returns the original input weight of the sample @@ -147,10 +142,10 @@ func (s *Varopt) Get(i int) (Sample, float64) { // frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { - return s.L[i].weight + return s.L[i].Weight } - return s.T[i-len(s.L)].weight + return s.T[i-len(s.L)].Weight } // Capacity returns the size of the reservoir. This is the maximum @@ -181,50 +176,3 @@ func (s *Varopt) TotalCount() int { func (s *Varopt) Tau() float64 { return s.tau } - -func (lp *largeHeap) push(v vsample) { - l := append(*lp, v) - n := len(l) - 1 - - // This copies the body of heap.up(). - j := n - for { - i := (j - 1) / 2 // parent - if i == j || l[j].weight >= l[i].weight { - break - } - l[i], l[j] = l[j], l[i] - j = i - } - - *lp = l -} - -func (lp *largeHeap) pop() vsample { - l := *lp - n := len(l) - 1 - result := l[0] - l[0] = l[n] - l = l[:n] - - // This copies the body of heap.down(). - i := 0 - for { - j1 := 2*i + 1 - if j1 >= n || j1 < 0 { // j1 < 0 after int overflow - break - } - j := j1 // left child - if j2 := j1 + 1; j2 < n && l[j2].weight < l[j1].weight { - j = j2 // = 2*i + 2 // right child - } - if l[j].weight >= l[i].weight { - break - } - l[i], l[j] = l[j], l[i] - i = j - } - - *lp = l - return result -}