Skip to content

Commit

Permalink
[DOCS] Add custom lease documentation (#7388)
Browse files Browse the repository at this point in the history
* Add custom lease documentation

* Simplify structure

* upgrade unit test
  • Loading branch information
Arkatufus authored Nov 14, 2024
1 parent 0ad0b77 commit 4082e9c
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 1 deletion.
89 changes: 89 additions & 0 deletions docs/articles/utilities/lease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
uid: lease
title: Distributed Locks with Akka.Coordination
---
# Distributed Locks with Akka.Coordination

## General Definition

`Akka.Coordination` provides a generalized "distributed lock" implementation called a `Lease` that uses a unique resource identifier inside a backing store (such as Azure Blob Storage or Kubernetes Custom Resource Definitions) to only allow one current "holder" of the lease to perform an action at any given time.

Akka.NET uses leases internally inside [Split Brain Resolver](../clustering/split-brain-resolver.md), [Cluster.Sharding](../clustering/cluster-sharding.md), and [Cluster Singletons](../clustering/cluster-singleton.md) for this purpose - and in this document you can learn how to call and create leases in your own Akka.NET applications if needed.

### Officially Supported Lease Implementations

There are currently two officially supported lease implementations:

* [Akka.Coordination.KubernetesApi](https://github.com/akkadotnet/Akka.Management/tree/dev/src/coordination/kubernetes/Akka.Coordination.KubernetesApi)
* [Akka.Coordination.Azure](https://github.com/akkadotnet/Akka.Management/tree/dev/src/coordination/azure/Akka.Coordination.Azure)

All lease implementations in Akka.NET supports automatic expiry or renewal mechanisms. Expiry ensures that leases do not remain active indefinitely, which can prevent resource deadlock or starvation scenarios.

### Key Characteristics and Components

* **Lease Name**: A unique identifier for the lease, which specifies the resource to be protected.
* **Owner Name**: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease.
* **Lease Timeout**: May also be called "Time To Live" or TTL. A duration parameter that specifies how long the lease should last. Leases may be renewed or revoked depending on the implementation.

### Public API

The `Akka.Coordination.Lease` API provides the following methods:

* **`Task<bool> Acquire()`**
* **`Task<bool> Acquire(Action<Exception> leaseLostCallback)`**

These asynchronous methods attempts to acquire the lease for the resource. It returns a `Task<bool>`, indicating if the acquisition was successful or not. Parameters may include callback delegate method that will be invoked when a granted lease have been revoked for some reason.

* **`Task<bool> Release()`**

This asynchronous method releases the lease, relinquishing the access rights to the resource. It returns a `Task<bool>`, where true indicates successful release. This method is important for ensuring that resources are freed up for other actors or nodes once a task is completed.

* **`bool CheckLease()`**

This method checks whether the lease is still valid, typically returning a Boolean. `CheckLease()` is useful for verifying if a lease has expired or been revoked, ensuring that processes do not operate under an invalid lease.

## Example

The full code for this example can be seen inside the [Akka.NET repo](https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs)

### Internal Messages

The actor using `Lease` will need a few boilerplate internal messages:

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=messages)]

### Obtaining Reference To Lease Implementation

To obtain a reference to the `Lease` implementation, you will need 4 things:

* **Lease Name**: A unique identifier for the lease, which specifies the resource to be protected.
* **Owner Name**: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease.
* **Configuration Path**: A full HOCON configuration path containing the definition of the lease implementation.
* **Retry Interval**: A time duration needed for failed lease acquisition retry.

A `Lease` reference is then obtained by calling `LeaseProvider.Get(Context.System).GetLease()`

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=constructor)]

### Actor States

The actor leverages actor states to separate the lease acquisition and actual working state of the actor.

* **`AcquiringLease` State**
In this state, the actor will only handle the required internal messages related to lease acquisition. Any other messages not related to lease acquisition will be stashed until the lease is acquired/granted. The actor will automatically retry lease acquisition by calling `AcquireLease()` on a regular basis if it failed to acquire a lease.
* **`Active` State**
In this state, the actor is active and is allowed to process all received messages normally. The only lease related message being processed is the `LeaseLost` internal message that signals lease revocation.

