Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AggregatorStore to use concurentdictionary #2339

Merged

Conversation

cijothomas
Copy link
Member

@cijothomas cijothomas commented Sep 11, 2021

In this (#2295) PR, the explicit "lock" on Collect cycle was removed, so the lock() in hot path was not competing with collect().

But the lock() in hot path is still affecting scalability. This PR is moving to use ConcurrentDictionary, which does not use locks for Read. Once all unique dimension combinations are reported once, then we'll be doing Reads only, so this should avoid the lock in hot path. (after the initial period)

Also, this is purely internal implementation detail, so we can keep improving this. (I have other small optimizations coming up for this hotpath later)

Sharing numbers from my local machine using the stress test (benchmark tool doesn't create multiple threads, so its not giving any difference)

Dictionary with lock (Current)
Runtime | Throughput
net462 | 1.455 M/sec
netcoreapp3.1 |1.40 M/sec
net5.0 | 1.53 M/sec

ConcurrentDictionary (this PR)
Runtime | Throughput
net462 | 2.42 M/sec
netcoreapp3.1 | 2.5 M/sec
net5.0 | 3.0 M/sec

TODO for next:
Explore Mikel's suggestion of using Hashtable instead of ConcurrentDictionary. It'll need a single lock. Now we use our own lock + ConcurrentDictionary's internal write lock.

@cijothomas cijothomas requested a review from a team September 11, 2021 00:49
@@ -29,8 +30,8 @@ internal class AggregatorStore
private readonly object lockKeyValue2MetricAggs = new object();

// Two-Level lookup. TagKeys x [ TagValues x Metrics ]
private readonly Dictionary<string[], Dictionary<object[], int>> keyValue2MetricAggs =
new Dictionary<string[], Dictionary<object[], int>>(new StringArrayEqualityComparer());
private readonly ConcurrentDictionary<string[], ConcurrentDictionary<object[], int>> keyValue2MetricAggs =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cijothomas I think if we switch the inner dictionary to Hashtable we can eliminate a couple of the challenges below while preserving the perf. Hashtable has the unique thread-safety gurantee that it is safe for many readers and only needs a lock for writing. Similar to ConcurrentDictionary but you can self-manage the lock. Why this is good is below between TryGet & TryAdd we do a bunch of logic where multiple threads can race. Hashtable takes care of that issue because we'll manage the lock. I think it simplifies the code and we don't have to worry about gaps in the array, or a losing thread having to spin wait. Something like this...

        // Two-Level lookup. TagKeys x [ TagValues x Metrics ]
        private readonly ConcurrentDictionary<string[], Hashtable> keyValue2MetricAggs =
            new ConcurrentDictionary<string[], Hashtable>(new StringArrayEqualityComparer());

        internal int FindMetricAggregators(ReadOnlySpan<KeyValuePair<string, object>> tags)
        {
            int len = tags.Length;
            if (len == 0)
            {
                if (this.metrics[0].StartTime == default)
                {
                    var dt = DateTimeOffset.UtcNow;
                    this.metrics[0] = new MetricPoint(this.aggType, dt, null, null);
                }

                return 0;
            }

            var storage = ThreadStaticStorage.GetStorage();

            storage.SplitToKeysAndValues(tags, out var tagKey, out var tagValue);

            if (len > 1)
            {
                Array.Sort<string, object>(tagKey, tagValue);
            }

            string[] seqKey = null;

            // GetOrAdd by TagKey at 1st Level of 2-level dictionary structure.
            // Get back a Dictionary of [ Values x Metrics[] ].
            if (!this.keyValue2MetricAggs.TryGetValue(tagKey, out var value2metrics))
            {
                // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.
                seqKey = new string[len];
                tagKey.CopyTo(seqKey, 0);

                value2metrics = new Hashtable(new ObjectArrayEqualityComparer()); // Need to implement non-generic IEqualityComparer for this to work
                if (!this.keyValue2MetricAggs.TryAdd(seqKey, value2metrics))
                {
                    // Some other thread added value2metrics, take it.
                    // The one created by this thread is wasted.
                    this.keyValue2MetricAggs.TryGetValue(tagKey, out value2metrics);
                }
            }

            // GetOrAdd by TagValue at 2st Level of 2-level dictionary structure.
            // Get back Metrics[].
            if (!(value2metrics[tagValue] is int aggregatorIndex))
            {
                lock (value2metrics)
                {
                    aggregatorIndex = (value2metrics[tagValue] as int?) ?? -1;
                    if (aggregatorIndex == -1)
                    {
                        this.metricPointIndex++;
                        aggregatorIndex = this.metricPointIndex;
                        if (aggregatorIndex >= MaxMetricPoints)
                        {
                            // sorry! out of data points.
                             return -1;
                        }

                        // Note: We are using storage from ThreadStatic, so need to make a deep copy for Dictionary storage.

                        if (seqKey == null)
                        {
                            seqKey = new string[len];
                            tagKey.CopyTo(seqKey, 0);
                        }

                        var seqVal = new object[len];
                        tagValue.CopyTo(seqVal, 0);

                        value2metrics[tagValue] = aggregatorIndex;

                        ref var metricPoint = ref this.metrics[aggregatorIndex];
                        if (metricPoint.StartTime == default)
                        {
                            var dt = DateTimeOffset.UtcNow;
                            metricPoint = new MetricPoint(this.aggType, dt, seqKey, seqVal);
                        }
                    }
                }
            }

            return aggregatorIndex;
        }

A benchmark showing Dictionary vs ConcurrentDictionary vs Hashtable...

Method Mean Error StdDev Median Gen 0 Allocated
Dictionary 445.03 us 19.546 us 57.632 us 424.03 us 0.9766 11 KB
ConcurrentDictionary 56.91 us 1.115 us 1.735 us 56.19 us 0.5493 5 KB
Hashtable 56.35 us 0.713 us 0.667 us 56.01 us 0.5493 5 KB
Benchmark code
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;

namespace Benchmarks
{
    [MemoryDiagnoser]
    public class DictionaryBenchmarks
    {
        private readonly string keys = string.Empty;
        private readonly object values = new object();

        private readonly Dictionary<string, Dictionary<object, int>> dictionary = new Dictionary<string, Dictionary<object, int>>();
        private readonly ConcurrentDictionary<string, ConcurrentDictionary<object, int>> concurrentDictionary = new ConcurrentDictionary<string, ConcurrentDictionary<object, int>>();
        private readonly Hashtable hashtable = new Hashtable();

        [Benchmark]
        public int Dictionary()
        {
            int count = 0;

            Parallel.For(0, 1000, (i) =>
            {
                Dictionary<object, int> inner;
                lock (this.dictionary)
                {
                    if (!this.dictionary.TryGetValue(this.keys, out inner))
                    {
                        inner = new Dictionary<object, int>();
                        this.dictionary[this.keys] = inner;
                    }
                }

                int value;
                lock (inner)
                {
                    if (!inner.TryGetValue(this.values, out value))
                    {
                        value = 0;
                        inner[this.values] = value;
                    }
                }

                Interlocked.Add(ref count, value);
            });

            return count;
        }

        [Benchmark]
        public int ConcurrentDictionary()
        {
            int count = 0;

            Parallel.For(0, 1000, (i) =>
            {
                if (!this.concurrentDictionary.TryGetValue(this.keys, out ConcurrentDictionary<object, int> inner))
                {
                    inner = new ConcurrentDictionary<object, int>();
                    this.concurrentDictionary[this.keys] = inner;
                }

                if (!inner.TryGetValue(this.values, out var value))
                {
                    value = 0;
                    inner[this.values] = value;
                }

                Interlocked.Add(ref count, value);
            });

            return count;
        }

        [Benchmark]
        public int Hashtable()
        {
            int count = 0;

            Parallel.For(0, 1000, (i) =>
            {
                if (!(this.hashtable[this.keys] is Hashtable inner))
                {
                    lock (this.hashtable)
                    {
                        inner = this.hashtable[this.keys] as Hashtable;
                        if (inner == null)
                        {
                            inner = new Hashtable();
                            this.hashtable[this.keys] = inner;
                        }
                    }
                }

                if (!(inner[this.values] is int value))
                {
                    lock (inner)
                    {
                        if (!(inner[this.values] is int innerValue))
                        {
                            value = 0;
                            inner[this.values] = value;
                        }
                        else
                        {
                            value = innerValue;
                        }
                    }
                }

                Interlocked.Add(ref count, value);
            });

            return count;
        }
    }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nice thing about Hashtable/Hashset is that concurrent reads don't need lock.
I'm not sure if Hashtable has the same perf problem as Hashset, might need to confirm the .NET Framework perf.

#708 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, here are those benchmarks from above run on net462, netcoreapp3.1, & net5.0:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1237 (21H1/May2021Update)
Intel Core i9-9880H CPU 2.30GHz, 1 CPU, 16 logical and 8 physical cores
[Host] : .NET Framework 4.8 (4.8.4400.0), X64 RyuJIT
DefaultJob : .NET Framework 4.8 (4.8.4400.0), X64 RyuJIT

Method Mean Error StdDev Gen 0 Allocated
Dictionary 418.63 us 8.223 us 17.876 us - 3 KB
ConcurrentDictionary 70.61 us 1.275 us 1.193 us 0.3662 3 KB
Hashtable 74.55 us 1.427 us 1.698 us 0.3662 3 KB

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1237 (21H1/May2021Update)
Intel Core i9-9880H CPU 2.30GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=6.0.100-rc.1.21458.32
[Host] : .NET Core 3.1.19 (CoreCLR 4.700.21.41101, CoreFX 4.700.21.41603), X64 RyuJIT
DefaultJob : .NET Core 3.1.19 (CoreCLR 4.700.21.41101, CoreFX 4.700.21.41603), X64 RyuJIT

Method Mean Error StdDev Gen 0 Allocated
Dictionary 498.57 us 23.672 us 69.796 us 0.9766 10 KB
ConcurrentDictionary 65.73 us 1.155 us 1.375 us 0.4883 5 KB
Hashtable 63.17 us 1.242 us 1.525 us 0.4883 5 KB

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1237 (21H1/May2021Update)
Intel Core i9-9880H CPU 2.30GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=6.0.100-rc.1.21458.32
[Host] : .NET 5.0.10 (5.0.1021.41214), X64 RyuJIT
DefaultJob : .NET 5.0.10 (5.0.1021.41214), X64 RyuJIT

Method Mean Error StdDev Gen 0 Allocated
Dictionary 485.20 us 20.901 us 61.627 us 0.9766 10 KB
ConcurrentDictionary 61.81 us 1.224 us 1.457 us 0.4883 5 KB
Hashtable 62.29 us 1.236 us 1.323 us 0.4883 5 KB

Very interesting that on .NET Core the allocations have gone way up!

@codecov
Copy link

codecov bot commented Sep 17, 2021

Codecov Report

Merging #2339 (e3e36c5) into main (c76f961) will increase coverage by 0.10%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2339      +/-   ##
==========================================
+ Coverage   80.89%   81.00%   +0.10%     
==========================================
  Files         229      229              
  Lines        7323     7333      +10     
==========================================
+ Hits         5924     5940      +16     
+ Misses       1399     1393       -6     
Impacted Files Coverage Δ
src/OpenTelemetry/Metrics/AggregatorStore.cs 98.68% <100.00%> (+3.22%) ⬆️
...Telemetry/Internal/SelfDiagnosticsEventListener.cs 96.85% <0.00%> (-0.79%) ⬇️
...ter.ZPages/Implementation/ZPagesActivityTracker.cs 100.00% <0.00%> (+2.85%) ⬆️
...ZPages/Implementation/ZPagesExporterEventSource.cs 62.50% <0.00%> (+6.25%) ⬆️
...emetry.Api/Internal/OpenTelemetryApiEventSource.cs 82.35% <0.00%> (+8.82%) ⬆️

Copy link
Member

@reyang reyang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are still room for improvements but I don't think there is any blocker to this PR.

{
if (!this.zeroTagMetricPointInitialized)
{
var dt = DateTimeOffset.UtcNow;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to put these logics in the lock or not? Need to find a good balance.
Think about:

  1. if we put the logic inside the lock, it'll have the run-once guarantee, but the contention is becoming bigger so there is high chance for the lock to be escalated from a fast CPU spin to a real OS synchronization object.
  2. if we put the logic before the lock, we might run the code more than once (e.g. getting UtcNow multiple times), but it won't cause other side effects besides burning some extra CPU cycles.
  3. if we put the logic after the lock, we might be able to use some local variable to indicate if we need to run it or not, so we have the exact run-once guarantee, however it might introduce stale reads to other threads depending on the design between readers and writers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points @reyang. TBH, I'm not sure. I feel like this is OK because we will only hit the contention on cold start with multiple 0-length tags racing?

Some other kind of crazy perf-theory to consider...

Let's say everything under if (!this.zeroTagMetricPointInitialized) is uncommon path. We should move it into its own method and add MethodImplOptions.NoInlining on it. The reason is we ideally want our hot-path code to fit into what the CPU can load into its instruction cache. Something like that I see this happening in dotnet/runtime a lot 😅

Copy link
Member

@reyang reyang Sep 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, just want to point it out so whatever decision we made here will be a conscious decision (with real data to back the theory) if it is on a critical code path, rather than saying "I don't know, never thought about that before".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the ideas. Tracking it under #2374, so that these comments are not lost.

@cijothomas cijothomas merged commit 36e8915 into open-telemetry:main Sep 17, 2021
@cijothomas cijothomas deleted the cijothomas/aggstorewithconcurrentdict branch September 17, 2021 18:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants