From ae5515a6887f3b94537a10d56598f946ff289604 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Tue, 22 Oct 2024 07:38:51 -0700 Subject: [PATCH] Improve timely shutdown of directory partitions when snapshot transfer has been abandoned (#9197) --- .../SiloBuilderConfigurator.cs | 6 +- .../DistributedGrainDirectory.cs | 29 +++++- .../GrainDirectory/GrainDirectoryReplica.cs | 89 ++++++++++++++----- .../IGrainDirectoryPartition.cs | 3 + .../Hosting/CoreHostingExtensions.cs | 2 +- .../ConfigureDistributedGrainDirectory.cs | 4 +- .../GrainDirectoryResilienceTests.cs | 4 +- 7 files changed, 102 insertions(+), 35 deletions(-) diff --git a/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs b/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs index aac181b83f..3c7506bb3b 100644 --- a/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs +++ b/playground/ChaoticCluster/ChaoticCluster.Silo/SiloBuilderConfigurator.cs @@ -1,5 +1,3 @@ -using Microsoft.Extensions.DependencyInjection; -using Orleans.Configuration; using Orleans.TestingHost; namespace ChaoticCluster.Silo; @@ -8,9 +6,9 @@ class SiloBuilderConfigurator : ISiloConfigurator { public void Configure(ISiloBuilder siloBuilder) { -#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. siloBuilder.AddDistributedGrainDirectory(); -#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. } } diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index 3d2612946c..0096c0982e 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -157,6 +157,16 @@ private async Task InvokeAsync( } } + public async ValueTask>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionIndex) + { + foreach (var partition in _partitions) + { + partition.OnRecoveringPartition(membershipVersion, range, siloAddress, partitionIndex).Ignore(); + } + + return await GetRegisteredActivations(membershipVersion, range, false); + } + public async ValueTask>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation) { if (!isValidation && _logger.IsEnabled(LogLevel.Debug)) @@ -181,7 +191,7 @@ public async ValueTask>> GetRegisteredActivations(M foreach (var (grainId, activation) in localActivations) { var directory = GetGrainDirectory(activation, grainDirectoryResolver); - if (directory is not null && directory == this) + if (directory == this) { var address = activation.Address; if (!range.Contains(address.GrainId)) @@ -296,6 +306,7 @@ async Task OnShuttingDown(CancellationToken token) { tasks.Add(partition.OnShuttingDown(token)); } + await Task.WhenAll(tasks).SuppressThrowing(); } } @@ -309,6 +320,8 @@ private async Task ProcessMembershipUpdates() { try { + DirectoryMembershipSnapshot previous = _membershipService.CurrentView; + var previousRanges = RingRangeCollection.Empty; await foreach (var update in _membershipService.ViewUpdates.WithCancellation(_stoppedCts.Token)) { tasks.RemoveAll(t => t.IsCompleted); @@ -326,6 +339,7 @@ private async Task ProcessMembershipUpdates() } var current = update; + var currentRanges = current.GetMemberRanges(Silo); foreach (var partition in _partitions) { @@ -334,10 +348,21 @@ private async Task ProcessMembershipUpdates() if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Updated view from '{PreviousVersion}' to '{Version}'.", previousUpdate.Version, update.Version); + var deltaSize = currentRanges.SizePercent - previousRanges.SizePercent; + var meanSizePercent = current.Members.Length > 0 ? 100.0 / current.Members.Length : 0f; + var deviationFromMean = Math.Abs(meanSizePercent - currentRanges.SizePercent); + _logger.LogDebug( + "Updated view from '{PreviousVersion}' to '{Version}'. Now responsible for {Range:0.00}% (Δ {DeltaPercent:0.00}%). {DeviationFromMean:0.00}% from ideal share.", + previous.Version, + current.Version, + currentRanges.SizePercent, + deltaSize, + deviationFromMean); } previousUpdate = update.ClusterMembershipSnapshot; + previous = current; + previousRanges = currentRanges; } } catch (Exception exception) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs index 00a28cba4d..95ca903b1b 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs @@ -120,22 +120,25 @@ public async ValueTask RefreshViewAsync(MembershipV ValueTask IGrainDirectoryPartition.AcknowledgeSnapshotTransferAsync(SiloAddress silo, int partitionIndex, MembershipVersion rangeVersion) { - RemoveSnapshotTransferPartner((silo, partitionIndex), rangeVersion); + RemoveSnapshotTransferPartner( + (silo, partitionIndex, rangeVersion), + snapshotFilter: (state, snapshot) => snapshot.DirectoryMembershipVersion == state.rangeVersion, + partnerFilter: (state, silo, partitionIndex) => silo.Equals(state.silo) && partitionIndex == state.partitionIndex); return new(true); } - private void RemoveSnapshotTransferPartner((SiloAddress Silo, int PartitionIndex) owner, MembershipVersion? rangeVersion) + private void RemoveSnapshotTransferPartner(TState state, Func snapshotFilter, Func partnerFilter) { for (var i = 0; i < _partitionSnapshots.Count; ++i) { var partitionSnapshot = _partitionSnapshots[i]; - if (rangeVersion.HasValue && partitionSnapshot.DirectoryMembershipVersion != rangeVersion.Value) + if (!snapshotFilter(state, partitionSnapshot)) { continue; } var partners = partitionSnapshot.TransferPartners; - partners.RemoveWhere(p => p.SiloAddress.Equals(owner.Silo) && (owner.PartitionIndex < 0 || p.PartitionIndex == owner.PartitionIndex)); + partners.RemoveWhere(p => partnerFilter(state, p.SiloAddress, p.PartitionIndex)); if (partners.Count == 0) { _partitionSnapshots.RemoveAt(i); @@ -276,9 +279,32 @@ private void OnSiloRemovedFromCluster(ClusterMember change) } } - RemoveSnapshotTransferPartner((change.SiloAddress, -1), rangeVersion: null); + RemoveSnapshotTransferPartner( + change.SiloAddress, + snapshotFilter: (state, snapshot) => true, + partnerFilter: (state, silo, partitionIndex) => silo.Equals(state)); } + internal Task OnRecoveringPartition(MembershipVersion version, RingRange range, SiloAddress siloAddress, int partitionIndex) => + this.QueueTask( + async () => + { + try + { + await WaitForRange(range, version); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error waiting for range to unlock."); + } + + // Remove all snapshots that are associated with the given replica prior or equal to the specified version. + RemoveSnapshotTransferPartner( + (Version: version, SiloAddress: siloAddress, PartitionIndex: partitionIndex), + snapshotFilter: (state, snapshot) => snapshot.DirectoryMembershipVersion <= state.Version, + partnerFilter: (state, silo, partitionIndex) => partitionIndex == state.PartitionIndex && silo.Equals(state.SiloAddress)); + }); + internal Task ProcessMembershipUpdateAsync(DirectoryMembershipSnapshot current) => this.QueueAction( static state => state.Self.ProcessMembershipUpdate(state.Current), @@ -302,15 +328,6 @@ private void ProcessMembershipUpdate(DirectoryMembershipSnapshot current) var previousRange = previous.GetRange(_id, _partitionIndex); _currentRange = current.GetRange(_id, _partitionIndex); - // It is important that this method is synchronous, to ensure that updates are atomic. - var deltaSize = _currentRange.SizePercent - previousRange.SizePercent; - var meanSizePercent = current.Members.Length > 0 ? 100.0 / current.Members.Length : 0f; - var deviationFromMean = Math.Abs(meanSizePercent - _currentRange.SizePercent); - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug("Updating view from '{PreviousVersion}' to '{Version}'. Now responsible for '{Range}' (Δ {DeltaPercent:0.00}%. {DeviationFromMean:0.00}% from ideal share).", previous.Version, current.Version, _currentRange, deltaSize, deviationFromMean); - } - var removedRange = previousRange.Difference(_currentRange).SingleOrDefault(); var addedRange = _currentRange.Difference(previousRange).SingleOrDefault(); @@ -347,7 +364,7 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc var (tcs, sw) = LockRange(removedRange, current.Version); if (_logger.IsEnabled(LogLevel.Debug)) { - _logger.LogDebug("Relinquishing ownership of range '{Range}'.", removedRange); + _logger.LogDebug("Relinquishing ownership of range '{Range}' at version '{Version}'.", removedRange, current.Version); } try @@ -361,10 +378,6 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc await WaitForRange(removedRange, previous.Version); GrainRuntime.CheckRuntimeContext(this); - if (_logger.IsEnabled(LogLevel.Trace)) - { - _logger.LogTrace("Relinquishing ownership of range '{Range}'.", removedRange); - } foreach (var (range, ownerIndex, partitionIndex) in current.RangeOwners) { @@ -396,14 +409,28 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc _directory.Remove(address.GrainId); } - if (transferPartners.Count > 0) + var isContiguous = current.Version.Value == previous.Version.Value + 1; + if (!isContiguous) { - _partitionSnapshots.Add(new PartitionSnapshotState(previous.Version, removedAddresses, transferPartners)); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("Encountered non-contiguous update from '{Previous}' to '{Current}' while releasing range '{Range}'. Dropping snapshot.", previous.Version, current.Version, removedRange); + } + + return; } - else + + if (transferPartners.Count == 0) { - _logger.LogDebug("Dropping snapshot since there are no transfer partners."); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("No transfer partners for snapshot of range '{Range}' at version '{Version}'. Dropping snapshot.", removedRange, current.Version); + } + + return; } + + _partitionSnapshots.Add(new PartitionSnapshotState(previous.Version, removedAddresses, transferPartners)); } finally { @@ -641,7 +668,21 @@ async Task> GetRegisteredActivationsFromClusterMember(Members var client = _grainFactory.GetSystemTarget(Constants.GrainDirectory, siloAddress); var result = await InvokeOnClusterMember( siloAddress, - async () => await client.GetRegisteredActivations(version, range, isValidation), + async () => + { + var innerSw = ValueStopwatch.StartNew(); + Immutable> result = default; + if (isValidation) + { + result = await client.GetRegisteredActivations(version, range, isValidation: true); + } + else + { + result = await client.RecoverRegisteredActivations(version, range, _id, _partitionIndex); + } + + return result; + }, new Immutable>([]), nameof(GetRegisteredActivations)); diff --git a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs index f42df98812..655073582a 100644 --- a/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/IGrainDirectoryPartition.cs @@ -29,6 +29,9 @@ internal interface IGrainDirectoryClient : ISystemTarget { [Alias("GetRegisteredActivations")] ValueTask>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation); + + [Alias("RecoverRegisteredActivations")] + ValueTask>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionId); } [Alias("IGrainDirectoryReplicaTestHooks")] diff --git a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs index 4d95b623cc..8b916a38ac 100644 --- a/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs +++ b/src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs @@ -153,7 +153,7 @@ private static void ConfigurePrimarySiloEndpoint(OptionsBuilderThe silo builder to register the directory implementation with. /// The name of the directory to register, or null to register the directory as the default. /// The provided silo builder. - [Experimental("ORLEANSEXP002")] + [Experimental("ORLEANSEXP003")] public static ISiloBuilder AddDistributedGrainDirectory(this ISiloBuilder siloBuilder, string? name = null) { var services = siloBuilder.Services; diff --git a/src/Orleans.TestingHost/ConfigureDistributedGrainDirectory.cs b/src/Orleans.TestingHost/ConfigureDistributedGrainDirectory.cs index f2beaaa0b5..95d5ec5c19 100644 --- a/src/Orleans.TestingHost/ConfigureDistributedGrainDirectory.cs +++ b/src/Orleans.TestingHost/ConfigureDistributedGrainDirectory.cs @@ -4,7 +4,7 @@ namespace Orleans.TestingHost; internal class ConfigureDistributedGrainDirectory : ISiloConfigurator { -#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. public void Configure(ISiloBuilder siloBuilder) => siloBuilder.AddDistributedGrainDirectory(); -#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. } \ No newline at end of file diff --git a/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs b/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs index b026061baf..51dc7d4fa4 100644 --- a/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs +++ b/test/TesterInternal/GrainDirectory/GrainDirectoryResilienceTests.cs @@ -173,9 +173,9 @@ private class SiloBuilderConfigurator : ISiloConfigurator public void Configure(ISiloBuilder siloBuilder) { siloBuilder.Configure(o => o.ResponseTimeout = o.SystemResponseTimeout = TimeSpan.FromMinutes(2)); -#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. siloBuilder.AddDistributedGrainDirectory(); -#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. +#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. } } }