Skip to content

Commit

Permalink
Implement Aggregation of Increment and Time measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
cypressious committed Apr 27, 2018
1 parent 7dc6860 commit c1289d4
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/InfluxDB.Collector/CollectorConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class CollectorConfiguration
readonly PipelinedCollectorTagConfiguration _tag;
readonly PipelinedCollectorEmitConfiguration _emitter;
readonly PipelinedCollectorBatchConfiguration _batcher;
readonly PipelinedCollectorAggregateConfiguration _aggregator;

public CollectorConfiguration()
: this(null)
Expand All @@ -22,6 +23,7 @@ internal CollectorConfiguration(IPointEmitter parent = null)
_tag = new PipelinedCollectorTagConfiguration(this);
_emitter = new PipelinedCollectorEmitConfiguration(this);
_batcher = new PipelinedCollectorBatchConfiguration(this);
_aggregator = new PipelinedCollectorAggregateConfiguration(this);
}

public CollectorTagConfiguration Tag => _tag;
Expand All @@ -30,13 +32,16 @@ internal CollectorConfiguration(IPointEmitter parent = null)

public CollectorBatchConfiguration Batch => _batcher;

public CollectorAggregateConfiguration Aggregate => _aggregator;

public MetricsCollector CreateCollector()
{
Action disposeEmitter;
Action disposeBatcher;

var emitter = _parent;
emitter = _emitter.CreateEmitter(emitter, out disposeEmitter);
emitter = _aggregator.CreateEmitter(emitter, out disposeEmitter);
emitter = _batcher.CreateEmitter(emitter, out disposeBatcher);

return new PipelinedMetricsCollector(emitter, _tag.CreateEnricher(), () =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;

namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorAggregateConfiguration
{
public abstract CollectorConfiguration AtInterval(TimeSpan interval);

public abstract CollectorConfiguration SumIncrements();

public abstract CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using InfluxDB.Collector.Pipeline;
using InfluxDB.Collector.Pipeline.Aggregate;

namespace InfluxDB.Collector.Configuration
{
class PipelinedCollectorAggregateConfiguration : CollectorAggregateConfiguration
{
private readonly CollectorConfiguration _configuration;

bool _sumIncrements;
Func<IEnumerable<long>, double> _timeAggregation;
TimeSpan? _interval;

public PipelinedCollectorAggregateConfiguration(CollectorConfiguration configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
_configuration = configuration;
}

public override CollectorConfiguration AtInterval(TimeSpan interval)
{
_interval = interval;
return _configuration;
}

public override CollectorConfiguration SumIncrements()
{
_sumIncrements = true;
return _configuration;
}

public override CollectorConfiguration AggregateTimes(Func<IEnumerable<long>, double> func)
{
_timeAggregation = func;
return _configuration;
}

public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
{
if (_interval == null)
{
dispose = null;
return parent;
}

var aggregator = new AggregatePointEmitter(_interval.Value, _sumIncrements, _timeAggregation, parent);
dispose = aggregator.Dispose;
return aggregator;
}
}
}
10 changes: 5 additions & 5 deletions src/InfluxDB.Collector/MetricsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public abstract class MetricsCollector : IPointEmitter, IDisposable

public void Increment(string measurement, long count = 1, IReadOnlyDictionary<string, string> tags = null)
{
Write(measurement, new Dictionary<string, object> { { "count", count } }, tags);
Write(measurement, new Dictionary<string, object> { { "count", count } }, tags, kind: MeasurementKind.Increment);
}

public void Measure(string measurement, object value, IReadOnlyDictionary<string, string> tags = null)
public void Measure(string measurement, object value, IReadOnlyDictionary<string, string> tags = null, MeasurementKind kind = MeasurementKind.Other)
{
Write(measurement, new Dictionary<string, object> { { "value", value } }, tags);
Write(measurement, new Dictionary<string, object> { { "value", value } }, tags, kind: kind);
}

public IDisposable Time(string measurement, IReadOnlyDictionary<string, string> tags = null)
Expand All @@ -36,11 +36,11 @@ public void Dispose()

protected virtual void Dispose(bool disposing) { }

public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null)
public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null, MeasurementKind kind = MeasurementKind.Other)
{
try
{
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow());
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow(), kind);
Emit(new[] { point });
}
catch (Exception ex)
Expand Down
92 changes: 92 additions & 0 deletions src/InfluxDB.Collector/Pipeline/Aggregate/AggregateGroupingKey.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using System;
using System.Collections.Generic;

namespace InfluxDB.Collector.Pipeline.Aggregate
{
struct GroupingKey : IEquatable<GroupingKey>
{
private static readonly Dictionary<string, string> EmptyDict = new Dictionary<string, string>();

public long Bucket { get; }

public MeasurementKind Kind { get; }

public string Measurement { get; }

public Dictionary<string, string> Tags { get; }

public GroupingKey(long bucket, MeasurementKind kind, string measurement, Dictionary<string, string> tags)
{
Bucket = bucket;
Kind = kind;
Measurement = measurement;
Tags = tags ?? EmptyDict;
}

public bool Equals(GroupingKey other)
{
return Bucket == other.Bucket && Kind == other.Kind && Measurement == other.Measurement && DictionaryEquals(Tags, other.Tags);
}

public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj))
{
return false;
}

