diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..deb2de9 --- /dev/null +++ b/doc.go @@ -0,0 +1,22 @@ +// Copyright 2019, LightStep Inc. + +/* +Package varopt is an implementation of VarOpt, an unbiased weighted +sampling algorithm described in the paper "Stream sampling for +variance-optimal estimation of subset sums" +https://arxiv.org/pdf/0803.0473.pdf (2008), by Edith Cohen, Nick +Duffield, Haim Kaplan, Carsten Lund, and Mikkel Thorup. + +VarOpt is a reservoir-type sampler that maintains a fixed-size sample +and provides a mechanism for merging unequal-weight samples. + +This package also includes a simple reservoir sampling algorithm, +often useful in conjunction with weighted reservoir sampling, using +Algorithm R from "Random sampling with a +reservoir", https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_R +(1985), by Jeffrey Vitter. + +See https://github.com/lightstep/varopt/blob/master/README.md for +more detail. +*/ +package varopt diff --git a/frequency_test.go b/frequency_test.go index 9da7bca..e4b3429 100644 --- a/frequency_test.go +++ b/frequency_test.go @@ -35,7 +35,7 @@ var colors = []curve{ // While the number of expected points per second is uniform, the // output sample weights are expected to match the original // frequencies. -func ExampleFrequency() { +func ExampleVaropt_GetOriginalWeight() { // Number of points. const totalCount = 1e6 diff --git a/simple.go b/simple.go index 7d0fb7f..6ff64f1 100644 --- a/simple.go +++ b/simple.go @@ -13,14 +13,20 @@ type Simple struct { capacity int observed int buffer []Sample + rnd *rand.Rand } -func NewSimple(capacity int) *Simple { +// NewSimple returns a simple reservoir sampler with given capacity +// (i.e., reservoir size) and random number generator. +func NewSimple(capacity int, rnd *rand.Rand) *Simple { return &Simple{ capacity: capacity, + rnd: rnd, } } +// Add considers a new observation for the sample. Items have unit +// weight. 
func (s *Simple) Add(span Sample) { s.observed++ @@ -34,28 +40,29 @@ func (s *Simple) Add(span Sample) { } // Give this a capacity/observed chance of replacing an existing entry. - index := rand.Intn(s.observed) + index := s.rnd.Intn(s.observed) if index < s.capacity { s.buffer[index] = span } } +// Get returns the i'th selected item from the sample. func (s *Simple) Get(i int) Sample { return s.buffer[i] } +// Size returns the number of items in the sample. If the reservoir is +// full, Size() equals Capacity(). func (s *Simple) Size() int { return len(s.buffer) } +// Weight returns the adjusted weight of each item in the sample. func (s *Simple) Weight() float64 { return float64(s.observed) / float64(s.Size()) } -func (s *Simple) Prob() float64 { - return 1 / s.Weight() -} - -func (s *Simple) Observed() int { +// Count returns the number of items that were observed. +func (s *Simple) Count() int { return s.observed } diff --git a/simple_test.go b/simple_test.go index 8c83500..c70503e 100644 --- a/simple_test.go +++ b/simple_test.go @@ -3,6 +3,7 @@ package varopt_test import ( + "math/rand" "testing" "github.com/lightstep/varopt" @@ -19,7 +20,9 @@ func TestSimple(t *testing.T) { epsilon = 0.01 ) - ss := varopt.NewSimple(sampleSize) + rnd := rand.New(rand.NewSource(17167)) + + ss := varopt.NewSimple(sampleSize, rnd) psum := 0. for i := 0; i < popSize; i++ { diff --git a/varopt.go b/varopt.go index 4dd8ea3..1856614 100644 --- a/varopt.go +++ b/varopt.go @@ -36,6 +36,9 @@ type Varopt struct { totalWeight float64 } +// Sample is an empty interface that represents a sample item. +// Sampling algorithms treat these as opaque, as their weight is +// passed in separately. type Sample interface{} type vsample struct { @@ -45,6 +48,8 @@ type vsample struct { type largeHeap []vsample +// New returns a new Varopt sampler with given capacity (i.e., +// reservoir size) and random number generator. 
func New(capacity int, rnd *rand.Rand) *Varopt { return &Varopt{ capacity: capacity, @@ -52,6 +57,7 @@ func New(capacity int, rnd *rand.Rand) *Varopt { } } +// Add considers a new observation for the sample with given weight. func (s *Varopt) Add(sample Sample, weight float64) { individual := vsample{ sample: sample, @@ -131,6 +137,9 @@ func (s *Varopt) Get(i int) (Sample, float64) { return s.T[i-len(s.L)].sample, s.tau } +// GetOriginalWeight returns the original input weight of the sample +// item that was passed to Add(). This can be useful for computing a +// frequency from the adjusted sample weight. func (s *Varopt) GetOriginalWeight(i int) float64 { if i < len(s.L) { return s.L[i].weight @@ -139,22 +148,31 @@ func (s *Varopt) GetOriginalWeight(i int) float64 { return s.T[i-len(s.L)].weight } +// Capacity returns the size of the reservoir. This is the maximum +// size of the sample. func (s *Varopt) Capacity() int { return s.capacity } +// Size returns the current number of items in the sample. If the +// reservoir is full, this returns Capacity(). func (s *Varopt) Size() int { return len(s.L) + len(s.T) } +// TotalWeight returns the sum of weights that were passed to Add(). func (s *Varopt) TotalWeight() float64 { return s.totalWeight } +// TotalCount returns the number of calls to Add(). func (s *Varopt) TotalCount() int { return s.totalCount } +// Tau returns the current large-weight threshold. Weights larger +// than Tau() carry their exact weight in the sample. See the VarOpt +// paper for details. func (s *Varopt) Tau() float64 { return s.tau } diff --git a/varopt_test.go b/varopt_test.go index 16786ab..3d6eec3 100644 --- a/varopt_test.go +++ b/varopt_test.go @@ -23,9 +23,7 @@ const ( sampleProb = 0.001 sampleSize int = popSize * sampleProb - // TODO epsilon is somewhat variable b/c we're using the - // static rand w/o a fixed seed for the test. 
- epsilon = 0.06 + epsilon = 0.08 ) func TestUnbiased(t *testing.T) { @@ -108,7 +106,7 @@ func testUnbiased(t *testing.T, bbr, bsr float64) { for _, blockList := range blockLists { for _, block := range blockList { - simple := varopt.NewSimple(sampleSize) + simple := varopt.NewSimple(sampleSize, rnd) for _, s := range block { simple.Add(s) diff --git a/weighted_test.go b/weighted_test.go index e545fbd..1e82043 100644 --- a/weighted_test.go +++ b/weighted_test.go @@ -16,7 +16,7 @@ type packet struct { protocol string } -func ExampleWeighted() { +func ExampleNew() { const totalPackets = 1e6 const sampleRatio = 0.01