Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add actor state TTL support. #1060

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
./dist/linux_amd64/release/placement &
- name: Spin local environment
run: |
docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka
docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka mysql
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |
Expand Down
88 changes: 88 additions & 0 deletions sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.actors.runtime;

import java.time.Instant;

/**
* Represents a state change for an actor.
*/
final class ActorState<T> {

/**
* Name of the state being changed.
*/
private final String name;

/**
* New value for the state being changed.
*/
private final T value;

/**
* Expiration.
*/
private final Instant expiration;

/**
* Creates a new instance of the metadata on actor state.
*
* @param name Name of the state being changed.
* @param value Value to be set.
*/
ActorState(String name, T value) {
this(name, value, null);
}

/**
* Creates a new instance of the metadata on actor state.
*
* @param name Name of the state being changed.
* @param value Value to be set.
* @param expiration When the value is set to expire (recommended but accepts null).
*/
ActorState(String name, T value, Instant expiration) {
this.name = name;
this.value = value;
this.expiration = expiration;
}

/**
* Gets the name of the state being changed.
*
* @return Name of the state.
*/
String getName() {
return name;
}

/**
* Gets the new value of the state being changed.
*
* @return New value.
*/
T getValue() {
return value;
}

/**
* Gets the expiration of the state.
*
* @return State expiration.
*/
Instant getExpiration() {
return expiration;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@
public final class ActorStateChange {

/**
* Name of the state being changed.
* State being changed.
*/
private final String stateName;

/**
* New value for the state being changed.
*/
private final Object value;
private final ActorState state;

/**
* Type of change {@link ActorStateChangeKind}.
Expand All @@ -36,32 +31,21 @@ public final class ActorStateChange {
/**
* Creates an actor state change.
*
* @param stateName Name of the state being changed.
* @param value New value for the state being changed.
* @param state State being changed.
* @param changeKind Kind of change.
*/
ActorStateChange(String stateName, Object value, ActorStateChangeKind changeKind) {
this.stateName = stateName;
this.value = value;
ActorStateChange(ActorState state, ActorStateChangeKind changeKind) {
this.state = state;
this.changeKind = changeKind;
}

/**
* Gets the name of the state being changed.
*
* @return Name of the state.
*/
String getStateName() {
return stateName;
}

/**
* Gets the new value of the state being changed.
* Gets the state being changed.
*
* @return New value.
* @return state.
*/
Object getValue() {
return value;
ActorState getState() {
return state;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -66,12 +69,13 @@ public class ActorStateManager {
/**
* Adds a given key/value to the Actor's state store's cache.
*
* @param stateName Name of the state being added.
* @param value Value to be added.
* @param <T> Type of the object being added.
* @param stateName Name of the state being added.
* @param value Value to be added.
* @param expiration State's expiration.
* @param <T> Type of the object being added.
* @return Asynchronous void operation.
*/
public <T> Mono<Void> add(String stateName, T value) {
public <T> Mono<Void> add(String stateName, T value, Instant expiration) {
return Mono.fromSupplier(() -> {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
Expand All @@ -84,7 +88,8 @@ public <T> Mono<Void> add(String stateName, T value) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);

if (metadata.kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value));
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value, expiration));
return true;
}

Expand All @@ -95,7 +100,8 @@ public <T> Mono<Void> add(String stateName, T value) {
throw new IllegalStateException("Duplicate state: " + stateName);
}

this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value));
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value, expiration));
return true;
}))
.then();
Expand Down Expand Up @@ -130,6 +136,10 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);

if (metadata.isExpired()) {
throw new NoSuchElementException("State is expired: " + stateName);
}

if (metadata.kind == ActorStateChangeKind.REMOVE) {
throw new NoSuchElementException("State is marked for removal: " + stateName);
}
Expand All @@ -142,20 +152,37 @@ public <T> Mono<T> get(String stateName, TypeRef<T> type) {
this.stateProvider.load(this.actorTypeName, this.actorId, stateName, type)
.switchIfEmpty(Mono.error(new NoSuchElementException("State not found: " + stateName)))
.map(v -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v));
return (T) v;
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v.getValue(), v.getExpiration()));
return (T) v.getValue();
}));
}

