Skip to content

Commit

Permalink
Added telemetry injection point for Ask<T> (#5297)
Browse files Browse the repository at this point in the history
Looking for a way to help trace timeouts inside `Ask<T>` operations - needed some way to tap into the `TaskCompletionSource` and to create an active `ISpan` before the operation begins.
  • Loading branch information
Aaronontheweb authored Oct 8, 2021
1 parent 01c35a9 commit a091c2a
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Configuration;

namespace Akka.Cluster.Metrics
{
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,8 @@ namespace Akka.Actor
Akka.Actor.IInternalActorRef TempContainer { get; }
System.Threading.Tasks.Task TerminationTask { get; }
Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async);
[Akka.Annotations.InternalApiAttribute()]
Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs);
Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address);
void Init(Akka.Actor.Internal.ActorSystemImpl system);
void RegisterTempActor(Akka.Actor.IInternalActorRef actorRef, Akka.Actor.ActorPath path);
Expand Down Expand Up @@ -1279,6 +1281,7 @@ namespace Akka.Actor
public Akka.Actor.IInternalActorRef TempContainer { get; }
public System.Threading.Tasks.Task TerminationTask { get; }
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
public Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address) { }
public void Init(Akka.Actor.Internal.ActorSystemImpl system) { }
public void RegisterExtraName(string name, Akka.Actor.IInternalActorRef actor) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ namespace Akka.Remote
public System.Threading.Tasks.Task TerminationTask { get; }
public Akka.Remote.RemoteTransport Transport { get; }
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
protected virtual Akka.Actor.IActorRef CreateRemoteDeploymentWatcher(Akka.Actor.Internal.ActorSystemImpl system) { }
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.ActorPath actorPath, Akka.Actor.Address localAddress) { }
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.Address localAddress, Akka.Actor.ActorPath rpath, Akka.Actor.Deploy deployment) { }
Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1561,8 +1561,9 @@ public void StopSeedNodeProcess()
/// Received `Join` message and replies with `Welcome` message, containing
/// current gossip state, including the new joining member.
/// </summary>
/// <param name="node">TBD</param>
/// <param name="roles">TBD</param>
/// <param name="node">The unique address of the joining node.</param>
/// <param name="roles">The roles, if any, of the joining node.</param>
/// <param name="appVersion">The software version of the joining node.</param>
public void Joining(UniqueAddress node, ImmutableHashSet<string> roles, AppVersion appVersion)
{
var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status;
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ public void UnregisterTempActor(ActorPath path)
_local.UnregisterTempActor(path);
}

/// <inheritdoc/>
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
{
return _local.CreateFutureRef(tcs);
}

private IActorRef _remotingTerminator;
private IActorRef _remoteWatcher;

Expand Down
22 changes: 22 additions & 0 deletions src/core/Akka/Actor/ActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ public interface IActorRefProvider
/// <param name="path">A path returned by <see cref="TempPath"/>. Do NOT pass in any other path!</param>
void UnregisterTempActor(ActorPath path);

/// <summary>
/// Automatically generates a <see cref="FutureActorRef{T}"/> with a temporary path.
/// </summary>
/// <remarks>
/// Does not call <see cref="RegisterTempActor"/> or <see cref="UnregisterTempActor"/>.
/// </remarks>
/// <param name="tcs">A typed <see cref="TaskCompletionSource{T}"/></param>
/// <typeparam name="T">The type of output this <see cref="FutureActorRef{T}"/> expects.</typeparam>
/// <returns>A new, single-use <see cref="FutureActorRef{T}"/> instance.</returns>
[InternalApi]
FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs);

/// <summary>
/// Actor factory with create-only semantics: will create an actor as
/// described by <paramref name="props"/> with the given <paramref name="supervisor"/> and <paramref name="path"/> (may be different
Expand Down Expand Up @@ -386,6 +398,16 @@ public void UnregisterTempActor(ActorPath path)
_tempContainer.RemoveChild(path.Name);
}

/// <inheritdoc cref="IActorRefProvider.CreateFutureRef{T}"/>
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
{
//create a new tempcontainer path
var path = TempPath();

var future = new FutureActorRef<T>(tcs, path, this);
return future;
}

/// <summary>
/// Initializes the ActorRefProvider
/// </summary>
Expand Down
6 changes: 2 additions & 4 deletions src/core/Akka/Actor/Futures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,8 @@ public static Task<T> Ask<T>(this ICanTell self, Func<IActorRef, object> message
ctr2 = cancellationToken.Register(() => result.TrySetCanceled());
}

//create a new tempcontainer path
var path = provider.TempPath();

var future = new FutureActorRef<T>(result, path, provider);
var future = provider.CreateFutureRef(result);
var path = future.Path;

//The future actor needs to be unregistered in the temp container
_ = result.Task.ContinueWith(t =>
Expand Down

0 comments on commit a091c2a

Please sign in to comment.