Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Implement Aggregation of Increment and Time measurements #55

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/InfluxDB.Collector/CollectorConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class CollectorConfiguration
readonly PipelinedCollectorTagConfiguration _tag;
readonly PipelinedCollectorEmitConfiguration _emitter;
readonly PipelinedCollectorBatchConfiguration _batcher;
readonly PipelinedCollectorAggregateConfiguration _aggregator;

public CollectorConfiguration()
: this(null)
Expand All @@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null)
_tag = new PipelinedCollectorTagConfiguration(this);
_emitter = new PipelinedCollectorEmitConfiguration(this);
_batcher = new PipelinedCollectorBatchConfiguration(this);
_aggregator = new PipelinedCollectorAggregateConfiguration(this);
}

public CollectorTagConfiguration Tag => _tag;
Expand All @@ -30,6 +32,8 @@ internal CollectorConfiguration(IPointEmitter parent = null)

public CollectorBatchConfiguration Batch => _batcher;

public CollectorAggregateConfiguration Aggregate => _aggregator;

public MetricsCollector CreateCollector()
{
Action disposeEmitter;
Expand All @@ -38,6 +42,7 @@ public MetricsCollector CreateCollector()
var emitter = _parent;
emitter = _emitter.CreateEmitter(emitter, out disposeEmitter);
emitter = _batcher.CreateEmitter(emitter, out disposeBatcher);
emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter);

return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () =>
{
Expand All @@ -46,4 +51,4 @@ public MetricsCollector CreateCollector()
});
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;

namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorAggregateConfiguration
{
public abstract CollectorConfiguration AtInterval(TimeSpan interval);

public abstract CollectorConfiguration SumIncrements();

public abstract CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using InfluxDB.Collector.Pipeline;
using InfluxDB.Collector.Pipeline.Aggregate;

namespace InfluxDB.Collector.Configuration
{
class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration
{
private readonly CollectorConfiguration _configuration;

bool _sumIncrements;
Func<IEnumerable<long>, double> _timeAggregation;
TimeSpan? _interval;

public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
_configuration = configuration;
}

public override CollectorConfiguration AtInterval(TimeSpan interval)
{
_interval = interval;
return _configuration;
}

public override CollectorConfiguration SumIncrements()
{
_sumIncrements = true;
return _configuration;
}

public override CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func)
{
_timeAggregation = func;
return _configuration;
}

public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
{
if (_interval == null)
{
dispose = null;
return parent;
}

var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent);
dispose = aggregator.Dispose;
return aggregator;
}
}
}
92 changes: 92 additions & 0 deletions src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using System;
using System.Collections.Generic;

namespace InfluxDB.Collector.Pipeline.Aggregate
{
struct GroupingKey : IEquatable<GroupingKey>
{
private static readonly Dictionary<string, string> EmptyDict = new Dictionary<string, string>();

public long Bucket { get; }

public MeasurementKind Kind { get; }

public string Measurement { get; }

public Dictionary<string, string> Tags { get; }

public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary<string, string> tags)
{
Bucket = bucket;
Kind = kind;
Measurement = measurement;
Tags = tags ?? EmptyDict;
}

public bool Equals(GroupingKey other)
{
return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}

return obj is GroupingKey key && Equals(key);
}

public override int GetHashCode()
{
unchecked
{
int hashCode = Bucket.GetHashCode();
hashCode = (hashCode * 397) ^ (int) Kind;
hashCode = (hashCode * 397) ^ Measurement.GetHashCode();
hashCode = (hashCode * 397) ^ TagsHashCode();
return hashCode;
}
}

int TagsHashCode()
{
unchecked
{
int hashCode = 1;
foreach (var kvp in Tags)
{
hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode();
}

return hashCode;
}
}

static bool DictionaryEquals(Dictionary<string, string> dict, Dictionary<string, string> dict2)
{
if (dict.Count != dict2.Count)
{
return false;
}

foreach (var kvp in dict)
{
if (dict2.TryGetValue(kvp.Key, out string value))
{
if (value != kvp.Value)
{
return false;
}
}
else
{
return false;
}
}

return true;
}
}
}
117 changes: 117 additions & 0 deletions src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System;
using System.Collections.Generic;
using System.Linq;
using InfluxDB.Collector.Pipeline.Common;

namespace InfluxDB.Collector.Pipeline.Aggregate
{
class AggregatePointEmitter : IntervalEmitterBase
{
readonly bool _sumIncrements;
readonly Func<IEnumerable<long>, double> _timesAggregation;
readonly IPointEmitter _parent;

public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumerable<long>, double> timesAggregation, IPointEmitter parent)
: base(timeSpan)
{
_sumIncrements = sumIncrements;
_timesAggregation = timesAggregation;
_parent = parent;
}

protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
{
DateTime now = DateTime.UtcNow;
DateTime bucketThreshold = now - _interval;

var grouped = batch.GroupBy(x => new GroupingKey(
DetermineBucket(x.UtcTimestamp, bucketThreshold, now),
DetermineKind(x),
x.Measurement,
x.Tags
));

var aggregated = grouped.SelectMany(Aggregate).ToArray();

_parent.Emit(aggregated);
}

private long DetermineBucket(DateTime? timestamp, DateTime bucketThreshold, DateTime now)
{
if (!timestamp.HasValue)
{
return 0;
}

DateTime value = timestamp.Value;

if (value >= bucketThreshold && value <= now)
{
// point was in timer interval
return bucketThreshold.Ticks;
}
else
{
// point was before or after timer interval, round it to multiple of interval
return (value.Ticks / _interval.Ticks) * _interval.Ticks;
}
}

static MeasurementKind DetermineKind(PointData x)
{
if (x.Fields.Count != 1) return MeasurementKind.Other;

if (x.Fields.TryGetValue("count", out var count) && count is long)
{
return MeasurementKind.Increment;
}
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
{
return MeasurementKind.Time;
}
else
{
return MeasurementKind.Other;
}
}

IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
{
GroupingKey key = group.Key;
MeasurementKind kind = key.Kind;

if (kind == MeasurementKind.Increment && _sumIncrements)
{
long sum = group.Sum(x => (long) x.Fields["count"]);
return new[]
{
new PointData(
key.Measurement,
new Dictionary<string, object> { { "count", sum } },
key.Tags,
AverageTime(key))
};
}

if (kind == MeasurementKind.Time && _timesAggregation != null)
{
long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks));
return new[]
{
new PointData(
key.Measurement,
new Dictionary<string, object> { { "value", new TimeSpan(ticks) } },
key.Tags,
AverageTime(key))
};
}

return group;
}

private DateTime AverageTime(GroupingKey key)
{
return new DateTime(key.Bucket + _interval.Ticks / 2, DateTimeKind.Utc);
}
}
}
7 changes: 7 additions & 0 deletions src/InfluxDB.Collector/Pipeline/Aggregate/MeasurementKind.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace InfluxDB.Collector.Pipeline.Aggregate
{
public enum MeasurementKind
{
Other = 0, Increment, Time
}
}
Loading