Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit log and trace telemetry #470

Merged
merged 8 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Aspire.Dashboard/Aspire.Dashboard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@
<PackageReference Include="Microsoft.Fast.Components.FluentUI.Icons" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.Dashboard.Tests"/>
</ItemGroup>

</Project>
261 changes: 261 additions & 0 deletions src/Aspire.Dashboard/Otlp/Storage/CircularBuffer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections;
using System.Runtime.InteropServices;

namespace Aspire.Dashboard.Otlp.Storage;

/// <summary>
/// The circular buffer starts with an empty list and grows to a maximum size.
/// When the buffer is full, adding or inserting a new item removes the first item in the buffer.
/// </summary>
internal sealed class CircularBuffer<T> : IList<T>, ICollection<T>, IEnumerable<T>, IEnumerable
Copy link
Member

@davidfowl davidfowl Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephentoub Can/Should we use ConcurrentQueueSegment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Items are inserted with an order and can arrive out of order, so inserting at a position is required.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

{
// Internal for testing.
internal readonly List<T> _buffer;
internal int _start;
internal int _end;

public CircularBuffer(int capacity)
{
if (capacity < 1)
{
throw new ArgumentException("Circular buffer must have a capacity greater than 0.", nameof(capacity));
}

_buffer = new List<T>();
Capacity = capacity;
_start = 0;
_end = 0;
}

public int Capacity { get; }

public bool IsFull => Count == Capacity;

public bool IsEmpty => Count == 0;

public int Count => _buffer.Count;

public bool IsReadOnly { get; }

public bool IsFixedSize { get; } = true;

public object SyncRoot { get; } = new object();

public bool IsSynchronized { get; }

public int IndexOf(T item)
{
for (var index = 0; index < Count; ++index)
{
if (Equals(this[index], item))
{
return index;
}
}
return -1;
}

public void Insert(int index, T item)
{
// TODO: There are a lot of branches in this method. Look into simplifying it.
if (index == Count)
{
Add(item);
return;
}

ValidateIndexInRange(index);

if (IsFull)
{
if (index == 0)
{
// When full, the item inserted at 0 is always the "last" in the buffer and is removed.
return;
}

var internalIndex = InternalIndex(index);

var data = CollectionsMarshal.AsSpan(_buffer);

// Shift data to make remove for insert.
if (internalIndex == 0)
{
data.Slice(0, _end).CopyTo(data.Slice(1));
}
else if (internalIndex > _end)
{
// Data is shifted forward so save the last item to copy to the front.
var overflowItem = data[data.Length - 1];

var shiftLength = data.Length - internalIndex - 1;
data.Slice(internalIndex, shiftLength).CopyTo(data.Slice(internalIndex + 1));
if (shiftLength > 0 || internalIndex == _buffer.Count - 1)
{
data.Slice(0, _end).CopyTo(data.Slice(1));
}
data[0] = overflowItem;
}
else
{
data.Slice(internalIndex, data.Length - internalIndex - 1).CopyTo(data.Slice(internalIndex + 1));
}

// Set the actual item.
data[internalIndex] = item;

Increment(ref _end);
_start = _end;
}
else
{
var internalIndex = index + _start;
if (internalIndex > _buffer.Count)
{
internalIndex = internalIndex % _buffer.Count;
}

_buffer.Insert(internalIndex, item);
if (internalIndex < _end)
{
Increment(ref _end);
if (_end != _buffer.Count)
{
_start = _end;
}
}
}
}

public void RemoveAt(int index)
{
ValidateIndexInRange(index);

var internalIndex = InternalIndex(index);
_buffer.RemoveAt(internalIndex);
if (internalIndex < _end)
{
Decrement(ref _end);
_start = _end;
}
}

private void ValidateIndexInRange(int index)
{
if (index >= Count)
{
throw new InvalidOperationException($"Cannot access index {index}. Buffer size is {Count}");
}
}

public bool Remove(T item) => throw new NotImplementedException();

public T this[int index]
{
get
{
ValidateIndexInRange(index);
return _buffer[InternalIndex(index)];
}
set
{
ValidateIndexInRange(index);
_buffer[InternalIndex(index)] = value;
}
}

public void Add(T item)
{
if (IsFull)
{
_buffer[_end] = item;
Increment(ref _end);
_start = _end;
}
else
{
_buffer.Insert(_end, item);
Increment(ref _end);
if (_end != _buffer.Count)
{
_start = _end;
}
}
}

public void Clear()
{
_start = 0;
_end = 0;
_buffer.Clear();
}

public bool Contains(T item) => IndexOf(item) != -1;

public void CopyTo(T[] array, int arrayIndex)
{
if (array.Length - arrayIndex < Count)
{
throw new ArgumentException("Array does not contain enough space for items");
}

for (var index = 0; index < Count; ++index)
{
array[index + arrayIndex] = this[index];
}
}

public T[] ToArray()
{
if (IsEmpty)
{
return Array.Empty<T>();
}

var array = new T[Count];
for (var index = 0; index < Count; ++index)
{
array[index] = this[index];
}

return array;
}

public IEnumerator<T> GetEnumerator()
{
for (var i = 0; i < Count; ++i)
{
yield return this[i];
}
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

private int InternalIndex(int index)
{
return (_start + index) % _buffer.Count;
}

private void Increment(ref int index)
{
if (++index < Capacity)
{
return;
}

index = 0;
}

private void Decrement(ref int index)
{
if (index <= 0)
{
index = Capacity - 1;
}

--index;
}
}
31 changes: 24 additions & 7 deletions src/Aspire.Dashboard/Otlp/Storage/TelemetryRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using Aspire.Dashboard.Otlp.Model;
using Google.Protobuf.Collections;
using Microsoft.Extensions.Configuration;
Expand All @@ -16,8 +17,7 @@ namespace Aspire.Dashboard.Otlp.Storage;

public class TelemetryRepository
{
private int MaxOperationCount { get; init; }
private int MaxLogCount { get; init; }
private const int DefaultMaxTelemetryCount = 10_000;

private readonly object _lock = new();
private readonly ILogger _logger;
Expand All @@ -30,18 +30,19 @@ public class TelemetryRepository
private readonly ConcurrentDictionary<string, OtlpApplication> _applications = new();

private readonly ReaderWriterLockSlim _logsLock = new();
private readonly List<OtlpLogEntry> _logs = new();
private readonly CircularBuffer<OtlpLogEntry> _logs;
private readonly HashSet<(OtlpApplication Application, string PropertyKey)> _logPropertyKeys = new();

private readonly ReaderWriterLockSlim _tracesLock = new();
private readonly Dictionary<string, OtlpTraceScope> _traceScopes = new();
private readonly OtlpTraceCollection _traces = new();
private readonly CircularBuffer<OtlpTrace> _traces;

public TelemetryRepository(IConfiguration config, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger(typeof(TelemetryRepository));
MaxOperationCount = config.GetValue(nameof(MaxOperationCount), 128);
MaxLogCount = config.GetValue(nameof(MaxLogCount), 4096);

_logs = new(config.GetValue("MaxLogCount", DefaultMaxTelemetryCount));
_traces = new(config.GetValue("MaxTraceCount", DefaultMaxTelemetryCount));
}

public List<OtlpApplication> GetApplications()
Expand Down Expand Up @@ -454,7 +455,7 @@ internal void AddTracesCore(AddContext context, OtlpApplication application, Rep
{
trace = lastTrace;
}
else if (!_traces.TryGetValue(span.TraceId.Memory, out trace))
else if (!TryGetTraceById(_traces, span.TraceId.Memory, out trace))
{
trace = new OtlpTrace(span.TraceId.Memory, traceScope);
newTrace = true;
Expand Down Expand Up @@ -533,6 +534,22 @@ internal void AddTracesCore(AddContext context, OtlpApplication application, Rep
{
_tracesLock.ExitWriteLock();
}

static bool TryGetTraceById(CircularBuffer<OtlpTrace> traces, ReadOnlyMemory<byte> traceId, [NotNullWhen(true)] out OtlpTrace? trace)
{
var s = traceId.Span;
for (var i = traces.Count - 1; i >= 0; i--)
{
if (traces[i].Key.Span.SequenceEqual(s))
{
trace = traces[i];
return true;
}
}

trace = null;
return false;
}
}

private static OtlpSpan CreateSpan(OtlpApplication application, Span span, OtlpTrace trace)
Expand Down
Loading