Skip to content

Commit

Permalink
feat: add parameter to histogramQuantile (#5386)
Browse files Browse the repository at this point in the history
* feat: add parameter to histogramQuantile

* chore: make fmt

* feat: implement new parameter for histogramQuantile

* chore: update interpreter_test.go
  • Loading branch information
Christopher M. Wolff authored Feb 23, 2023
1 parent 0cbec3e commit 726bf06
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 25 deletions.
4 changes: 2 additions & 2 deletions interpreter/interpreter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func TestStack(t *testing.T) {
FunctionName: "window",
Location: ast.SourceLocation{
File: "universe/universe.flux",
Start: ast.Position{Line: 3895, Column: 12},
End: ast.Position{Line: 3895, Column: 51},
Start: ast.Position{Line: 3904, Column: 12},
End: ast.Position{Line: 3904, Column: 51},
Source: `window(every: inf, timeColumn: timeDst)`,
},
},
Expand Down
8 changes: 4 additions & 4 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ var sourceHashes = map[string]string{
"stdlib/experimental/polyline/polyline.flux": "09f8d405349236de713fef7639d5523b107401bbe349789924f2283296205b9b",
"stdlib/experimental/polyline/polyline_test.flux": "16473dce4f71dcdbe1e3f90b350ab1e56a513679cb5c3f872e0f2d7678d42d1f",
"stdlib/experimental/preview_test.flux": "cca570d25b17ed201a0ecc7ebf9e547ccff2aa0814a3ac49f12faa938cbdaf73",
"stdlib/experimental/prometheus/prometheus.flux": "e0b3df509c8f522edee1081e5e3907ce9628f783d33b47c2e2d8ffb766f3f948",
"stdlib/experimental/prometheus/prometheus_histogramQuantile_test.flux": "37bda52e9440820c3bb278b4e9d331c0330a68f405a7623b95e15a50ee176753",
"stdlib/experimental/prometheus/prometheus.flux": "dc01322d3c0655661a8c2279c69acf50d02791a670fb0d681947af6726bc2f4d",
"stdlib/experimental/prometheus/prometheus_histogramQuantile_test.flux": "15da11b9c9d43cbc1c579de7064d7f3870b1c77b610ce3c567e8eb14f40ee090",
"stdlib/experimental/quantile_test.flux": "e3cf2ee9716c1179139d0a388c24b24f1c25ca329b27bebaeb53b7e1b608ead2",
"stdlib/experimental/query/from.flux": "713b7feb6904d64cf4cbd235396ff6ff20e1eb96578190c0ac3be4f37df7c362",
"stdlib/experimental/record/record.flux": "273eebb2ee5cb9b153940ca0f42ed5b97b3d86de07d91c96a75508682d84feae",
Expand Down Expand Up @@ -492,7 +492,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/highestAverage_test.flux": "65744d16b7c5d8ac2f4e3c47621195de1840ad72b12bfbb692d4f645e71a00a8",
"stdlib/universe/highestCurrent_test.flux": "c285ff40a7d8789d2c20bcf90d8063b710499ddc24621d627f6693c30351ebe5",
"stdlib/universe/highestMax_test.flux": "fff9f21422d2607afb9ff2786d67d173c7cd797418eb7b3521f8cf7e49443b88",
"stdlib/universe/histogram_quantile_test.flux": "7dcd4549a44f6a2889a445ae8c1adbb07a99747358b1d400d39b68d5c4233551",
"stdlib/universe/histogram_quantile_test.flux": "3ce3d5e6ce27fd8bb2e84da031faca2dafef2566737842e534fbd667ffa8a4f6",
"stdlib/universe/histogram_test.flux": "e9e9775f80ac7c2a76044e6e0e8a89045d736c6ab618e8de6d7d1ebe9848938e",
"stdlib/universe/holt_winters_panic_test.flux": "204eb8044d634e5350a364eac466eb51e7f549e4ac7f454de7b244ba272b248f",
"stdlib/universe/holt_winters_test.flux": "9bc8441527867b6c075d003034a3def1748766df478ba8b434e2a2297ead0ec0",
Expand Down Expand Up @@ -609,7 +609,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/union_heterogeneous_test.flux": "7d8b47b3e96b859a5fed5985c051e2a3fdc947d3d6ff9cc104e40821581fb0cb",
"stdlib/universe/union_test.flux": "f008260d48db70212ce64d3f51f4cf031532a9a67c1ba43242dbc4d43ef31293",
"stdlib/universe/unique_test.flux": "c108ab7b0e4b0b77f0a320c8c4dacb8cfbccae8b389425754e9583e69cd64ee3",
"stdlib/universe/universe.flux": "060426aa8c8caf89187489f3bbd077cdba9efa389d76a09d3346636b4ba35e61",
"stdlib/universe/universe.flux": "8f3b92accba41d660339bcd6e2f83e7a0d650cab637630e320ba87b5cc2f2335",
"stdlib/universe/universe_truncateTimeColumn_test.flux": "8acb700c612e9eba87c0525b33fd1f0528e6139cc912ed844932caef25d37b56",
"stdlib/universe/window_aggregate_test.flux": "c8f66f7ee188bb2e979e5a8b526057b653922197ae441658f7c7f11251c96576",
"stdlib/universe/window_default_start_align_test.flux": "0aaf612796fbb5ac421579151ad32a8861f4494a314ea615d0ccedd18067b980",
Expand Down
22 changes: 15 additions & 7 deletions stdlib/experimental/prometheus/prometheus.flux
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ builtin scrape : (url: string) => stream[A] where A: Record
// Available versions are `1` and `2`.
// Default is `2`.
// - tables: Input data. Default is piped-forward data (`<-`).
// - onNonmonotonic: Describes behavior when counts are not monotonically increasing
// when sorted by upper bound. Default is `error`.
//
// **Supported values**:
// - **error**: Produce an error.
// - **force**: Force bin counts to be monotonic: when a bin's count is lower than
// the count of the next smaller bin, raise it to equal that bin's count.
// - **drop**: When a nonmonotonic table is encountered, produce no output for that table.
//
// ## Examples
//
Expand All @@ -72,31 +80,31 @@ builtin scrape : (url: string) => stream[A] where A: Record
// ## Metadata
// tags: transformations,aggregates,prometheus
//
histogramQuantile = (tables=<-, quantile, metricVersion=2) => {
_version2 = () =>
histogramQuantile = (tables=<-, quantile, metricVersion=2, onNonmonotonic="error") => {
_version2 = (onNonmonotonic) =>
tables
|> group(mode: "except", columns: ["le", "_value"])
|> map(fn: (r) => ({r with le: float(v: r.le)}))
|> universe.histogramQuantile(quantile: quantile)
|> universe.histogramQuantile(quantile: quantile, onNonmonotonic: onNonmonotonic)
|> group(mode: "except", columns: ["le", "_value", "_time"])
|> set(key: "quantile", value: string(v: quantile))
|> experimental.group(columns: ["quantile"], mode: "extend")

_version1 = () =>
_version1 = (onNonmonotonic) =>
tables
|> filter(fn: (r) => r._field != "sum" and r._field != "count")
|> map(fn: (r) => ({r with le: float(v: r._field)}))
|> group(mode: "except", columns: ["_field", "le", "_value"])
|> universe.histogramQuantile(quantile: quantile)
|> universe.histogramQuantile(quantile: quantile, onNonmonotonic: onNonmonotonic)
|> group(mode: "except", columns: ["le", "_value", "_time"])
|> set(key: "quantile", value: string(v: quantile))
|> experimental.group(columns: ["quantile"], mode: "extend")

output =
if metricVersion == 2 then
_version2()
_version2(onNonmonotonic)
else if metricVersion == 1 then
_version1()
_version1(onNonmonotonic)
else
universe.die(msg: "Invalid metricVersion. Available versions are 1 and 2.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,86 @@ testcase prometheus_histogramQuantile_v1 {

testing.diff(got: got, want: want)
}

// Exercises onNonmonotonic: "force" with metricVersion 2 (upper bounds in the
// `le` column). The fixture is nonmonotonic: the count for le=0.625 (2) is lower
// than the count for le=0.125 (3). A 0.99 quantile is still expected in the
// output instead of an error.
testcase prometheus_histogramQuantile_onNonmonotonicForce {
    inData =
        "#group,false,false,true,true,false,false,true,true
#datatype,string,long,string,string,dateTime:RFC3339,double,string,string
#default,_result,,,,,,,
,result,table,_field,_measurement,_time,_value,le,org
,,0,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,0,0.001,0001
,,2,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,1,0.005,0001
,,3,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,1,0.025,0001
,,4,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,3,0.125,0001
,,4,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,2,0.625,0001
,,5,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,7,3.125,0001
,,6,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,10,15.625,0001
,,7,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00.412729Z,10,+Inf,0001
"
    // Expected: a single row carrying the 0.99 quantile for org 0001.
    outData =
        "#group,false,false,true,true,true,true,false,true,false,true
#datatype,string,long,string,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double,string
#default,_result,,,,,,,,,
,result,table,_field,_measurement,_start,_stop,_time,org,_value,quantile
,,0,qc_all_duration_seconds,prometheus,2021-10-08T00:00:00Z,2021-10-08T00:01:00Z,2021-10-08T00:00:00.412729Z,0001,15.208333333333336,0.99
"
    want = csv.from(csv: outData)
    got =
        csv.from(csv: inData)
            |> range(start: 2021-10-08T00:00:00Z, stop: 2021-10-08T00:01:00Z)
            |> prometheus.histogramQuantile(
                quantile: 0.99,
                metricVersion: 2,
                onNonmonotonic: "force",
            )

    testing.diff(got: got, want: want)
}

// Exercises onNonmonotonic: "drop" with metricVersion 1, where the upper bound
// is carried in `_field` alongside the `count` and `sum` rows.
testcase prometheus_histogramQuantile_onNonmonotonicDrop {
    // Data for org 0002 is not monotonic: its count at 0.125 (83) is lower than
    // its count at 0.025 (84). Org 0001 is monotonic.
    inData =
        "#group,false,false,true,true,false,false,true
#datatype,string,long,string,string,dateTime:RFC3339,double,string
#default,_result,,,,,,
,result,table,_field,_measurement,_time,_value,org
,,0,+Inf,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,10,0001
,,1,+Inf,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1405,0002
,,2,0.001,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,3,0.001,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1,0002
,,4,0.005,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,5,0.005,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1,0002
,,6,0.025,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,7,0.025,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,84,0002
,,8,0.125,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,9,0.125,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,83,0002
,,10,0.625,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,11,0.625,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1373,0002
,,12,15.625,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,10,0001
,,13,15.625,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1405,0002
,,14,3.125,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,0,0001
,,15,3.125,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1401,0002
,,16,count,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,10,0001
,,17,count,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,1405,0002
,,18,sum,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,45.746700925,0001
,,19,sum,qc_all_duration_seconds,2021-10-08T00:00:01.866064Z,178.4667627259998,0002
"
    // Expected: only org 0001 yields a quantile row; nothing is emitted for the
    // nonmonotonic org 0002.
    outData =
        "#group,false,false,true,true,true,false,true,false,true
#datatype,string,long,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double,string
#default,_result,,,,,,,,
,result,table,_measurement,_start,_stop,_time,org,_value,quantile
,,0,qc_all_duration_seconds,2021-10-08T00:00:00Z,2021-10-08T00:01:00Z,2021-10-08T00:00:01.866064Z,0001,15.5,0.99
"
    want = csv.from(csv: outData)
    got =
        csv.from(csv: inData)
            |> range(start: 2021-10-08T00:00:00Z, stop: 2021-10-08T00:01:00Z)
            |> prometheus.histogramQuantile(
                quantile: 0.99,
                metricVersion: 1,
                onNonmonotonic: "drop",
            )

    testing.diff(got: got, want: want)
}
58 changes: 46 additions & 12 deletions stdlib/universe/histogram_quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,23 @@ import (
"github.com/influxdata/flux/runtime"
)

const HistogramQuantileKind = "histogramQuantile"
const (
HistogramQuantileKind = "histogramQuantile"

const DefaultUpperBoundColumnLabel = "le"
DefaultUpperBoundColumnLabel = "le"

onNonmonotonicError = "error"
onNonmonotonicDrop = "drop"
onNonmonotonicForce = "force"
)

// HistogramQuantileOpSpec carries the arguments of a histogramQuantile call.
// It is populated from the Flux arguments by CreateHistogramQuantileOpSpec.
type HistogramQuantileOpSpec struct {
	Quantile float64 `json:"quantile"`
	CountColumn string `json:"countColumn"`
	UpperBoundColumn string `json:"upperBoundColumn"`
	ValueColumn string `json:"valueColumn"`
	MinValue float64 `json:"minValue"`
	// OnNonmonotonic selects what happens when bucket counts are not
	// monotonically increasing: "error", "force" or "drop".
	// CreateHistogramQuantileOpSpec defaults it to "error" when the argument
	// is absent and rejects any other value with an Invalid error.
	OnNonmonotonic string `json:"onNonmonotonic"`
}

func init() {
Expand Down Expand Up @@ -73,6 +80,18 @@ func CreateHistogramQuantileOpSpec(args flux.Arguments, a *flux.Administration)
s.MinValue = min
}

if onNonmonotonic, ok, err := args.GetString("onNonmonotonic"); err != nil {
return nil, err
} else if ok {
s.OnNonmonotonic = onNonmonotonic
} else {
s.OnNonmonotonic = onNonmonotonicError
}

if s.OnNonmonotonic != onNonmonotonicError && s.OnNonmonotonic != onNonmonotonicForce && s.OnNonmonotonic != onNonmonotonicDrop {
return nil, errors.Newf(codes.Invalid, "value provided to histogramQuantile parameter onNonmonotonic is invalid; must be one of %q, %q or %q", onNonmonotonicError, onNonmonotonicForce, onNonmonotonicDrop)
}

return s, nil
}

Expand All @@ -87,6 +106,7 @@ type HistogramQuantileProcedureSpec struct {
UpperBoundColumn string `json:"upperBoundColumn"`
ValueColumn string `json:"valueColumn"`
MinValue float64 `json:"minValue"`
OnNonmonotonic string `json:"onNonmonotonic"`
}

func newHistogramQuantileProcedure(qs flux.OperationSpec, a plan.Administration) (plan.ProcedureSpec, error) {
Expand All @@ -100,6 +120,7 @@ func newHistogramQuantileProcedure(qs flux.OperationSpec, a plan.Administration)
UpperBoundColumn: spec.UpperBoundColumn,
ValueColumn: spec.ValueColumn,
MinValue: spec.MinValue,
OnNonmonotonic: spec.OnNonmonotonic,
}, nil
}

Expand Down Expand Up @@ -230,10 +251,13 @@ func (t histogramQuantileTransformation) Process(id execute.DatasetID, tbl flux.
})
}