return obj is GroupingKey key && Equals(key);
}

public override int GetHashCode()
{
unchecked
{
int hashCode = Bucket.GetHashCode();
hashCode = (hashCode * 397) ^ (int) Kind;
hashCode = (hashCode * 397) ^ Measurement.GetHashCode();
hashCode = (hashCode * 397) ^ TagsHashCode();
return hashCode;
}
}

int TagsHashCode()
{
unchecked
{
int hashCode = 1;
foreach (var kvp in Tags)
{
hashCode *= (kvp.Key.GetHashCode() * 397) ^ kvp.Key.GetHashCode();
}

return hashCode;
}
}

static bool DictionaryEquals(Dictionary<string, string> dict, Dictionary<string, string> dict2)
{
if (dict.Count != dict2.Count)
{
return false;
}

foreach (var kvp in dict)
{
if (dict2.TryGetValue(kvp.Key, out string value))
{
if (value != kvp.Value)
{
return false;
}
}
else
{
return false;
}
}

return true;
}
}
}
103 changes: 103 additions & 0 deletions src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// ==========================================================================
// AggregatePointEmitter.cs
// Bus Portal (busliniensuche.de)
// ==========================================================================
// All rights reserved.
// ==========================================================================

using System;
using System.Collections.Generic;
using System.Linq;

namespace InfluxDB.Collector.Pipeline.Aggregate
{
class AggregatePointEmitter : IPointEmitter, IDisposable
{
readonly TimeSpan _timeSpan;
readonly bool _sumIncrements;
readonly Func<IEnumerable<long>, double> _timesAggregation;
readonly IPointEmitter _parent;

public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumerable<long>, double> timesAggregation, IPointEmitter parent)
{
_timeSpan = timeSpan;
_sumIncrements = sumIncrements;
_timesAggregation = timesAggregation;
_parent = parent;
}

public void Emit(PointData[] points)
{
var grouped = points.GroupBy(x => new GroupingKey(
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _timeSpan.Ticks : 0,
DetermineKind(x),
x.Measurement,
x.Tags
));

var aggregated = grouped.SelectMany(Aggregate).ToArray();

_parent.Emit(aggregated);
}

IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
{
GroupingKey key = group.Key;
MeasurementKind kind = key.Kind;

if (kind == MeasurementKind.Increment && _sumIncrements)
{
long sum = group.Sum(x => (long) x.Fields["count"]);
return new[]
{
new PointData(
key.Measurement,
new Dictionary<string, object> { { "count", sum } },
key.Tags,
AverageTime(key),
key.Kind)
};
}

if (kind == MeasurementKind.Time && _timesAggregation != null)
{
long ticks = (long) _timesAggregation(group.Select(x => ((TimeSpan) x.Fields["value"]).Ticks));
return new[]
{
new PointData(
key.Measurement,
new Dictionary<string, object> { { "value", new TimeSpan(ticks) } },
key.Tags,
AverageTime(key),
key.Kind)
};
}

return group;
}

private DateTime AverageTime(GroupingKey key)
{
return new DateTime(key.Bucket + _timeSpan.Ticks / 2, DateTimeKind.Utc);
}

static MeasurementKind DetermineKind(PointData x)
{
if (x.Fields.Count != 1) return MeasurementKind.Other;

switch (x.Kind)
{
case MeasurementKind.Increment when x.Fields.TryGetValue("count", out var count) && count is long:
return MeasurementKind.Increment;
case MeasurementKind.Time when x.Fields.TryGetValue("value", out var value) && value is TimeSpan:
return MeasurementKind.Time;
default:
return MeasurementKind.Other;
}
}

public void Dispose()
{
}
}
}
7 changes: 7 additions & 0 deletions src/InfluxDB.Collector/Pipeline/MeasurementKind.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace InfluxDB.Collector.Pipeline
{
public enum MeasurementKind
{
Other = 0, Increment, Time
}
}
11 changes: 11 additions & 0 deletions src/InfluxDB.Collector/Pipeline/PointData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class PointData
public Dictionary<string, object> Fields { get; }
public Dictionary<string, string> Tags { get; set; }
public DateTime? UtcTimestamp { get; }
public MeasurementKind Kind { get; }

public PointData(
string measurement,
Expand All @@ -25,5 +26,15 @@ public PointData(
Tags = tags.ToDictionary(kv => kv.Key, kv => kv.Value);
UtcTimestamp = utcTimestamp;
}

public PointData(
string measurement,
IReadOnlyDictionary<string, object> fields,
IReadOnlyDictionary<string, string> tags,
DateTime utcTimestamp,
MeasurementKind kind) : this(measurement, fields, tags, utcTimestamp)
{
Kind = kind;
}
}
}
3 changes: 2 additions & 1 deletion src/InfluxDB.Collector/StopwatchTimer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using InfluxDB.Collector.Pipeline;

namespace InfluxDB.Collector
{
Expand All @@ -22,7 +23,7 @@ public StopwatchTimer(MetricsCollector collector, string measurement, IReadOnlyD
public void Dispose()
{
_stopwatch.Stop();
_collector.Measure(_measurement, _stopwatch.Elapsed, _tags);
_collector.Measure(_measurement, _stopwatch.Elapsed, _tags, MeasurementKind.Time);
}
}
}
Loading

0 comments on commit c1289d4

Please sign in to comment.