diff --git a/src/InfluxDB.Collector/CollectorConfiguration.cs b/src/InfluxDB.Collector/CollectorConfiguration.cs index d507229..5e86553 100644 --- a/src/InfluxDB.Collector/CollectorConfiguration.cs +++ b/src/InfluxDB.Collector/CollectorConfiguration.cs @@ -10,6 +10,7 @@ public class CollectorConfiguration readonly PipelinedCollectorTagConfiguration _tag; readonly PipelinedCollectorEmitConfiguration _emitter; readonly PipelinedCollectorBatchConfiguration _batcher; + readonly PipelinedCollectorAggregateConfiguration _aggregator; public CollectorConfiguration() : this(null) @@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null) _tag = new PipelinedCollectorTagConfiguration(this); _emitter = new PipelinedCollectorEmitConfiguration(this); _batcher = new PipelinedCollectorBatchConfiguration(this); + _aggregator = new PipelinedCollectorAggregateConfiguration(this); } public CollectorTagConfiguration Tag => _tag; @@ -30,6 +32,8 @@ internal CollectorConfiguration(IPointEmitter parent = null) public CollectorBatchConfiguration Batch => _batcher; + public CollectorAggregateConfiguration Aggregate => _aggregator; + public MetricsCollector CreateCollector() { Action disposeEmitter; @@ -38,6 +42,7 @@ public MetricsCollector CreateCollector() var emitter = _parent; emitter = _emitter.CreateEmitter(emitter, out disposeEmitter); emitter = _batcher.CreateEmitter(emitter, out disposeBatcher); + emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter); return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () => { @@ -46,4 +51,4 @@ public MetricsCollector CreateCollector() }); } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs b/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs new file mode 100644 index 0000000..9b19717 --- /dev/null +++ b/src/InfluxDB.Collector/Configuration/CollectorAggregateConfiguration.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; + +namespace InfluxDB.Collector.Configuration +{ + public abstract class CollectorAggregateConfiguration + { + public abstract CollectorConfiguration AtInterval(TimeSpan interval); + + public abstract CollectorConfiguration SumIncrements(); + + public abstract CollectorConfiguration AggregateTimes(Func, double> func); + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs b/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs new file mode 100644 index 0000000..4f927a6 --- /dev/null +++ b/src/InfluxDB.Collector/Configuration/PipelinedCollectorAggregateConfiguration.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; +using InfluxDB.Collector.Pipeline; +using InfluxDB.Collector.Pipeline.Aggregate; + +namespace InfluxDB.Collector.Configuration +{ + class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration + { + private readonly CollectorConfiguration _configuration; + + bool _sumIncrements; + Func, double> _timeAggregation; + TimeSpan? _interval; + + public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration) + { + if (configuration == null) throw new ArgumentNullException(nameof(configuration)); + _configuration = configuration; + } + + public override CollectorConfiguration AtInterval(TimeSpan interval) + { + _interval = interval; + return _configuration; + } + + public override CollectorConfiguration SumIncrements() + { + _sumIncrements = true; + return _configuration; + } + + public override CollectorConfiguration AggregateTimes(Func, double> func) + { + _timeAggregation = func; + return _configuration; + } + + public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose) + { + if (_interval == null) + { + dispose = null; + return parent; + } + + var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent); + dispose = aggregator.Dispose; + return aggregator; + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs new file mode 100644 index 0000000..59bac94 --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Generic; + +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + struct GroupingKey : IEquatable + { + private static readonly Dictionary EmptyDict = new Dictionary(); + + public long Bucket { get; } + + public MeasurementKind Kind { get; } + + public string Measurement { get; } + + public Dictionary Tags { get; } + + public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary tags) + { + Bucket = bucket; + Kind = kind; + Measurement = measurement; + Tags = tags ?? EmptyDict; + } + + public bool Equals(GroupingKey other) + { + return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + return obj is GroupingKey key && Equals(key); + } + + public override int GetHashCode() + { + unchecked + { + int hashCode = Bucket.GetHashCode(); + hashCode = (hashCode * 397) ^ (int) Kind; + hashCode = (hashCode * 397) ^ Measurement.GetHashCode(); + hashCode = (hashCode * 397) ^ TagsHashCode(); + return hashCode; + } + } + + int TagsHashCode() + { + unchecked + { + int hashCode = 1; + foreach (var kvp in Tags) + { + hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode(); + } + + return hashCode; + } + } + + static bool DictionaryEquals(Dictionary dict, Dictionary dict2) + { + if (dict.Count != dict2.Count) + { + return false; + } + + foreach (var kvp in dict) + { + if (dict2.TryGetValue(kvp.Key, out string value)) + { + if (value != kvp.Value) + { + return false; + } + } + else + { + return false; + } + } + + return true; + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs new file mode 100644 index 0000000..7f6c9c1 --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs @@ -0,0 +1,117 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using InfluxDB.Collector.Pipeline.Common; + +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + class AggregatePointEmitter : IntervalEmitterBase + { + readonly bool _sumIncrements; + readonly Func, double> _timesAggregation; + readonly IPointEmitter _parent; + + public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func, double> timesAggregation, IPointEmitter parent) + : base(timeSpan) + { + _sumIncrements = sumIncrements; + _timesAggregation = timesAggregation; + _parent = parent; + } + + protected override void HandleBatch(IReadOnlyCollection batch) + { + DateTime now = DateTime.UtcNow; + DateTime bucketThreshold = now - _interval; + + var grouped = batch.GroupBy(x => new GroupingKey( + DetermineBucket(x.UtcTimestamp, bucketThreshold, now), + DetermineKind(x), + x.Measurement, + x.Tags + )); + + var aggregated = grouped.SelectMany(Aggregate).ToArray(); + + _parent.Emit(aggregated); + } + + private long DetermineBucket(DateTime? timestamp, DateTime bucketThreshold, DateTime now) + { + if (!timestamp.HasValue) + { + return 0; + } + + DateTime value = timestamp.Value; + + if (value >= bucketThreshold && value <= now) + { + // point was in timer interval + return bucketThreshold.Ticks; + } + else + { + // point was before or after timer interval, round it to multiple of interval + return (value.Ticks / _interval.Ticks) * _interval.Ticks; + } + } + + static MeasurementKind DetermineKind(PointData x) + { + if (x.Fields.Count != 1) return MeasurementKind.Other; + + if (x.Fields.TryGetValue("count", out var count) && count is long) + { + return MeasurementKind.Increment; + } + else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan) + { + return MeasurementKind.Time; + } + else + { + return MeasurementKind.Other; + } + } + + IEnumerable Aggregate(IGrouping group) + { + GroupingKey key = group.Key; + MeasurementKind kind = key.Kind; + + if (kind == MeasurementKind.Increment && _sumIncrements) + { + long sum = group.Sum(x => (long) x.Fields["count"]); + return new[] + { + new PointData( + key.Measurement, + new Dictionary { { "count", sum } }, + key.Tags, + AverageTime(key)) + }; + } + + if (kind == MeasurementKind.Time && _timesAggregation != null) + { + long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks)); + return new[] + { + new PointData( + key.Measurement, + new Dictionary { { "value", new TimeSpan(ticks) } }, + key.Tags, + AverageTime(key)) + }; + } + + return group; + } + + private DateTime AverageTime(GroupingKey key) + { + return new DateTime(key.Bucket + _interval.Ticks / 2, DateTimeKind.Utc); + } + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs b/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs new file mode 100644 index 0000000..89e4e94 --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs @@ -0,0 +1,7 @@ +namespace InfluxDB.Collector.Pipeline.Aggregate +{ + public enum MeasurementKind + { + Other = 0, Increment, Time + } +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index 521ca6f..4ca56a5 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,114 +1,36 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading.Tasks; -using InfluxDB.Collector.Diagnostics; -using InfluxDB.Collector.Platform; +using InfluxDB.Collector.Pipeline.Common; using InfluxDB.Collector.Util; namespace InfluxDB.Collector.Pipeline.Batch { - class IntervalBatcher : IPointEmitter, IDisposable + class IntervalBatcher : IntervalEmitterBase { - readonly object _queueLock = new object(); - Queue _queue = new Queue(); - - readonly TimeSpan _interval; - readonly int? _maxBatchSize; readonly IPointEmitter _parent; - readonly object _stateLock = new object(); - readonly PortableTimer _timer; - bool _unloading; - bool _started; + readonly int? _maxBatchSize; - public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) + public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) : base(interval) { - _parent = parent; - _interval = interval; _maxBatchSize = maxBatchSize; - _timer = new PortableTimer(cancel => OnTick()); + _parent = parent; } - void CloseAndFlush() + protected override void HandleBatch(IReadOnlyCollection batch) { - lock (_stateLock) + if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) { - if (!_started || _unloading) - return; - - _unloading = true; + _parent.Emit(batch.ToArray()); } - - _timer.Dispose(); - - OnTick(); - } - - public void Dispose() - { - CloseAndFlush(); - } - - Task OnTick() - { - try + else { - Queue batch; - lock (_queueLock) + foreach (var chunk in batch.Batch(_maxBatchSize.Value)) { - if (_queue.Count == 0) - return Task.Delay(0); - - batch = _queue; - _queue = new Queue(); + _parent.Emit(chunk.ToArray()); } - - if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) - { - _parent.Emit(batch.ToArray()); - } - else - { - foreach (var chunk in batch.Batch(_maxBatchSize.Value)) - { - _parent.Emit(chunk.ToArray()); - } - } - } - catch (Exception ex) - { - CollectorLog.ReportError("Failed to emit metrics batch", ex); - } - finally - { - lock (_stateLock) - { - if (!_unloading) - _timer.Start(_interval); - } - } - - return Task.Delay(0); - } - - public void Emit(PointData[] points) - { - lock (_stateLock) - { - if (_unloading) return; - if (!_started) - { - _started = true; - _timer.Start(TimeSpan.Zero); - } - } - - lock (_queueLock) - { - foreach(var point in points) - _queue.Enqueue(point); } } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs new file mode 100644 index 0000000..b5a2e5b --- /dev/null +++ b/src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using InfluxDB.Collector.Diagnostics; +using InfluxDB.Collector.Platform; + +namespace InfluxDB.Collector.Pipeline.Common +{ + internal abstract class IntervalEmitterBase : IPointEmitter, IDisposable + { + readonly object _queueLock = new object(); + Queue _queue = new Queue(); + + protected readonly TimeSpan _interval; + + readonly object _stateLock = new object(); + readonly PortableTimer _timer; + bool _unloading; + bool _started; + + protected IntervalEmitterBase(TimeSpan interval) + { + _interval = interval; + _timer = new PortableTimer(cancel => OnTick()); + } + + private void CloseAndFlush() + { + lock (_stateLock) + { + if (!_started || _unloading) + return; + + _unloading = true; + } + + _timer.Dispose(); + + OnTick(); + } + + public void Dispose() + { + CloseAndFlush(); + } + + protected Task OnTick() + { + try + { + Queue batch; + lock (_queueLock) + { + if (_queue.Count == 0) + return Task.Delay(0); + + batch = _queue; + _queue = new Queue(); + } + + HandleBatch(batch); + } + catch (Exception ex) + { + CollectorLog.ReportError("Failed to emit metrics batch", ex); + } + finally + { + lock (_stateLock) + { + if (!_unloading) + _timer.Start(_interval); + } + } + + return Task.Delay(0); + } + + public void Emit(PointData[] points) + { + lock (_stateLock) + { + if (_unloading) return; + if (!_started) + { + _started = true; + _timer.Start(TimeSpan.Zero); + } + } + + lock (_queueLock) + { + foreach (var point in points) + _queue.Enqueue(point); + } + } + + protected abstract void HandleBatch(IReadOnlyCollection batch); + } +} \ No newline at end of file diff --git a/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs new file mode 100644 index 0000000..ab7fc12 --- /dev/null +++ b/test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs @@ -0,0 +1,254 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using InfluxDB.Collector; +using InfluxDB.Collector.Pipeline; +using Xunit; + +namespace InfluxDB.LineProtocol.Tests.Collector +{ + public class AggregationTests + { + [Fact] + public async Task PointsAreCorrectlyGrouped() + { + var written = new TaskCompletionSource(); + var list = new List(); + + var start = DateTime.UtcNow; + + var collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + ); + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + TimeSpan.FromMilliseconds(200) + ); + collector.Write("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + start + TimeSpan.FromMilliseconds(400) + ); + + await written.Task; + + Assert.Equal(1, list.Count); + Assert.Equal(3L, list[0].Fields["count"]); + } + + [Fact] + public async Task IncrementsCanBeSummed() + { + var written = new TaskCompletionSource(); + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(1, list.Count); + Assert.Equal(6L, list[0].Fields["count"]); + Assert.InRange(list[0].UtcTimestamp.Value.TimeOfDay, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task TimesCanBeAveraged() + { + var written = new TaskCompletionSource(); + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(400)) + .Aggregate.AggregateTimes(Enumerable.Average) + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(1) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(2) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "value", TimeSpan.FromSeconds(3) } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(1, list.Count); + Assert.Equal(TimeSpan.FromSeconds(2), (TimeSpan) list[0].Fields["value"]); + Assert.InRange(list[0].UtcTimestamp.Value.TimeOfDay, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(300)); + } + + [Fact] + public async Task DifferentTagsArentAggregated() + { + var written = new TaskCompletionSource(); + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "b" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "c" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(3, list.Count); + } + + [Fact] + public async Task DifferentMeasurementsArentAggregated() + { + var written = new TaskCompletionSource(); + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("bar", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 200)), + + new PointData("baz", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 300)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(3, list.Count); + } + + [Fact] + public async Task DifferentTimeSpansArentAggregated() + { + var written = new TaskCompletionSource(); + var list = new List(); + + IPointEmitter collector = new CollectorConfiguration() + .Aggregate.AtInterval(TimeSpan.FromMilliseconds(500)) + .Aggregate.SumIncrements() + .WriteTo.Emitter(pts => + { + list.AddRange(pts); + written.SetResult(0); + }) + .CreateCollector(); + + collector.Emit(new[] + { + new PointData("foo", + new Dictionary { { "count", 1L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 0)), + + new PointData("foo", + new Dictionary { { "count", 2L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 700)), + + new PointData("foo", + new Dictionary { { "count", 3L } }, + new Dictionary { { "tag1", "a" } }, + new DateTime(2018, 1, 1, 0, 0, 0, 800)) + }); + + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.Equal(2, list.Count); + Assert.True(list.Any(x => (long) x.Fields["count"] == 1)); + Assert.True(list.Any(x => (long) x.Fields["count"] == 5)); + } + } +} \ No newline at end of file