This UDF aggregates incoming points by groups or by batches, depending on the input data type.
The files in this directory show how to use AggregateUDF.
streamAggregateUDF can be used with a stream input edge only.
@streamAggregateUDF()
.aggregate('last(idle) as idle.last')
.aggregate('mean(idle) as idle.mean')
.aggregate('last(interrupt) as interrupt.last')
.aggregate('mean(interrupt) as interrupt.mean')
.aggregate('last(nice) as nice.last')
.aggregate('mean(nice) as nice.mean')
.aggregate('last(softirq) as softirq.last')
.aggregate('mean(softirq) as softirq.mean')
.aggregate('last(steal) as steal.last')
.aggregate('mean(steal) as steal.mean')
.aggregate('last(system) as system.last')
.aggregate('mean(system) as system.mean')
.aggregate('last(user) as user.last')
.aggregate('mean(user) as user.mean')
.aggregate('last(wait) as wait.last')
.aggregate('mean(wait) as wait.mean')
.timeAggregateRule('last')
.emitTimeout(10s)
.tolerance(1s)
- aggregate -- defines one aggregation using the syntax <aggregateFunction>(<fieldName>) as <resultFieldName>. Currently available aggregate functions are: first, last, min, max, mean.
- timeAggregateRule -- defines the aggregate rule for the result timestamp. It takes one of the available aggregate functions as an argument.
- emitTimeout -- the UDF accumulates several points before processing. The emitTimeout property defines the timeout between two sequential processing moments.
- tolerance (optional) -- defines the time interval between the first and the last points in an aggregating batch. The default tolerance value is 0s, which means that points aggregated together have equal timestamps.
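The semantics of these properties can be sketched in plain Python. This is only an illustration under stated assumptions, not the UDF's actual implementation: the helper names (parse_spec, group_by_tolerance, aggregate), the dictionary point layout, and the integer timestamps are hypothetical, and the grouping rule (a point joins the current group while its distance from the group's first point does not exceed the tolerance) is one reading of the tolerance description above. emitTimeout is not modeled.

```python
import re
from statistics import mean

# Assumed mapping of the five documented function names to Python callables.
# Values are expected in timestamp order.
AGGREGATES = {
    "first": lambda values: values[0],
    "last": lambda values: values[-1],
    "min": min,
    "max": max,
    "mean": mean,
}

# Matches '<aggregateFunction>(<fieldName>) as <resultFieldName>'.
SPEC = re.compile(r"^\s*(\w+)\((\w+)\)\s+as\s+(\S+)\s*$")


def parse_spec(spec):
    """Split an aggregate spec into (function, field, result field)."""
    match = SPEC.match(spec)
    if match is None or match.group(1) not in AGGREGATES:
        raise ValueError(f"unsupported aggregate spec: {spec!r}")
    return match.groups()


def group_by_tolerance(points, tolerance):
    """Group time-sorted points so that last - first <= tolerance per group."""
    groups = []
    for point in sorted(points, key=lambda p: p["time"]):
        if groups and point["time"] - groups[-1][0]["time"] <= tolerance:
            groups[-1].append(point)
        else:
            groups.append([point])
    return groups


def aggregate(points, specs, time_rule="last", tolerance=0):
    """Emit one result point per group; time_rule picks the result timestamp."""
    results = []
    for group in group_by_tolerance(points, tolerance):
        row = {"time": AGGREGATES[time_rule]([p["time"] for p in group])}
        for func, field, result_field in map(parse_spec, specs):
            row[result_field] = AGGREGATES[func]([p[field] for p in group])
        results.append(row)
    return results


points = [
    {"time": 0, "idle": 90.0},
    {"time": 1, "idle": 94.0},
    {"time": 5, "idle": 80.0},
]
specs = ["last(idle) as idle.last", "mean(idle) as idle.mean"]
print(aggregate(points, specs, time_rule="last", tolerance=1))
```

With a tolerance of 1, the first two points fall into one group and the third forms its own group. With the default tolerance of 0, only points with equal timestamps would be aggregated together.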
batchAggregateUDF can be used with a batch input edge only. It has no emitTimeout and tolerance properties, because this UDF aggregates every batch it receives separately.
@batchAggregateUDF()
.aggregate('last(idle) as idle.last')
.aggregate('mean(idle) as idle.mean')
.aggregate('last(interrupt) as interrupt.last')
.aggregate('mean(interrupt) as interrupt.mean')
.aggregate('last(nice) as nice.last')
.aggregate('mean(nice) as nice.mean')
.aggregate('last(softirq) as softirq.last')
.aggregate('mean(softirq) as softirq.mean')
.aggregate('last(steal) as steal.last')
.aggregate('mean(steal) as steal.mean')
.aggregate('last(system) as system.last')
.aggregate('mean(system) as system.mean')
.aggregate('last(user) as user.last')
.aggregate('mean(user) as user.mean')
.aggregate('last(wait) as wait.last')
.aggregate('mean(wait) as wait.mean')
.timeAggregateRule('last')
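As a rough illustration of per-batch aggregation, the following Python sketch collapses each batch into a single point and never merges or splits batches. The function name aggregate_batch, the point layout, and the generated result field name are hypothetical and only mirror the naming used in the example above. It is not the UDF's actual code.

```python
from statistics import mean

# Assumed mapping of the documented function names to Python callables.
RULES = {
    "first": lambda v: v[0],
    "last": lambda v: v[-1],
    "min": min,
    "max": max,
    "mean": mean,
}


def aggregate_batch(batch, field, func, time_rule):
    """Collapse one batch into a single point; time_rule picks its timestamp."""
    times = [p["time"] for p in batch]
    values = [p[field] for p in batch]
    return {"time": RULES[time_rule](times), f"{field}.{func}": RULES[func](values)}


batches = [
    [{"time": 10, "user": 2.0}, {"time": 20, "user": 4.0}],
    [{"time": 30, "user": 6.0}],
]
for batch in batches:
    print(aggregate_batch(batch, "user", "mean", time_rule="last"))
```

Each batch produces exactly one result, which is why no emitTimeout or tolerance setting is needed in this mode.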
Just run the following in the current directory:
$ docker-compose up
Now you can use kapacitor commands right from your host system. For example, you can run cpu.tick and watch the results:
$ kapacitor define cpu_task -tick cpu.tick
$ kapacitor enable cpu_task
$ kapacitor watch cpu_task
After that you will be able to see points coming from the UDF.
To remove the intermediate build container, run:
$ docker image prune --filter "label=stage=builder" --filter "label=project=aggregate_udf_example" --force
This example uses a number of Docker containers:
- kapacitor-udf -- container with UDF. Uses Docker image built from Dockerfile.
- collectd -- metrics producer. Sends them to InfluxDB. In theory, can be replaced.
- influxdb -- InfluxDB container. Parses Graphite data format coming from collectd. Provides metrics for Kapacitor.
- kapacitor -- Kapacitor container. Subscribes to InfluxDB to receive metrics, processes them with TICK scripts, communicates with UDF.