-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathfilter.go
134 lines (120 loc) · 4.3 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
import (
"sync"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// NewFilter returns an Aggregator that wraps an agg with an attribute
// filtering function. Both pre-computed non-pre-computed Aggregators can be
// passed for agg. An appropriate Aggregator will be returned for the detected
// type.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}
// filter wraps an aggregator with an attribute filter. All recorded
// measurements will have their attributes filtered before they are passed to
// the underlying aggregator's Aggregate method.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]
sync.Mutex
seen map[attribute.Set]attribute.Set
}
// newFilter returns an filter Aggregator that wraps agg with the attribute
// filter fn.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
return &filter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
f.aggregator.Aggregate(measurement, fAttr)
}
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
// need to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done it needs to
// be added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]
sync.Mutex
seen map[attribute.Set]attribute.Set
}
// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// precomputedFilter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}