In the event of a lease revocation, the actor will forcefully shuts down to prevent resource contention. This may be modified to suit user needs.

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=actor-states)]

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=lease-acquisition)]

### Lease Lifecycle

Lease needs to be granted before an actor can perform any of its message handling and the actor needs to stop, forcefully or gracefully, if the lease is revoked. Attention must be taken so that, in the event of revoked lease, there would be no resource contention, or at least with minimal impact.

In the example code, lease would be acquired inside the `PreStart()` method override by calling `AcquireLease()` and it will be released inside the `PostStop()` method override.

[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=lease-lifecycle)]
4 changes: 3 additions & 1 deletion docs/articles/utilities/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@
- name: Scheduler
href: scheduler.md
- name: Circuit Breaker
href: circuit-breaker.md
href: circuit-breaker.md
- name: Lease
href: lease.md
1 change: 1 addition & 0 deletions src/core/Akka.Docs.Tests/Akka.Docs.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

<ItemGroup>
<ProjectReference Include="..\..\contrib\cluster\Akka.Cluster.Metrics\Akka.Cluster.Metrics.csproj" />
<ProjectReference Include="..\Akka.Coordination.Tests\Akka.Coordination.Tests.csproj" />
<ProjectReference Include="..\Akka\Akka.csproj" />
<ProjectReference Include="..\Akka.Persistence\Akka.Persistence.csproj" />
<ProjectReference Include="..\Akka.Streams\Akka.Streams.csproj" />
Expand Down
253 changes: 253 additions & 0 deletions src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
// -----------------------------------------------------------------------
// <copyright file="LeaseActorDocSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Coordination;
using Akka.Coordination.Tests;
using Akka.Event;
using Akka.TestKit.Xunit2;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

#nullable enable
namespace DocsExamples.Utilities.Leases;

public class LeaseActorDocSpec: TestKit
{
private class LeaseFailed : Exception
{
public LeaseFailed(string message) : base(message)
{
}

public LeaseFailed(string message, Exception innerEx)
: base(message, innerEx)
{
}
}

private const string ResourceId = "protected-resource";

private LeaseUsageSettings LeaseSettings => new("test-lease", TimeSpan.FromSeconds(1));
private string LeaseOwner { get; } = $"leased-actor-{Guid.NewGuid():N}";

public LeaseActorDocSpec(ITestOutputHelper output)
: base(TestLease.Configuration, nameof(LeaseActorDocSpec), output)
{
// start test lease extension
TestLeaseExt.Get(Sys);
}

private TestLease GetLease() => TestLeaseExt.Get(Sys).GetTestLease(ResourceId);

[Fact]
public void Actor_with_lease_should_not_be_active_until_lease_is_acquired()
{
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner)));
testActor.Tell("Hi", TestActor);
ExpectNoMsg(200.Milliseconds());

var lease = GetLease();
lease.InitialPromise.SetResult(true);
ExpectMsg("Hi");
}

[Fact]
public void Actor_with_lease_should_retry_if_initial_acquire_is_false()
{
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner)));
testActor.Tell("Hi", TestActor);
ExpectNoMsg(200.Milliseconds());

var lease = GetLease();
lease.InitialPromise.SetResult(false);
ExpectNoMsg(200.Milliseconds());
lease.SetNextAcquireResult(Task.FromResult(true));
ExpectMsg("Hi");
}

[Fact]
public void Actor_with_lease_should_retry_if_initial_acquire_fails()
{
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner)));
testActor.Tell("Hi", TestActor);
ExpectNoMsg(200.Milliseconds());

var lease = GetLease();
lease.InitialPromise.SetException(new LeaseFailed("oh no"));
ExpectNoMsg(200.Milliseconds());
lease.SetNextAcquireResult(Task.FromResult(true));
ExpectMsg("Hi");
}

[Fact]
public void Actor_with_lease_should_terminate_if_lease_lost()
{
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner)));
testActor.Tell("Hi", TestActor);
ExpectNoMsg(200.Milliseconds());

var lease = GetLease();
lease.InitialPromise.SetResult(true);
ExpectMsg("Hi");