/**
* Updates a given key/value pair in the state store's cache.
* Use the variation that takes in an TTL instead.
*
* @param stateName Name of the state being updated.
* @param value Value to be set for given state.
* @param <T> Type of the value being set.
* @return Asynchronous void result.
*/
@Deprecated
public <T> Mono<Void> set(String stateName, T value) {
return this.set(stateName, value, Duration.ZERO);
}

/**
* Updates a given key/value pair in the state store's cache.
* Using TTL is highly recommended to avoid state to be left in the state store forever.
*
* @param stateName Name of the state being updated.
* @param value Value to be set for given state.
* @param ttl Time to live.
* @param <T> Type of the value being set.
* @return Asynchronous void result.
*/
public <T> Mono<Void> set(String stateName, T value, Duration ttl) {
return Mono.fromSupplier(() -> {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
Expand All @@ -165,20 +192,23 @@ public <T> Mono<Void> set(String stateName, T value) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);

ActorStateChangeKind kind = metadata.kind;
if ((kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
if (metadata.isExpired() || (kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
kind = ActorStateChangeKind.UPDATE;
}

this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value));
var expiration = buildExpiration(ttl);
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value, expiration));
return true;
}

return false;
}).filter(x -> x)
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
.map(exists -> {
var expiration = buildExpiration(ttl);
this.stateChangeTracker.put(stateName,
artursouza marked this conversation as resolved.
Show resolved Hide resolved
new StateChangeMetadata(exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value));
new StateChangeMetadata(
exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value, expiration));
return exists;
}))
.then();
Expand Down Expand Up @@ -208,7 +238,7 @@ public Mono<Void> remove(String stateName) {
return true;
}

this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
artursouza marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

Expand All @@ -218,7 +248,7 @@ public Mono<Void> remove(String stateName) {
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName))
.filter(exists -> exists)
.map(exists -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
return exists;
artursouza marked this conversation as resolved.
Show resolved Hide resolved
})
.then();
Expand All @@ -239,7 +269,7 @@ public Mono<Boolean> contains(String stateName) {
return this.stateChangeTracker.get(stateName);
}
).map(metadata -> {
if (metadata.kind == ActorStateChangeKind.REMOVE) {
if (metadata.isExpired() || (metadata.kind == ActorStateChangeKind.REMOVE)) {
return Boolean.FALSE;
}

Expand All @@ -264,7 +294,8 @@ public Mono<Void> save() {
continue;
}

changes.add(new ActorStateChange(tuple.getKey(), tuple.getValue().value, tuple.getValue().kind));
var actorState = new ActorState<>(tuple.getKey(), tuple.getValue().value, tuple.getValue().expiration);
changes.add(new ActorStateChange(actorState, tuple.getValue().kind));
}

return changes.toArray(new ActorStateChange[0]);
Expand All @@ -288,12 +319,17 @@ private void flush() {
if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.remove(stateName);
} else {
StateChangeMetadata metadata = new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value);
StateChangeMetadata metadata =
new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value, tuple.getValue().expiration);
this.stateChangeTracker.put(stateName, metadata);
}
}
}

private static Instant buildExpiration(Duration ttl) {
return (ttl != null) && !ttl.isNegative() && !ttl.isZero() ? Instant.now().plus(ttl) : null;
}

/**
* Internal class to represent value and change kind.
*/
Expand All @@ -309,15 +345,26 @@ private static final class StateChangeMetadata {
*/
private final Object value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if value shouldn't be of type T just like in ActorState, otherwise we are loosing some compiler safety.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not need because this is only to serialize and not to be deserialized.


/**
* Expiration.
*/
private final Instant expiration;

/**
* Creates a new instance of the metadata on state change.
*
* @param kind Kind of change.
* @param value Value to be set.
* @param expiration When the value is set to expire (recommended but accepts null).
*/
private StateChangeMetadata(ActorStateChangeKind kind, Object value) {
private StateChangeMetadata(ActorStateChangeKind kind, Object value, Instant expiration) {
this.kind = kind;
this.value = value;
this.expiration = expiration;
}

private boolean isExpired() {
return (this.expiration != null) && Instant.now().isAfter(this.expiration);
}
}
}
Loading
Loading