A cheaper linked cancellation token source? #40670

Closed
ghost opened this issue Aug 11, 2020 · 17 comments
Labels
api-suggestion Early API idea and discussion, it is NOT ready for implementation area-System.Threading.Tasks


ghost commented Aug 11, 2020

Often, we need to create linked cancellation tokens in the context of an operation. Unfortunately, CancellationTokenSource.CreateLinkedTokenSource requires a heap allocation, which increases pressure on the GC. Can we have a linked cancellation token source that is a value type?

This is what I often need to do:

class SomeClass : IDisposable
{
  readonly CancellationTokenSource cancellationSource = new CancellationTokenSource();

  public void Dispose() => cancellationSource.Cancel();

  public async ValueTask FooAsync(CancellationToken token)
  {
    // Link the instance-wide token with the per-call token (allocates a new CTS on every call).
    using var source = CancellationTokenSource.CreateLinkedTokenSource(
      cancellationSource.Token,
      token);

    await BarAsync(source.Token);
  }
}

It would be nice if we could do something like this where LinkedCancellationTokenSource was a value type with no internal heap allocations:

class SomeClass : IDisposable
{
  readonly CancellationTokenSource cancellationSource = new CancellationTokenSource();

  public void Dispose() => cancellationSource.Cancel();

  public async ValueTask FooAsync(CancellationToken token)
  {
    // LinkedCancellationTokenSource is the proposed (hypothetical) value type.
    using var source = new LinkedCancellationTokenSource(
      cancellationSource.Token,
      token);

    await BarAsync(source.Token);
  }
}

@stephentoub and @ericstj as FYI.

@ghost ghost added the api-suggestion Early API idea and discussion, it is NOT ready for implementation label Aug 11, 2020
@Dotnet-GitSync-Bot Dotnet-GitSync-Bot added the untriaged New issue has not been triaged by the area owner label Aug 11, 2020
@Dotnet-GitSync-Bot
Collaborator

I couldn't figure out the best area label to add to this issue. If you have write-permissions please help me learn by adding exactly one area label.

Member

stephentoub commented Aug 11, 2020

CancellationToken just references a CancellationTokenSource. Any change that required a CT to be able to refer to multiple CTS instances or even multiple value types would bloat the size of a CT non-trivially. A CT is stored on tons of types (think of all the async methods that take one as an argument, which causes it to be on the state machine type), so the change would make all of them larger, and I expect it would be a significant net negative.

If you have a suggestion/prototype for doing this in a way that's pay-for-play, please share. Otherwise, I don't think this is actionable.
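
To make the size argument concrete, here is a minimal sketch that reports how large a CancellationToken is on the current runtime. It assumes .NET 5 or later, where System.Runtime.CompilerServices.Unsafe is available in-box; the class and method names are hypothetical. Because the struct wraps a single CancellationTokenSource reference, the reported size is expected to match the pointer size.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;

public static class TokenSizeDemo
{
    // Size in bytes of the CancellationToken struct on the current runtime.
    public static int TokenSize() => Unsafe.SizeOf<CancellationToken>();

    // Formats the token size next to the pointer size for comparison.
    public static string Describe()
        => $"CancellationToken: {TokenSize()} bytes, pointer: {IntPtr.Size} bytes";
}
```

Calling `Console.WriteLine(TokenSizeDemo.Describe())` prints both numbers. Any extra field added to the struct would be paid for by every type that stores a token, including compiler-generated async state machines.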

@mangod9 mangod9 removed the untriaged New issue has not been triaged by the area owner label Aug 12, 2020
@mangod9 mangod9 added this to the Future milestone Aug 12, 2020

ghost commented Aug 12, 2020

Tagging subscribers to this area: @tarekgh
See info in area-owners.md if you want to be subscribed.

Author

ghost commented Aug 20, 2020

@stephentoub, could we reuse CancellationTokenSources? When a linked CancellationTokenSource is disposed, we can reset the instance and return it back to the pool.

One thing is certain: linked CancellationTokens are used often and their lifetime is typically short. This can be a nice little boost.

Member

stephentoub commented Aug 20, 2020

You can pool your own CTS instances if you want: if you control their cancellation, then you can return them to the pool if they haven't been canceled, and presumably that's the 99% case.

For resetting the instance, that has lots of problems, stemming from the fact that there are strong guarantees that once a CT has transitioned to canceled, it must not transition back.
#4694
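
A minimal sketch of the "return it only if it was not canceled" rule, under stated assumptions: the SimpleCtsPool type and its members are hypothetical, the queue-backed pool is chosen for brevity rather than for low allocation, and the caller is assumed to guarantee that nobody still holds a token from a returned instance.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class SimpleCtsPool
{
    // Unbounded pool for illustration only; a real pool would likely cap its size.
    private static readonly ConcurrentQueue<CancellationTokenSource> Pool =
        new ConcurrentQueue<CancellationTokenSource>();

    // Hands out a pooled instance, or allocates one if the pool is empty.
    public static CancellationTokenSource Rent()
        => Pool.TryDequeue(out var cts) ? cts : new CancellationTokenSource();

    // Returns true if the instance went back to the pool.
    // A canceled source can never become "not canceled" again, so it is disposed instead.
    public static bool Return(CancellationTokenSource cts)
    {
        if (cts.IsCancellationRequested)
        {
            cts.Dispose();
            return false;
        }

        Pool.Enqueue(cts);
        return true;
    }
}
```

The check in Return is what keeps the guarantee intact: a token that has been observed as canceled never ends up backed by an instance that is handed out again as fresh.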

Author

ghost commented Aug 20, 2020

@stephentoub, thanks man. A question for you: How do I pool these CancellationTokenSources when the second token is call specific and changes every time?

Second token in the example above is the token parameter passed to FooAsync.

Member

stephentoub commented Aug 20, 2020

> How do I pool these CancellationTokenSources when the second token is call specific and changes every time?

CreateLinkedTokenSource is really just a helper that creates a new CTS and does the logical equivalent of CT.Register(CTS.Cancel) on each token. You can do the same, e.g.

CancellationTokenSource cts = GetFromPoolOrAllocate();

using (ct1.UnsafeRegister(s => ((CancellationTokenSource)s).Cancel(), cts))
using (ct2.UnsafeRegister(s => ((CancellationTokenSource)s).Cancel(), cts))
    await FooAsync(cts.Token);

if (!cts.IsCancellationRequested)
    ReturnToPool(cts);
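
The equivalence described above can be checked with a small self-contained sketch. It uses the regular Register overload rather than UnsafeRegister so that it also compiles on older targets; the ManualLinkDemo type and its members are hypothetical names for illustration.

```csharp
#nullable enable
using System;
using System.Threading;

public static class ManualLinkDemo
{
    // Cached delegate: cancels the CancellationTokenSource passed as state.
    private static readonly Action<object?> CancelAction =
        state => ((CancellationTokenSource)state!).Cancel();

    // Links a plain CTS to two tokens by hand, cancels the second parent,
    // and reports whether the cancellation reached the linked source.
    public static bool CancelPropagates()
    {
        using var first = new CancellationTokenSource();
        using var second = new CancellationTokenSource();
        using var linked = new CancellationTokenSource();

        using (first.Token.Register(CancelAction, linked))
        using (second.Token.Register(CancelAction, linked))
        {
            second.Cancel(); // registered callbacks run as part of Cancel()
            return linked.IsCancellationRequested;
        }
    }
}
```

Disposing the two registrations is what detaches the linked source from its parents, which is why a source that was never canceled can be handed back to a pool afterwards.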

Author

ghost commented Aug 20, 2020

@stephentoub, awesome. Giving it a try now.

Author

ghost commented Aug 21, 2020

@stephentoub , I ended up using something like the code below. Perhaps we could have a similar concept in .NET?

public static class CancellationScope
{
    private static readonly Action<object?> CancelAction = s => ((CancellationTokenSource)s!).Cancel();
    private static readonly ConcurrentStack<CancellationTokenSource> Stack;

    static CancellationScope()
    {
        var count = Math.Min(Environment.ProcessorCount * 16, 256);
        var sources = Enumerable.Range(0, count).Select(_ => new CancellationTokenSource());
        Stack = new ConcurrentStack<CancellationTokenSource>(sources);
    }

    public static async ValueTask<TResult> ExecuteAsync<TState, TResult>(
        CancellationToken token1,
        CancellationToken token2,
        Func<TState, CancellationToken, ValueTask<TResult>> action,
        TState state)
    {
        if (Stack.TryPop(out var source))
        {
            try
            {
                using (token1.Register(CancelAction, source, false))
                using (token2.Register(CancelAction, source, false))
                    return await action(state, source.Token);
            }
            finally
            {
                if (source.IsCancellationRequested)
                {
                    source.Dispose();
                    source = new CancellationTokenSource();
                }

                Stack.Push(source);
            }
        }
        else
        {
            using (source = CancellationTokenSource.CreateLinkedTokenSource(token1, token2))
                return await action(state, source.Token);
        }
    }
}

@stephentoub
Member

Have you measured it and seen that to actually be measurably better? Returning to ConcurrentStack allocates, and if your helper ends up yielding, it'll allocate the state associated with the async frame. On top of that, your code is allocating two delegates when passing CancelAction to Register. I don't see us adding this to the core libraries. Obviously feel free to publish your own helpers on NuGet for others to use if they find them helpful.

Author

ghost commented Aug 21, 2020

@stephentoub, the delegate is a static readonly instance at the class level. What are you referring to by this?

> ... code is allocating two delegates when passing CancelAction to Register

Just to be exact, here is a simple benchmark:

[SimpleJob(RuntimeMoniker.NetCoreApp31)]
[MemoryDiagnoser]
public class DelegateBenchmark
{
    private static readonly Action<int> FooDelegate = i => Foo(i);

    [Benchmark]
    public void WithStaticDelegateInstance()
    {
        for (var i = 0; i < 1000; i++)
            Bar(FooDelegate, i);
    }

    [Benchmark]
    public void WithNewDelegateInstance()
    {
        for (var i = 0; i < 1000; i++)
            Bar(Foo, i);
    }

    private static void Foo(int i)
    {
    }

    private static void Bar(Action<int> action, int i)
        => action(i);
}
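
| Method                     | Mean      | Error     | StdDev    | Gen 0   | Gen 1 | Gen 2 | Allocated |
|----------------------------|-----------|-----------|-----------|---------|-------|-------|-----------|
| WithStaticDelegateInstance | 2.960 us  | 0.0466 us | 0.0413 us | -       | -     | -     | -         |
| WithNewDelegateInstance    | 12.844 us | 0.2566 us | 0.3426 us | 12.2375 | -     | -     | 64000 B   |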

Member

stephentoub commented Aug 21, 2020

I must have misread; I thought CancelAction was a method rather than a delegate field.

Author

ghost commented Aug 21, 2020

@stephentoub, here is the benchmark. I think a ConcurrentBag version or a simple array with Interlocked.CompareExchange should do the trick. Yes, ConcurrentStack allocates (though less than creating a new linked source on every call), and I totally forgot about that. The best results are from the CompareAndExchange version:

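| Method                           | Mean     | Error     | StdDev    | Allocated |
|----------------------------------|----------|-----------|-----------|-----------|
| NewLinkedCancellationTokenSource | 1.566 ms | 0.0309 ms | 0.0390 ms | 960000 B  |
| Stack                            | 1.546 ms | 0.0307 ms | 0.0366 ms | 320000 B  |
| Bag                              | 1.806 ms | 0.0357 ms | 0.0367 ms | -         |
| CompareAndExchange               | 1.478 ms | 0.0285 ms | 0.0390 ms | -         |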

Benchmark

[SimpleJob(RuntimeMoniker.NetCoreApp31)]
[MemoryDiagnoser]
public class CancellationScopeBenchmark
{
    private static readonly CancellationTokenSource Source1 = new CancellationTokenSource();
    private static readonly CancellationTokenSource Source2 = new CancellationTokenSource();
    private static readonly Action<CancellationToken> ActionDelegate = ct => ct.ThrowIfCancellationRequested();

    [Benchmark]
    public void NewLinkedCancellationTokenSource()
    {
        for (var i = 0; i < 10000; i++)
        {
            using var source = CancellationTokenSource.CreateLinkedTokenSource(Source1.Token, Source2.Token);
            ActionDelegate(source.Token);
        }
    }

    [Benchmark]
    public void Stack()
    {
        for (var i = 0; i < 10000; i++)
            CancellationScopeStack.Execute(Source1.Token, Source2.Token, ActionDelegate);
    }

    [Benchmark]
    public void Bag()
    {
        for (var i = 0; i < 10000; i++)
            CancellationScopeBag.Execute(Source1.Token, Source2.Token, ActionDelegate);
    }

    [Benchmark(Baseline = true)]
    public void CompareAndExchange()
    {
        for (var i = 0; i < 10000; i++)
            CancellationScopeCompareAndExchange.Execute(Source1.Token, Source2.Token, ActionDelegate);
    }
}

Bag implementation

internal static class CancellationScopeBag
{
    private static readonly Action<object?> CancelAction = s => ((CancellationTokenSource)s!).Cancel();
    private static readonly ConcurrentBag<CancellationTokenSource> Items;

    static CancellationScopeBag()
    {
        var count = Math.Min(Environment.ProcessorCount * 16, 256);
        var sources = Enumerable.Range(0, count).Select(_ => new CancellationTokenSource());
        Items = new ConcurrentBag<CancellationTokenSource>(sources);
    }

    internal static void Execute(
        CancellationToken token1,
        CancellationToken token2,
        Action<CancellationToken> action)
    {
        if (Items.TryTake(out var source))
        {
            try
            {
                using (token1.Register(CancelAction, source, false))
                using (token2.Register(CancelAction, source, false))
                    action(source.Token);
            }
            finally
            {
                if (source.IsCancellationRequested)
                {
                    source.Dispose();
                    source = new CancellationTokenSource();
                }

                Items.Add(source);
            }
        }
        else
        {
            using (source = CancellationTokenSource.CreateLinkedTokenSource(token1, token2))
                action(source.Token);
        }
    }
}

Compare and exchange implementation

internal static class CancellationScopeCompareAndExchange
{
    private static readonly Action<object?> CancelAction = s => ((CancellationTokenSource)s!).Cancel();
    private static readonly int Count = Math.Min(Environment.ProcessorCount * 16, 256);
    private static readonly CancellationTokenSource?[] Items;

    static CancellationScopeCompareAndExchange()
    {
        Items = new CancellationTokenSource[Count];
        for (var i = 0; i < Count; i++)
            Items[i] = new CancellationTokenSource();
    }

    internal static void Execute(
        CancellationToken token1,
        CancellationToken token2,
        Action<CancellationToken> action)
    {
        var source = RentSource();
        if (source != null)
        {
            try
            {
                using (token1.Register(CancelAction, source, false))
                using (token2.Register(CancelAction, source, false))
                    action(source.Token);
            }
            finally
            {
                if (source.IsCancellationRequested)
                {
                    source.Dispose();
                    source = new CancellationTokenSource();
                }

                ReturnSource(source);
            }
        }
        else
        {
            using (source = CancellationTokenSource.CreateLinkedTokenSource(token1, token2))
                action(source.Token);
        }
    }

    private static CancellationTokenSource? RentSource()
    {
        for (var i = 0; i < Count; i++)
        {
            while (true)
            {
                var snapshot = Items[i];
                if (snapshot is null)
                    break;

                if (Interlocked.CompareExchange(ref Items[i], null, snapshot) == snapshot)
                    return snapshot;
            }
        }

        return null;
    }

    private static void ReturnSource(CancellationTokenSource source)
    {
        var i = 0;
        while (true)
        {
            if (Interlocked.CompareExchange(ref Items[i], source, null) == null)
                return;

            i = (i + 1) % Count;
        }
    }
}

@stephentoub , are you sure we should not have something like this in the core lib?

@stephentoub
Member

> are you sure we should not have something like this in the core lib?

I do not think it's valuable enough to add.

@stephentoub
Member

> here is the benchmark

A few other things to keep in mind about your example. A real-world case will have concurrency and will introduce contention as part of the pooling. Also, cancellation tokens are used much more in async code, so your Execute method would more likely be ExecuteAsync, and introduce another async state machine as part of any operation that yielded.

@ghost
Copy link
Author

ghost commented Aug 22, 2020

@stephentoub, as always, I am grateful to you. Here is a version without the async state machine:

Usage

using (var linked = new LinkedCancellationToken(token1, token2))
  await Foo(linked.Token);

Implementation

public readonly struct LinkedCancellationToken : IEquatable<LinkedCancellationToken>, IDisposable
{
    private static readonly int Count = Math.Min(Environment.ProcessorCount * 16, 256);
    private static readonly Action<object?> CancelAction = s => ((CancellationTokenSource)s!).Cancel();
    private static readonly CancellationTokenSource?[] Items;
    private readonly CancellationTokenSource? source;
    private readonly CancellationTokenRegistration reg1;
    private readonly CancellationTokenRegistration reg2;

    static LinkedCancellationToken()
    {
        Items = new CancellationTokenSource[Count];
        for (var i = 0; i < Count; i++)
            Items[i] = new CancellationTokenSource();
    }

    public LinkedCancellationToken(CancellationToken token1, CancellationToken token2)
    {
        var source = Rent();
        if (source is null)
        {
            source = CancellationTokenSource.CreateLinkedTokenSource(token1, token2);
            reg1 = reg2 = default;
        }
        else
        {
            reg1 = token1.Register(CancelAction, source, false);
            reg2 = token2.Register(CancelAction, source, false);
        }

        this.source = source;
    }

    public CancellationToken Token
        => source?.Token ?? default;

    public static bool operator ==(LinkedCancellationToken left, LinkedCancellationToken right)
        => left.Equals(right);

    public static bool operator !=(LinkedCancellationToken left, LinkedCancellationToken right)
        => !left.Equals(right);

    public override bool Equals(object? obj)
        => obj is LinkedCancellationToken token && Equals(token);

    public bool Equals(LinkedCancellationToken other)
        => ReferenceEquals(source, other.source);

    public override int GetHashCode()
        => source?.GetHashCode() ?? 0;

    public void Dispose()
    {
        if (source is null)
            return;

        if (reg1 == default)
        {
            source.Dispose();
        }
        else
        {
            reg1.Dispose();
            reg2.Dispose();

            if (source.IsCancellationRequested)
            {
                source.Dispose();
                Return(new CancellationTokenSource());
            }
            else
            {
                Return(source);
            }
        }
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static CancellationTokenSource? Rent()
    {
        for (var i = 0; i < Count; i++)
        {
            while (true)
            {
                var snapshot = Items[i];
                if (snapshot is null)
                    break;

                if (Interlocked.CompareExchange(ref Items[i], null, snapshot) == snapshot)
                    return snapshot;
            }
        }

        return null;
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static void Return(CancellationTokenSource source)
    {
        var i = 0;
        while (true)
        {
            if (Items[i] is null)
            {
                if (Interlocked.CompareExchange(ref Items[i], source, null) == null)
                    return;
            }
            else
            {
                if (ReferenceEquals(source, Items[i]))
                    return;
            }

            i = (i + 1) % Count;
        }
    }
}
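
One edge case may be worth checking in a struct like the one above: Dispose uses `reg1 == default` to detect the fallback path, but registering on a token that can never be canceled (for example CancellationToken.None) also appears to produce a default registration on current .NET runtimes. If so, a pooled source rented for such a call would be disposed rather than returned. The sketch below only probes that behavior; DefaultRegistrationDemo and its members are hypothetical names.

```csharp
#nullable enable
using System;
using System.Threading;

public static class DefaultRegistrationDemo
{
    // True when registering on a token that can never be canceled
    // yields default(CancellationTokenRegistration).
    public static bool NoneYieldsDefaultRegistration()
    {
        using var pooled = new CancellationTokenSource();
        CancellationTokenRegistration reg = CancellationToken.None.Register(
            state => ((CancellationTokenSource)state!).Cancel(), pooled);
        return reg == default;
    }

    // True when registering on a live, cancelable token yields a non-default registration.
    public static bool LiveTokenYieldsRealRegistration()
    {
        using var parent = new CancellationTokenSource();
        using var pooled = new CancellationTokenSource();
        using CancellationTokenRegistration reg = parent.Token.Register(
            state => ((CancellationTokenSource)state!).Cancel(), pooled);
        return reg != default;
    }
}
```

If both probes return true on the target runtime, tracking the rented-versus-fallback state in a separate field, instead of inferring it from the registration, might be a safer design.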

@stephentoub
Member

You're welcome. :)

@ghost ghost locked as resolved and limited conversation to collaborators Dec 7, 2020