q, err := t.computeQuantile(cdf)
q, ok, err := t.computeQuantile(cdf)
if err != nil {
return err
}
if !ok {
return nil
}
if err := execute.AppendKeyValues(tbl.Key(), builder); err != nil {
return err
}
Expand All @@ -243,9 +267,9 @@ func (t histogramQuantileTransformation) Process(id execute.DatasetID, tbl flux.
return nil
}

func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64, error) {
func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64, bool, error) {
if len(cdf) == 0 {
return 0, errors.New(codes.FailedPrecondition, "histogram is empty")
return 0, false, errors.New(codes.FailedPrecondition, "histogram is empty")
}
// Find rank index and check counts are monotonic
prevCount := 0.0
Expand All @@ -254,9 +278,19 @@ func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64
rankIdx := -1
for i, b := range cdf {
if b.count < prevCount {
return 0, errors.New(codes.FailedPrecondition, "histogram records counts are not monotonic")
switch t.spec.OnNonmonotonic {
case onNonmonotonicError:
return 0, false, errors.New(codes.FailedPrecondition, "histogram records counts are not monotonic")
case onNonmonotonicForce:
b.count = prevCount
case onNonmonotonicDrop:
return 0, false, nil
default:
return 0, false, errors.Newf(codes.Internal, "unknown value for onNonmonotonic: %q", t.spec.OnNonmonotonic)
}
} else {
prevCount = b.count
}
prevCount = b.count

if rank >= b.count {
rankIdx = i
Expand All @@ -277,7 +311,7 @@ func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64
upperBound = cdf[0].upperBound
case len(cdf) - 1:
// Quantile is above the highest upper bound, simply return it as it must be finite
return cdf[len(cdf)-1].upperBound, nil
return cdf[len(cdf)-1].upperBound, true, nil
default:
lowerCount = cdf[rankIdx].count
lowerBound = cdf[rankIdx].upperBound
Expand All @@ -286,19 +320,19 @@ func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64
}
if rank == lowerCount {
// No need to interpolate
return lowerBound, nil
return lowerBound, true, nil
}
if math.IsInf(lowerBound, -1) {
// We cannot interpolate with infinity
return upperBound, nil
return upperBound, true, nil
}
if math.IsInf(upperBound, 1) {
// We cannot interpolate with infinity
return lowerBound, nil
return lowerBound, true, nil
}
// Compute quantile using linear interpolation
scale := (rank - lowerCount) / (upperCount - lowerCount)
return lowerBound + (upperBound-lowerBound)*scale, nil
return lowerBound + (upperBound-lowerBound)*scale, true, nil
}

func (t histogramQuantileTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
Expand Down
Loading

0 comments on commit 726bf06

Please sign in to comment.