Skip to content

Commit

Permalink
Metric AggregatorStore optimizations for sorting tags (#2805)
Browse files Browse the repository at this point in the history
  • Loading branch information
utpilla authored Feb 2, 2022
1 parent 840b24e commit c1c5436
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 196 deletions.
4 changes: 4 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
thread.
([2844](https://github.com/open-telemetry/opentelemetry-dotnet/issues/2844))

* Performance improvement: when emitting metrics, users are strongly advised to
provide tags with same Key order, to achieve maximum performance.
([2805](https://github.com/open-telemetry/opentelemetry-dotnet/pull/2805/files))

## 1.2.0-rc1

Released 2021-Nov-29
Expand Down
158 changes: 99 additions & 59 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ namespace OpenTelemetry.Metrics
{
internal sealed class AggregatorStore
{
private static readonly ObjectArrayEqualityComparer ObjectArrayComparer = new ObjectArrayEqualityComparer();
private readonly object lockZeroTags = new object();
private readonly HashSet<string> tagKeysInteresting;
private readonly int tagsKeysInterestingCount;

// Two-Level lookup. TagKeys x [ TagValues x Metrics ]
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
new ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>>(new StringArrayEqualityComparer());
private readonly ConcurrentDictionary<Tags, int> tagsToMetricPointIndexDictionary =
new ConcurrentDictionary<Tags, int>();

private readonly AggregationTemporality temporality;
private readonly string name;
Expand Down Expand Up @@ -178,44 +176,37 @@ private void InitializeZeroTagPointIfNotInitialized()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int length)
{
int aggregatorIndex;
string[] seqKey = null;
var givenTags = new Tags(tagKeys, tagValues);

// GetOrAdd by TagKeys at 1st Level of 2-level dictionary structure.
// Get back a Dictionary of [ Values x Metrics[] ].
if (!this.keyValue2MetricAggs.TryGetValue(tagKeys, out var value2metrics))
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(givenTags, out var aggregatorIndex))
{
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
seqKey = new string[length];
tagKeys.CopyTo(seqKey, 0);

value2metrics = new ConcurrentDictionary<object[], int>(ObjectArrayComparer);
if (!this.keyValue2MetricAggs.TryAdd(seqKey, value2metrics))
if (length > 1)
{
this.keyValue2MetricAggs.TryGetValue(seqKey, out value2metrics);
}
}
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
// Create a new array for the sorted Tag keys.
var sortedTagKeys = new string[length];
tagKeys.CopyTo(sortedTagKeys, 0);

// GetOrAdd by TagValues at 2st Level of 2-level dictionary structure.
// Get back Metrics[].
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex))
{
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}
// Create a new array for the sorted Tag values.
var sortedTagValues = new object[length];
tagValues.CopyTo(sortedTagValues, 0);

lock (value2metrics)
{
// check again after acquiring lock.
if (!value2metrics.TryGetValue(tagValues, out aggregatorIndex))
Array.Sort(sortedTagKeys, sortedTagValues);

var sortedTags = new Tags(sortedTagKeys, sortedTagValues);

if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = Interlocked.Increment(ref this.metricPointIndex);
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
var givenKeys = new string[length];
tagKeys.CopyTo(givenKeys, 0);

var givenValues = new object[length];
tagValues.CopyTo(givenValues, 0);

givenTags = new Tags(givenKeys, givenValues);

aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
Expand All @@ -225,24 +216,83 @@ private int LookupAggregatorStore(string[] tagKeys, object[] tagValues, int leng
return -1;
}

// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
if (seqKey == null)
lock (this.tagsToMetricPointIndexDictionary)
{
seqKey = new string[length];
tagKeys.CopyTo(seqKey, 0);
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(sortedTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, sortedTags.Keys, sortedTags.Values, this.histogramBounds);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionary.TryAdd(sortedTags, aggregatorIndex);
this.tagsToMetricPointIndexDictionary.TryAdd(givenTags, aggregatorIndex);
}
}
}
}
else
{
// Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
var givenKeys = new string[length];
var givenValues = new object[length];

var seqVal = new object[length];
tagValues.CopyTo(seqVal, 0);
tagKeys.CopyTo(givenKeys, 0);
tagValues.CopyTo(givenValues, 0);

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, seqKey, seqVal, this.histogramBounds);
givenTags = new Tags(givenKeys, givenValues);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.
value2metrics.TryAdd(seqVal, aggregatorIndex);
aggregatorIndex = this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

lock (this.tagsToMetricPointIndexDictionary)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionary.TryGetValue(givenTags, out aggregatorIndex))
{
aggregatorIndex = ++this.metricPointIndex;
if (aggregatorIndex >= this.maxMetricPoints)
{
// sorry! out of data points.
// TODO: Once we support cleanup of
// unused points (typically with delta)
// we can re-claim them here.
return -1;
}

ref var metricPoint = ref this.metricPoints[aggregatorIndex];
var dt = DateTimeOffset.UtcNow;
metricPoint = new MetricPoint(this.aggType, dt, givenTags.Keys, givenTags.Values, this.histogramBounds);

// Add to dictionary *after* initializing MetricPoint
// as other threads can start writing to the
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionary.TryAdd(givenTags, aggregatorIndex);
}
}
}
}
Expand Down Expand Up @@ -355,11 +405,6 @@ private int FindMetricAggregatorsDefault(ReadOnlySpan<KeyValuePair<string, objec

storage.SplitToKeysAndValues(tags, tagLength, out var tagKeys, out var tagValues);

if (tagLength > 1)
{
Array.Sort(tagKeys, tagValues);
}

return this.LookupAggregatorStore(tagKeys, tagValues, tagLength);
}

Expand Down Expand Up @@ -388,11 +433,6 @@ private int FindMetricAggregatorsCustomTag(ReadOnlySpan<KeyValuePair<string, obj
return 0;
}

if (actualLength > 1)
{
Array.Sort(tagKeys, tagValues);
}

return this.LookupAggregatorStore(tagKeys, tagValues, actualLength);
}
}
Expand Down
68 changes: 0 additions & 68 deletions src/OpenTelemetry/Metrics/ObjectArrayEqualityComparer.cs

This file was deleted.

69 changes: 0 additions & 69 deletions src/OpenTelemetry/Metrics/StringArrayEqualityComparer.cs

This file was deleted.

Loading

0 comments on commit c1c5436

Please sign in to comment.