Skip to content

Commit

Permalink
Optimize allocations on emission of single point
Browse files Browse the repository at this point in the history
Fixes influxdata#60

(cherry picked from commit f9d678f)
  • Loading branch information
cypressious committed May 9, 2018
1 parent f708be2 commit 4de1a98
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 17 deletions.
17 changes: 16 additions & 1 deletion src/InfluxDB.Collector/Configuration/AggregateEmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace InfluxDB.Collector.Configuration
{
class AggregateEmitter : IPointEmitter
class AggregateEmitter : IPointEmitter, ISinglePointEmitter
{
readonly List<IPointEmitter> _emitters;

Expand All @@ -19,5 +19,20 @@ public void Emit(PointData[] points)
foreach (var emitter in _emitters)
emitter.Emit(points);
}

public void Emit(PointData point)
{
foreach (var emitter in _emitters)
{
if (emitter is ISinglePointEmitter singlePointEmitter)
{
singlePointEmitter.Emit(point);
}
else
{
emitter.Emit(new[] { point });
}
}
}
}
}
20 changes: 16 additions & 4 deletions src/InfluxDB.Collector/MetricsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace InfluxDB.Collector
{
public abstract class MetricsCollector : IPointEmitter, IDisposable
public abstract class MetricsCollector : IPointEmitter, ISinglePointEmitter, IDisposable
{
readonly Util.ITimestampSource _timestampSource = new Util.PseudoHighResTimestampSource();

Expand Down Expand Up @@ -34,14 +34,16 @@ public void Dispose()
Dispose(true);
}

protected virtual void Dispose(bool disposing) { }
protected virtual void Dispose(bool disposing)
{
}

public void Write(string measurement, IReadOnlyDictionary<string, object> fields, IReadOnlyDictionary<string, string> tags = null, DateTime? timestamp = null)
{
try
{
var point = new PointData(measurement, fields, tags, timestamp ?? _timestampSource.GetUtcNow());
Emit(new[] { point });
Emit(point);
}
catch (Exception ex)
{
Expand All @@ -54,6 +56,16 @@ void IPointEmitter.Emit(PointData[] points)
Emit(points);
}

void ISinglePointEmitter.Emit(PointData point)
{
Emit(point);
}

protected abstract void Emit(PointData[] points);

protected virtual void Emit(PointData point)
{
Emit(new[] { point });
}
}
}
}
37 changes: 30 additions & 7 deletions src/InfluxDB.Collector/Pipeline/Common/IntervalEmitterBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace InfluxDB.Collector.Pipeline.Common
{
internal abstract class IntervalEmitterBase : IPointEmitter, IDisposable
internal abstract class IntervalEmitterBase : IPointEmitter, ISinglePointEmitter, IDisposable
{
readonly object _queueLock = new object();
Queue<PointData> _queue = new Queue<PointData>();
Expand Down Expand Up @@ -77,22 +77,45 @@ protected Task OnTick()
}

public void Emit(PointData[] points)
{
if (!CheckState())
{
return;
}

lock (_queueLock)
{
foreach (var point in points)
_queue.Enqueue(point);
}
}

public void Emit(PointData point)
{
if (!CheckState())
{
return;
}

lock (_queueLock)
{
_queue.Enqueue(point);
}
}

private bool CheckState()
{
lock (_stateLock)
{
if (_unloading) return;
if (_unloading) return false;
if (!_started)
{
_started = true;
_timer.Start(TimeSpan.Zero);
}
}

lock (_queueLock)
{
foreach (var point in points)
_queue.Enqueue(point);
}
return true;
}

protected abstract void HandleBatch(IReadOnlyCollection<PointData> batch);
Expand Down
18 changes: 16 additions & 2 deletions src/InfluxDB.Collector/Pipeline/Emit/HttpLineProtocolEmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace InfluxDB.Collector.Pipeline.Emit
{
class HttpLineProtocolEmitter : IDisposable, IPointEmitter
class HttpLineProtocolEmitter : IDisposable, IPointEmitter, ISinglePointEmitter
{
readonly ILineProtocolClient _client;

Expand All @@ -29,9 +29,23 @@ public void Emit(PointData[] points)
payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp));
}

SendPayload(payload);
}

public void Emit(PointData point)
{
var payload = new LineProtocolPayload();

payload.Add(new LineProtocolPoint(point.Measurement, point.Fields, point.Tags, point.UtcTimestamp));

SendPayload(payload);
}

private void SendPayload(LineProtocolPayload payload)
{
var influxResult = _client.WriteAsync(payload).Result;
if (!influxResult.Success)
CollectorLog.ReportError(influxResult.ErrorMessage, null);
}
}
}
}
7 changes: 7 additions & 0 deletions src/InfluxDB.Collector/Pipeline/ISinglePointEmitter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace InfluxDB.Collector.Pipeline
{
interface ISinglePointEmitter
{
void Emit(PointData point);
}
}
4 changes: 4 additions & 0 deletions src/InfluxDB.Collector/Pipeline/NullMetricsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ class NullMetricsCollector : MetricsCollector
protected override void Emit(PointData[] points)
{
}

protected override void Emit(PointData point)
{
}
}
}
19 changes: 16 additions & 3 deletions src/InfluxDB.Collector/Pipeline/PipelinedMetricsCollector.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

using System;
using System;

namespace InfluxDB.Collector.Pipeline
{
Expand All @@ -24,10 +23,24 @@ protected override void Emit(PointData[] points)
_emitter.Emit(points);
}

protected override void Emit(PointData point)
{
_enricher.Enrich(point);

if (_emitter is ISinglePointEmitter singlePointEmitter)
{
singlePointEmitter.Emit(point);
}
else
{
_emitter.Emit(new[] { point });
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
_dispose();
}
}
}
}

0 comments on commit 4de1a98

Please sign in to comment.