Watch(testActor);
lease.GetCurrentCallback()(new LeaseFailed("oh dear"));
ExpectTerminated(testActor);
}

[Fact]
public void Actor_with_lease_should_release_lease_when_stopped()
{
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner)));
testActor.Tell("Hi", TestActor);
ExpectNoMsg(200.Milliseconds());

var lease = GetLease();
lease.InitialPromise.SetResult(true);
lease.Probe.ExpectMsg(new TestLease.AcquireReq(LeaseOwner));
ExpectMsg("Hi");

Watch(testActor);
lease.GetCurrentCallback()(new LeaseFailed("oh dear"));
ExpectTerminated(testActor);

lease.Probe.ExpectMsg(new TestLease.ReleaseReq(LeaseOwner));
}

}

public class LeaseActor: ReceiveActor, IWithStash, IWithTimers
{
#region messages
private sealed record LeaseAcquireResult(bool Acquired, Exception? Reason);
private sealed record LeaseLost(Exception Reason);
private sealed class LeaseRetryTick
{
public static readonly LeaseRetryTick Instance = new();
private LeaseRetryTick() { }
}
#endregion

private const string LeaseRetryTimer = "lease-retry";

private readonly string _resourceId;
private readonly Lease _lease;
private readonly TimeSpan _leaseRetryInterval;
private readonly ILoggingAdapter _log;
private readonly string _uniqueId;

#region constructor
public LeaseActor(LeaseUsageSettings leaseSettings, string resourceId, string actorUniqueId)
{
_resourceId = resourceId;
_uniqueId = actorUniqueId;

_lease = LeaseProvider.Get(Context.System).GetLease(
leaseName: _resourceId,
configPath: leaseSettings.LeaseImplementation,
ownerName: _uniqueId);
_leaseRetryInterval = leaseSettings.LeaseRetryInterval;

_log = Context.GetLogger();
}
#endregion

public IStash Stash { get; set; } = null!;

public ITimerScheduler Timers { get; set; } = null!;

#region actor-states
private void AcquiringLease()
{
Receive<LeaseAcquireResult>(lar =>
{
if (lar.Acquired)
{
_log.Debug("{0}: Lease acquired", _resourceId);
Stash.UnstashAll();
Become(Active);
}
else
{
_log.Error(lar.Reason, "{0}: Failed to get lease for unique Id [{1}]. Retry in {2}",
_resourceId, _uniqueId, _leaseRetryInterval);
Timers.StartSingleTimer(LeaseRetryTimer, LeaseRetryTick.Instance, _leaseRetryInterval);
}
});

Receive<LeaseRetryTick>(_ => AcquireLease());

Receive<LeaseLost>(HandleLeaseLost);

ReceiveAny(msg =>
{
_log.Debug("{0}: Got msg of type [{1}] from [{2}] while waiting for lease, stashing",
_resourceId, msg.GetType().Name, Sender);
Stash.Stash();
});
}

private void Active()
{
Receive<LeaseLost>(HandleLeaseLost);

// TODO: Insert your actor message handlers here
ReceiveAny(msg => Sender.Tell(msg, Self));
}

private void HandleLeaseLost(LeaseLost msg)
{
_log.Error(msg.Reason, "{0}: unique id [{1}] lease lost", _resourceId, _uniqueId);
Context.Stop(Self);
}
#endregion

#region lease-acquisition
private void AcquireLease()
{
_log.Info("{0}: Acquiring lease {1}", _resourceId, _lease.Settings);
var self = Self;
Acquire().PipeTo(self);
Become(AcquiringLease);
return;

async Task<LeaseAcquireResult> Acquire()
{
try
{
var result = await _lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); });
return new LeaseAcquireResult(result, null);
}
catch (Exception ex)
{
return new LeaseAcquireResult(false, ex);
}
}
}
#endregion

#region lease-lifecycle
protected override void PreStart()
{
base.PreStart();
// Acquire a lease when actor starts
AcquireLease();
}

protected override void PostStop()
{
base.PostStop();
// Release the lease when actor stops
_lease.Release().GetAwaiter().GetResult();
}
#endregion

}

0 comments on commit 4082e9c

Please sign in to comment.