Add ClusterClient initial contact discovery feature #7261
self-review
#nullable enable
namespace Akka.Cluster.Tools.Client;

public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash, IWithTimers
All new discovery logic is isolated in this new actor.
    return new RootActorPath(address) / "system" / _discoverySettings.ReceptionistName;
}

private static async Task<ResolveResult> ResolveContact(Contact contact, TimeSpan timeout, CancellationToken ct)
This method mimics ActorSelection.ResolveOne(), but instead of throwing on failure, it wraps the result in an envelope.
    Self.Tell(DiscoverTick.Instance);
}

private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target)
This method converts a ServiceDiscovery.ResolvedTarget to a remote ActorPath.
LGTM
    _targetActorSystemName = _discoverySettings.ActorSystemName!;
}

_transportProtocol = ((ExtendedActorSystem)Context.System).Provider.DefaultAddress.Protocol;
Best guess at which transport protocol we're using right now. This should handle custom protocols, such as the one used in Remote.TestKit.
case DiscoverTick:
    if(_verboseLogging && _log.IsDebugEnabled)
        _log.Debug("Discovering initial contacts");

    _serviceDiscovery!.Lookup(_lookup, _discoveryTimeout)
        .PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause));
The actual Akka.Discovery lookup is done here.
if (terminated.ActorRef.Equals(clusterClient))
{
    if(_verboseLogging && _log.IsInfoEnabled)
        _log.Info("Cluster client failed to reconnect to all receptionists, rediscovering.");

    // Kickoff discovery lookup
    Self.Tell(DiscoverTick.Instance);
    Become(Discovering);
}
If the cluster client dies, return to the Discovering state.
So this proxying guarantees that the caller's code doesn't get disrupted if we need to kill / recreate the underlying ClusterClient - LGTM.
use-initial-contacts-discovery = false

discovery
{
  method = <method>
  actor-system-name = null
  receptionist-name = receptionist
  service-name = null
  port-name = null
  discovery-retry-interval = 1s
  discovery-timeout = 60s
}

verbose-logging = false
New discovery settings.
If method is null, empty, whitespace, or "<method>" (not set), use the discovery method declared in "akka.discovery.method". If that also isn't set, fall back to the "config" method. The user is warned when either fallback happens.
If actor-system-name is null or whitespace, use the current actor system name instead. The user is warned when this happens.
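As a rough illustration of the settings described above, a client that relies on discovery rather than hard-coded initial contacts might be configured along these lines. This is a sketch assembled from the defaults and the spec configuration quoted in this review, not the PR's own documentation: the service name `my-cluster` and the actor system name `ClusterSys` are placeholders, and nesting the first `discovery` block under `akka.cluster.client` and the second under `akka.discovery` is an assumption based on the diff fragments.

```hocon
akka.cluster.client {
  # Switch the client from static initial contacts to discovery
  use-initial-contacts-discovery = true

  discovery {
    # If left unset, falls back to akka.discovery.method, then to "config"
    method = config
    # If null or whitespace, the current actor system name is used instead
    actor-system-name = ClusterSys   # placeholder
    receptionist-name = receptionist
    service-name = my-cluster        # placeholder
    discovery-retry-interval = 1s
    discovery-timeout = 60s
  }
}

akka.discovery {
  method = config
  # Left empty here, as in the spec; the spec fills it at runtime via TryAddEndpoint
  config.services.my-cluster.endpoints = []
}
```

With an empty endpoint list the client would presumably keep retrying the lookup every `discovery-retry-interval` until an endpoint is registered.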
}

[InternalStableApi]
public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target)
New Discovery.Config method for MNTR testing
}

[InternalStableApi]
public bool TryAddEndpoint(string serviceName, ResolvedTarget target)
New Discovery.Config method for MNTR testing
public ClusterClientSettings(
    IImmutableSet<ActorPath> initialContacts,
    TimeSpan establishingGetContactsInterval,
    TimeSpan refreshContactsInterval,
    TimeSpan heartbeatInterval,
    TimeSpan acceptableHeartbeatPause,
    int bufferSize,
    bool useLegacySerialization,
    bool useInitialContactsDiscovery,
    ClusterClientDiscoverySettings? discoverySettings = null,
    TimeSpan? reconnectTimeout = null,
    bool verboseLogging = false)
New ClusterClientSettings ctor, backward compatible.
LGTM
akka.remote.dot-netty.tcp.hostname = localhost
akka.actor.provider = cluster
akka.remote.log-remote-lifecycle-events = off
akka.cluster.client {
Are we disabling heartbeats on purpose here? 1d is an eternity in the context of this spec.
cluster.client {
  heartbeat-interval = 1s
  acceptable-heartbeat-pause = 2s
  use-initial-contacts-discovery = true
New setting, got it
discovery
{
  method = config
  config.services.test-cluster.endpoints = []
I assume this gets populated later using the new internal methods you added for dynamically updating config-based discovery
_discoveryService =
    (ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config");
var address = GetAddress(_config.First);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
Populates the discovery list here
{
    RunOn(() =>
    {
        Cluster.Get(Sys).Leave(Node(_config.First).Address);
The entire cluster goes down here, gracefully
LGTM
Fixes #7243
This PR is superior to #7260 because no underlying logic inside the ClusterClient actor was changed at all.
Changes
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):