From d82b0c1146bd468ec4d3a35224633f8cccc9571e Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 21 Jun 2024 19:14:27 -0700 Subject: [PATCH 1/4] Add actor state TTL support. Signed-off-by: Artur Souza --- .github/workflows/build.yml | 2 +- .../io/dapr/actors/runtime/ActorState.java | 88 ++++++++++++++++++ .../dapr/actors/runtime/ActorStateChange.java | 34 ++----- .../actors/runtime/ActorStateManager.java | 81 ++++++++++++---- .../actors/runtime/ActorStateOperation.java | 35 ++----- .../io/dapr/actors/runtime/DaprClient.java | 4 +- .../dapr/actors/runtime/DaprClientImpl.java | 30 +++++- .../runtime/DaprStateAsyncProvider.java | 29 +++--- .../actors/runtime/ActorStatefulTest.java | 82 ++++++++++++++-- .../actors/runtime/DaprGrpcClientTest.java | 18 ++-- .../runtime/DaprInMemoryStateProvider.java | 38 ++++++-- .../runtime/DaprStateAsyncProviderTest.java | 73 ++++++++------- sdk-tests/components/actorstatestore.yaml | 16 ++++ sdk-tests/components/statestore.yaml | 2 - sdk-tests/configurations/configuration.yaml | 5 +- sdk-tests/deploy/local-test.yml | 7 ++ .../java/io/dapr/it/actors/ActorStateIT.java | 93 +++++++++++++++++-- .../services/springboot/StatefulActor.java | 2 + .../springboot/StatefulActorImpl.java | 7 ++ 19 files changed, 488 insertions(+), 158 deletions(-) create mode 100644 sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java create mode 100644 sdk-tests/components/actorstatestore.yaml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2453275c0..da51b6ff6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -107,7 +107,7 @@ jobs: ./dist/linux_amd64/release/placement & - name: Spin local environment run: | - docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka + docker-compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka mysql docker ps - name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar run: | diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java new file mode 100644 index 000000000..17d46be39 --- /dev/null +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java @@ -0,0 +1,88 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.actors.runtime; + +import java.time.Instant; + +/** + * Represents a state change for an actor. + */ +final class ActorState { + + /** + * Name of the state being changed. + */ + private final String name; + + /** + * New value for the state being changed. + */ + private final T value; + + /** + * Expiration. + */ + private final Instant expiration; + + /** + * Creates a new instance of the metadata on actor state. + * + * @param name Name of the state being changed. + * @param value Value to be set. + */ + ActorState(String name, T value) { + this(name, value, null); + } + + /** + * Creates a new instance of the metadata on actor state. + * + * @param name Name of the state being changed. + * @param value Value to be set. + * @param expiration When the value is set to expire (recommended but accepts null). + */ + ActorState(String name, T value, Instant expiration) { + this.name = name; + this.value = value; + this.expiration = expiration; + } + + /** + * Gets the name of the state being changed. + * + * @return Name of the state. + */ + String getName() { + return name; + } + + /** + * Gets the new value of the state being changed. + * + * @return New value. + */ + T getValue() { + return value; + } + + /** + * Gets the expiration of the state. + * + * @return State expiration. + */ + Instant getExpiration() { + return expiration; + } + +} diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateChange.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateChange.java index 47ce28e2e..d5c20113a 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateChange.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateChange.java @@ -19,14 +19,9 @@ public final class ActorStateChange { /** - * Name of the state being changed. + * State being changed. */ - private final String stateName; - - /** - * New value for the state being changed. - */ - private final Object value; + private final ActorState state; /** * Type of change {@link ActorStateChangeKind}. @@ -36,32 +31,21 @@ public final class ActorStateChange { /** * Creates an actor state change. * - * @param stateName Name of the state being changed. - * @param value New value for the state being changed. + * @param state State being changed. * @param changeKind Kind of change. */ - ActorStateChange(String stateName, Object value, ActorStateChangeKind changeKind) { - this.stateName = stateName; - this.value = value; + ActorStateChange(ActorState state, ActorStateChangeKind changeKind) { + this.state = state; this.changeKind = changeKind; } /** - * Gets the name of the state being changed. - * - * @return Name of the state. - */ - String getStateName() { - return stateName; - } - - /** - * Gets the new value of the state being changed. + * Gets the state being changed. * - * @return New value. + * @return state. */ - Object getValue() { - return value; + ActorState getState() { + return state; } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java index 1bf5430dd..4d6bdabb8 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java @@ -17,7 +17,10 @@ import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -66,12 +69,13 @@ public class ActorStateManager { /** * Adds a given key/value to the Actor's state store's cache. * - * @param stateName Name of the state being added. - * @param value Value to be added. - * @param Type of the object being added. + * @param stateName Name of the state being added. + * @param value Value to be added. + * @param expiration State's expiration. + * @param Type of the object being added. * @return Asynchronous void operation. */ - public Mono add(String stateName, T value) { + public Mono add(String stateName, T value, Instant expiration) { return Mono.fromSupplier(() -> { if (stateName == null) { throw new IllegalArgumentException("State's name cannot be null."); @@ -84,7 +88,8 @@ public Mono add(String stateName, T value) { StateChangeMetadata metadata = this.stateChangeTracker.get(stateName); if (metadata.kind == ActorStateChangeKind.REMOVE) { - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value)); + this.stateChangeTracker.put( + stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value, expiration)); return true; } @@ -95,7 +100,8 @@ public Mono add(String stateName, T value) { throw new IllegalStateException("Duplicate state: " + stateName); } - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value)); + this.stateChangeTracker.put( + stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value, expiration)); return true; })) .then(); @@ -130,6 +136,10 @@ public Mono get(String stateName, TypeRef type) { if (this.stateChangeTracker.containsKey(stateName)) { StateChangeMetadata metadata = this.stateChangeTracker.get(stateName); + if (metadata.isExpired()) { + throw new NoSuchElementException("State is expired: " + stateName); + } + if (metadata.kind == ActorStateChangeKind.REMOVE) { throw new NoSuchElementException("State is marked for removal: " + stateName); } @@ -142,20 +152,37 @@ public Mono get(String stateName, TypeRef type) { this.stateProvider.load(this.actorTypeName, this.actorId, stateName, type) .switchIfEmpty(Mono.error(new NoSuchElementException("State not found: " + stateName))) .map(v -> { - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v)); - return (T) v; + this.stateChangeTracker.put( + stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v.getValue(), v.getExpiration())); + return (T) v.getValue(); })); } /** * Updates a given key/value pair in the state store's cache. + * Use the variation that takes in an TTL instead. * * @param stateName Name of the state being updated. * @param value Value to be set for given state. * @param Type of the value being set. * @return Asynchronous void result. */ + @Deprecated public Mono set(String stateName, T value) { + return this.set(stateName, value, Duration.ZERO); + } + + /** + * Updates a given key/value pair in the state store's cache. + * Using TTL is highly recommended to avoid state to be left in the state store forever. + * + * @param stateName Name of the state being updated. + * @param value Value to be set for given state. + * @param ttl Time to live. + * @param Type of the value being set. + * @return Asynchronous void result. + */ + public Mono set(String stateName, T value, Duration ttl) { return Mono.fromSupplier(() -> { if (stateName == null) { throw new IllegalArgumentException("State's name cannot be null."); @@ -165,11 +192,12 @@ public Mono set(String stateName, T value) { StateChangeMetadata metadata = this.stateChangeTracker.get(stateName); ActorStateChangeKind kind = metadata.kind; - if ((kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) { + if (metadata.isExpired() || (kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) { kind = ActorStateChangeKind.UPDATE; } - this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value)); + var expiration = buildExpiration(ttl); + this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value, expiration)); return true; } @@ -177,8 +205,10 @@ public Mono set(String stateName, T value) { }).filter(x -> x) .switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName) .map(exists -> { + var expiration = buildExpiration(ttl); this.stateChangeTracker.put(stateName, - new StateChangeMetadata(exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value)); + new StateChangeMetadata( + exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value, expiration)); return exists; })) .then(); @@ -208,7 +238,7 @@ public Mono remove(String stateName) { return true; } - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null)); + this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null)); return true; } @@ -218,7 +248,7 @@ public Mono remove(String stateName) { .switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)) .filter(exists -> exists) .map(exists -> { - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null)); + this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null)); return exists; }) .then(); @@ -239,7 +269,7 @@ public Mono contains(String stateName) { return this.stateChangeTracker.get(stateName); } ).map(metadata -> { - if (metadata.kind == ActorStateChangeKind.REMOVE) { + if (metadata.isExpired() || (metadata.kind == ActorStateChangeKind.REMOVE)) { return Boolean.FALSE; } @@ -264,7 +294,8 @@ public Mono save() { continue; } - changes.add(new ActorStateChange(tuple.getKey(), tuple.getValue().value, tuple.getValue().kind)); + var actorState = new ActorState<>(tuple.getKey(), tuple.getValue().value, tuple.getValue().expiration); + changes.add(new ActorStateChange(actorState, tuple.getValue().kind)); } return changes.toArray(new ActorStateChange[0]); @@ -288,12 +319,17 @@ private void flush() { if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) { this.stateChangeTracker.remove(stateName); } else { - StateChangeMetadata metadata = new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value); + StateChangeMetadata metadata = + new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value, tuple.getValue().expiration); this.stateChangeTracker.put(stateName, metadata); } } } + private static Instant buildExpiration(Duration ttl) { + return (ttl != null) && !ttl.isNegative() && !ttl.isZero() ? Instant.now().plus(ttl) : null; + } + /** * Internal class to represent value and change kind. */ @@ -309,15 +345,26 @@ private static final class StateChangeMetadata { */ private final Object value; + /** + * Expiration. + */ + private final Instant expiration; + /** * Creates a new instance of the metadata on state change. * * @param kind Kind of change. * @param value Value to be set. + * @param expiration When the value is set to expire (recommended but accepts null). */ - private StateChangeMetadata(ActorStateChangeKind kind, Object value) { + private StateChangeMetadata(ActorStateChangeKind kind, Object value, Instant expiration) { this.kind = kind; this.value = value; + this.expiration = expiration; + } + + private boolean isExpired() { + return (this.expiration != null) && Instant.now().isAfter(this.expiration); } } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java index 737815e66..4db588450 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateOperation.java @@ -25,28 +25,19 @@ final class ActorStateOperation { private String operationType; /** - * Key for the state to be persisted. + * State to be persisted. */ - private String key; - - /** - * Value of the state to be persisted. - */ - private Object value; + private ActorState state; /** * Instantiates a new Actor Timer. * * @param operationType Type of state operation. - * @param key Key to be persisted. - * @param value Value to be persisted. + * @param state Key to be persisted. */ - ActorStateOperation(String operationType, - String key, - Object value) { + ActorStateOperation(String operationType, ActorState state) { this.operationType = operationType; - this.key = key; - this.value = value; + this.state = state; } /** @@ -59,20 +50,12 @@ public String getOperationType() { } /** - * Gets the key to be persisted. + * Gets the state to be persisted. * - * @return Key to be persisted. + * @return State to be persisted. */ - public String getKey() { - return key; + public ActorState getState() { + return state; } - /** - * Gets the value to be persisted. - * - * @return Value to be persisted. - */ - public Object getValue() { - return value; - } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java index 6d53361c4..2c6c83c6c 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClient.java @@ -28,9 +28,9 @@ interface DaprClient { * @param actorType Type of actor. * @param actorId Actor Identifier. * @param keyName State name. - * @return Asynchronous result with current state value. + * @return Asynchronous result with current state. */ - Mono getState(String actorType, String actorId, String keyName); + Mono> getState(String actorType, String actorId, String keyName); /** * Saves state batch to Dapr. diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java index a14d7e257..5b5b5e50f 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprClientImpl.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -77,7 +78,7 @@ class DaprClientImpl implements DaprClient { * {@inheritDoc} */ @Override - public Mono getState(String actorType, String actorId, String keyName) { + public Mono> getState(String actorType, String actorId, final String keyName) { DaprProtos.GetActorStateRequest req = DaprProtos.GetActorStateRequest.newBuilder() .setActorType(actorType) @@ -86,7 +87,14 @@ public Mono getState(String actorType, String actorId, String keyName) { .build(); return Mono.create(it -> - client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray()); + client.getActorState(req, createStreamObserver(it))).map(r -> { + var expirationStr = r.getMetadataOrDefault("ttlExpireTime", null); + Instant expiration = null; + if ((expirationStr != null) && !expirationStr.isEmpty()) { + expiration = Instant.parse(expirationStr); + } + return new ActorState<>(keyName, r.getData().toByteArray(), expiration); + }); } /** @@ -100,12 +108,26 @@ public Mono saveStateTransactionally( List grpcOps = new ArrayList<>(); for (ActorStateOperation op : operations) { String operationType = op.getOperationType(); - String key = op.getKey(); - Object value = op.getValue(); + String key = op.getState().getName(); + Object value = op.getState().getValue(); + Instant expiration = op.getState().getExpiration(); + Long ttlInSeconds = null; + if (expiration != null) { + ttlInSeconds = expiration.getEpochSecond() - Instant.now().getEpochSecond(); + } DaprProtos.TransactionalActorStateOperation.Builder opBuilder = DaprProtos.TransactionalActorStateOperation.newBuilder() .setOperationType(operationType) .setKey(key); + + if (ttlInSeconds != null) { + if (ttlInSeconds <= 0) { + // already expired, min is 1s. + ttlInSeconds = 1L; + } + opBuilder.putMetadata("ttlInSeconds", ttlInSeconds.toString()); + } + if (value != null) { if (value instanceof String) { opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom((String) value, CHARSET))); diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java index fa481f371..d0db1ba2d 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java @@ -20,9 +20,13 @@ import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; import java.io.IOException; import java.nio.charset.Charset; +import java.time.Instant; +import java.util.AbstractMap; import java.util.ArrayList; /** @@ -67,8 +71,8 @@ class DaprStateAsyncProvider { this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class; } - Mono load(String actorType, ActorId actorId, String stateName, TypeRef type) { - Mono result = this.daprClient.getState(actorType, actorId.toString(), stateName); + Mono> load(String actorType, ActorId actorId, String stateName, TypeRef type) { + Mono> result = this.daprClient.getState(actorType, actorId.toString(), stateName); return result.flatMap(s -> { try { @@ -76,19 +80,19 @@ Mono load(String actorType, ActorId actorId, String stateName, TypeRef return Mono.empty(); } - T response = this.stateSerializer.deserialize(s, type); + T response = this.stateSerializer.deserialize(s.getValue(), type); if (this.isStateSerializerDefault && (response instanceof byte[])) { - if (s.length == 0) { + if ((s.getValue() == null) || (s.getValue().length == 0)) { return Mono.empty(); } // Default serializer just passes through byte arrays, so we need to decode it here. - response = (T) OBJECT_MAPPER.readValue(s, byte[].class); + response = (T) OBJECT_MAPPER.readValue(s.getValue(), byte[].class); } if (response == null) { return Mono.empty(); } - return Mono.just(response); + return Mono.just(new ActorState<>(s.getName(), response, s.getExpiration())); } catch (IOException e) { return Mono.error(new RuntimeException(e)); } @@ -96,8 +100,8 @@ Mono load(String actorType, ActorId actorId, String stateName, TypeRef } Mono contains(String actorType, ActorId actorId, String stateName) { - Mono result = this.daprClient.getState(actorType, actorId.toString(), stateName); - return result.map(s -> s.length > 0).defaultIfEmpty(false); + var result = this.daprClient.getState(actorType, actorId.toString(), stateName); + return result.map(s -> (s.getValue() != null) && (s.getValue().length > 0)).defaultIfEmpty(false); } /** @@ -139,14 +143,15 @@ Mono apply(String actorType, ActorId actorId, ActorStateChange... stateCha continue; } - String key = stateChange.getStateName(); + var state = stateChange.getState(); + String key = state.getName(); Object value = null; if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { try { - byte[] data = this.stateSerializer.serialize(stateChange.getValue()); + byte[] data = this.stateSerializer.serialize(state.getValue()); if (data != null) { - if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) { + if (this.isStateSerializerDefault && !(state.getValue() instanceof byte[])) { // DefaultObjectSerializer is a JSON serializer, so we just pass it on. value = new String(data, CHARSET); } else { @@ -160,7 +165,7 @@ Mono apply(String actorType, ActorId actorId, ActorStateChange... stateCha } } - operations.add(new ActorStateOperation(operationName, key, value)); + operations.add(new ActorStateOperation(operationName, new ActorState(key, value, state.getExpiration()))); } return this.daprClient.saveStateTransactionally(actorType, actorId.toString(), operations); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java index 88488d30c..56cba2e08 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java @@ -68,6 +68,10 @@ public interface MyActor { Mono setMessage(String message); + Mono setMessageFor1s(String message); + + Mono setMessageAndWait(String message); + Mono getMessage(); Mono hasMessage(); @@ -197,7 +201,7 @@ public Mono getCountButThrowsException() { @Override public Mono addMessage(String message) { - return super.getActorStateManager().add("message", message); + return super.getActorStateManager().add("message", message, null); } @Override @@ -205,6 +209,20 @@ public Mono setMessage(String message) { return super.getActorStateManager().set("message", message).thenReturn(executeSayMethod(message)); } + @Override + public Mono setMessageFor1s(String message) { + return super + .getActorStateManager().set("message", message, Duration.ofSeconds(1)) + .then(super.getActorStateManager().contains("message")); + } + + @Override + public Mono setMessageAndWait(String message) { + return super.getActorStateManager().set("message", message, Duration.ofSeconds(1)) + .then(Mono.delay(Duration.ofMillis(1100))) + .then(super.getActorStateManager().contains("message")); + } + @Override public Mono getMessage() { return super.getActorStateManager().get("message", String.class); @@ -223,20 +241,20 @@ public Mono deleteMessage() { @Override public Mono forceDuplicateException() { // Second add should throw exception. - return super.getActorStateManager().add("message", "anything") - .then(super.getActorStateManager().add("message", "something else")); + return super.getActorStateManager().add("message", "anything", null) + .then(super.getActorStateManager().add("message", "something else", null)); } @Override public Mono forcePartialChange() { - return super.getActorStateManager().add("message", "first message") + return super.getActorStateManager().add("message", "first message", null) .then(super.saveState()) - .then(super.getActorStateManager().add("message", "second message")); + .then(super.getActorStateManager().add("message", "second message", null)); } @Override public Mono throwsWithoutSaving() { - return super.getActorStateManager().add("message", "first message") + return super.getActorStateManager().add("message", "first message", null) .then(Mono.error(new IllegalCharsetNameException("random"))); } @@ -298,23 +316,67 @@ public MyMethodContext setName(String name) { public void happyGetSetDeleteContains() { ActorProxy proxy = newActorProxy(); Assertions.assertEquals( - proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block()); + proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block()); Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); proxy.invokeMethod("setMessage", "hello world").block(); Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block()); Assertions.assertEquals( - "hello world", proxy.invokeMethod("getMessage", String.class).block()); + "hello world", proxy.invokeMethod("getMessage", String.class).block()); Assertions.assertEquals( - executeSayMethod("hello world"), - proxy.invokeMethod("setMessage", "hello world", String.class).block()); + executeSayMethod("hello world"), + proxy.invokeMethod("setMessage", "hello world", String.class).block()); proxy.invokeMethod("deleteMessage").block(); Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); } + @Test + public void actorStateTTL() throws Exception { + ActorProxy proxy = newActorProxy(); + Assertions.assertEquals( + proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block()); + Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); + + Assertions.assertTrue( + proxy.invokeMethod("setMessageFor1s", "hello world expires in 1s", Boolean.class).block()); + Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block()); + + Assertions.assertEquals( + "hello world expires in 1s", proxy.invokeMethod("getMessage", String.class).block()); + + Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block()); + + Thread.sleep(1100); + + Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); + } + + @Test + public void actorStateTTLExpiresInLocalCache() throws Exception { + ActorProxy proxy = newActorProxy(); + Assertions.assertEquals( + proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block()); + Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); + + //First, sets a message without TTL and checks it is saved. + proxy.invokeMethod("setMessage", "hello world").block(); + Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block()); + Assertions.assertEquals( + "hello world", proxy.invokeMethod("getMessage", String.class).block()); + + // Now, sets a message that expires still in local cache, before it is sent to state store. + Assertions.assertFalse( + proxy.invokeMethod("setMessageAndWait", "expires while still in cache", Boolean.class).block()); + Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); + + Thread.sleep(1100); + + Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block()); + } + @Test public void lazyGet() { ActorProxy proxy = newActorProxy(); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java index 815186e60..7cfad504f 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprGrpcClientTest.java @@ -63,8 +63,8 @@ public class DaprGrpcClientTest { private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes(); private static final List OPERATIONS = Arrays.asList( - new ActorStateOperation("upsert", "mykey", "hello world".getBytes()), - new ActorStateOperation("delete", "mykey", null)); + new ActorStateOperation("upsert", new ActorState("mykey", "hello world".getBytes(), null)), + new ActorStateOperation("delete", new ActorState("mykey", null, null))); private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient(); @@ -92,7 +92,7 @@ public void setup() throws IOException { @Test public void getActorStateException() { - Mono result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY); + Mono> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY); assertThrowsDaprException( ExecutionException.class, "UNKNOWN", @@ -102,8 +102,8 @@ public void getActorStateException() { @Test public void getActorState() { - Mono result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY); - assertArrayEquals(RESPONSE_PAYLOAD, result.block()); + Mono> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY); + assertArrayEquals(RESPONSE_PAYLOAD, result.block().getValue()); } @Test @@ -130,8 +130,8 @@ public void saveActorStateTransactionallyByteArray() { @Test public void saveActorStateTransactionallyInvalidValueType() { ActorStateOperation[] operations = new ActorStateOperation[]{ - new ActorStateOperation("upsert", "mykey", 123), - new ActorStateOperation("delete", "mykey", null), + new ActorStateOperation("upsert", new ActorState("mykey", 123, null)), + new ActorStateOperation("delete", new ActorState("mykey", null, null)), }; Mono result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations)); @@ -327,9 +327,9 @@ public boolean matches(DaprProtos.ExecuteActorStateTransactionRequest argument) for (ActorStateOperation operation : operations) { boolean found = false; for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) { - if (operation.getKey().equals(grpcOperation.getKey()) + if (operation.getState().getName().equals(grpcOperation.getKey()) && operation.getOperationType().equals(grpcOperation.getOperationType()) - && nullableEquals(operation.getValue(), grpcOperation.getValue())) { + && nullableEquals(operation.getState().getValue(), grpcOperation.getValue())) { found = true; break; } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java index 4f1c165d7..72a3ea90a 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; +import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -27,7 +28,7 @@ */ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider { - private static final Map stateStore = new HashMap<>(); + private static final Map> stateStore = new HashMap<>(); private final DaprObjectSerializer serializer; @@ -37,7 +38,7 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider { } @Override - Mono load(String actorType, ActorId actorId, String stateName, TypeRef type) { + Mono> load(String actorType, ActorId actorId, String stateName, TypeRef type) { return Mono.fromSupplier(() -> { try { String stateId = this.buildId(actorType, actorId, stateName); @@ -45,16 +46,38 @@ Mono load(String actorType, ActorId actorId, String stateName, TypeRef throw new IllegalStateException("State not found."); } - return this.serializer.deserialize(this.stateStore.get(stateId), type); + var state = this.stateStore.get(stateId); + if (state.getExpiration() != null) { + if (!state.getExpiration().isAfter(Instant.now())) { + throw new IllegalStateException("State expired."); + } + } + var v = this.serializer.deserialize(state.getValue(), type); + return new ActorState<>(stateName, v, state.getExpiration()); } catch (IOException e) { throw new RuntimeException(e); } }); } + private boolean contains(String stateId) { + if (!stateStore.containsKey(stateId)) { + return false; + } + + var state = this.stateStore.get(stateId); + if (state.getExpiration() != null) { + if (!state.getExpiration().isAfter(Instant.now())) { + return false; + } + } + + return true; + } + @Override Mono contains(String actorType, ActorId actorId, String stateName) { - return Mono.fromSupplier(() -> stateStore.containsKey(this.buildId(actorType, actorId, stateName))); + return Mono.fromSupplier(() -> contains(this.buildId(actorType, actorId, stateName))); } @Override @@ -62,15 +85,16 @@ Mono apply(String actorType, ActorId actorId, ActorStateChange... stateCha return Mono.fromRunnable(() -> { try { for (ActorStateChange stateChange : stateChanges) { - String stateId = buildId(actorType, actorId, stateChange.getStateName()); + String stateId = buildId(actorType, actorId, stateChange.getState().getName()); switch (stateChange.getChangeKind()) { case REMOVE: stateStore.remove(stateId); break; case ADD: case UPDATE: - byte[] raw = this.serializer.serialize(stateChange.getValue()); - stateStore.put(stateId, raw); + byte[] raw = this.serializer.serialize(stateChange.getState().getValue()); + stateStore.put(stateId, + new ActorState<>(stateChange.getState().getName(), raw, stateChange.getState().getExpiration())); break; } } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java index a24bf7271..b8a03759f 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java @@ -110,13 +110,13 @@ public void happyCaseApply() { if (operation.getOperationType() == null) { return false; } - if (operation.getKey() == null) { + if (operation.getState().getName() == null) { return false; } String opName = operation.getOperationType(); - String key = operation.getKey(); - Object value = operation.getValue(); + String key = operation.getState().getName(); + Object value = operation.getState().getValue(); foundInsertName |= "upsert".equals(opName) && "name".equals(key) && @@ -153,58 +153,59 @@ public void happyCaseLoad() throws Exception { DaprClient daprClient = mock(DaprClient.class); when(daprClient .getState(any(), any(), eq("name"))) - .thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe"))); + .thenReturn(Mono.just(new ActorState<>("name", SERIALIZER.serialize("Jon Doe")))); when(daprClient .getState(any(), any(), eq("zipcode"))) - .thenReturn(Mono.just(SERIALIZER.serialize(98021))); + .thenReturn(Mono.just(new ActorState<>("zipcode", SERIALIZER.serialize(98021)))); when(daprClient .getState(any(), any(), eq("goals"))) - .thenReturn(Mono.just(SERIALIZER.serialize(98))); + .thenReturn(Mono.just(new ActorState<>("goals", SERIALIZER.serialize(98)))); when(daprClient .getState(any(), any(), eq("balance"))) - .thenReturn(Mono.just(SERIALIZER.serialize(46.55))); + .thenReturn(Mono.just(new ActorState<>("balance", SERIALIZER.serialize(46.55)))); when(daprClient .getState(any(), any(), eq("active"))) - .thenReturn(Mono.just(SERIALIZER.serialize(true))); + .thenReturn(Mono.just(new ActorState<>("active", SERIALIZER.serialize(true)))); when(daprClient .getState(any(), any(), eq("customer"))) - .thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes())); + .thenReturn(Mono.just(new ActorState<>("customer", "{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes()))); when(daprClient .getState(any(), any(), eq("anotherCustomer"))) - .thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes())); + .thenReturn(Mono.just(new ActorState<>("anotherCustomer", "{ \"id\": 2000, \"name\": \"Max\"}".getBytes()))); when(daprClient .getState(any(), any(), eq("nullCustomer"))) .thenReturn(Mono.empty()); when(daprClient .getState(any(), any(), eq("bytes"))) - .thenReturn(Mono.just("\"QQ==\"".getBytes())); + .thenReturn(Mono.just(new ActorState<>("bytes", "\"QQ==\"".getBytes()))); when(daprClient .getState(any(), any(), eq("emptyBytes"))) - .thenReturn(Mono.just(new byte[0])); + .thenReturn(Mono.just(new ActorState<>("emptyBytes", new byte[0]))); DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER); Assertions.assertEquals("Jon Doe", - provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block()); + provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block().getValue()); Assertions.assertEquals(98021, - (int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block().getValue()); Assertions.assertEquals(98, - (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue()); Assertions.assertEquals(98, - (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block()); + (int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue()); Assertions.assertEquals(46.55, - (double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(), + (double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block().getValue(), EPSILON); Assertions.assertEquals(true, - (boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block()); + (boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block().getValue()); Assertions.assertEquals(new Customer().setId(1000).setName("Roxane"), - provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block()); + provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block().getValue()); Assertions.assertNotEquals(new Customer().setId(1000).setName("Roxane"), - provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block()); + provider.load( + "MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block().getValue()); Assertions.assertNull( provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block()); Assertions.assertArrayEquals("A".getBytes(), - provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block()); + provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block().getValue()); Assertions.assertNull( provider.load("MyActor", new ActorId("123"), "emptyBytes", TypeRef.get(byte[].class)).block()); } @@ -216,22 +217,28 @@ public void happyCaseContains() { // Keys that exists. when(daprClient .getState(any(), any(), eq("name"))) - .thenReturn(Mono.just("Jon Doe".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("name", "Jon Doe".getBytes()))); when(daprClient .getState(any(), any(), eq("zipcode"))) - .thenReturn(Mono.just("98021".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("zipcode", "98021".getBytes()))); when(daprClient .getState(any(), any(), eq("goals"))) - .thenReturn(Mono.just("98".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("goals", "98".getBytes()))); when(daprClient .getState(any(), any(), eq("balance"))) - .thenReturn(Mono.just("46.55".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("balance", "46.55".getBytes()))); when(daprClient .getState(any(), any(), eq("active"))) - .thenReturn(Mono.just("true".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("active", "true".getBytes()))); when(daprClient .getState(any(), any(), eq("customer"))) - .thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes())); + .thenReturn(Mono.just( + new ActorState<>("customer", "{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes()))); // Keys that do not exist. when(daprClient @@ -257,15 +264,15 @@ public void happyCaseContains() { Assertions.assertFalse(provider.contains("MyActor", new ActorId("123"), null).block()); } - private final ActorStateChange createInsertChange(String name, T value) { - return new ActorStateChange(name, value, ActorStateChangeKind.ADD); + private ActorStateChange createInsertChange(String name, T value) { + return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.ADD); } - private final ActorStateChange createUpdateChange(String name, T value) { - return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE); + private ActorStateChange createUpdateChange(String name, T value) { + return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.UPDATE); } - private final ActorStateChange createDeleteChange(String name) { - return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE); + private ActorStateChange createDeleteChange(String name) { + return new ActorStateChange(new ActorState(name, null), ActorStateChangeKind.REMOVE); } } diff --git a/sdk-tests/components/actorstatestore.yaml b/sdk-tests/components/actorstatestore.yaml new file mode 100644 index 000000000..1694a7b86 --- /dev/null +++ b/sdk-tests/components/actorstatestore.yaml @@ -0,0 +1,16 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: mysql-actorstatestore +spec: + type: state.mysql + version: v1 + metadata: + - name: connectionString + value: "root:test@tcp(127.0.0.1:3306)/?allowNativePasswords=true" + - name: cleanupIntervalInSeconds + value: "1" + - name: actorStateStore + value: "true" +scopes: + - actorstateit-statefulactorservice diff --git a/sdk-tests/components/statestore.yaml b/sdk-tests/components/statestore.yaml index 2f676bff8..a0c53bc40 100644 --- a/sdk-tests/components/statestore.yaml +++ b/sdk-tests/components/statestore.yaml @@ -10,5 +10,3 @@ spec: value: localhost:6379 - name: redisPassword value: "" - - name: actorStateStore - value: "true" diff --git a/sdk-tests/configurations/configuration.yaml b/sdk-tests/configurations/configuration.yaml index b3bc4bd7b..60dadec1e 100644 --- a/sdk-tests/configurations/configuration.yaml +++ b/sdk-tests/configurations/configuration.yaml @@ -6,4 +6,7 @@ spec: tracing: samplingRate: "1" zipkin: - endpointAddress: http://localhost:9411/api/v2/spans \ No newline at end of file + endpointAddress: http://localhost:9411/api/v2/spans + features: + - name: ActorStateTTL + enabled: true \ No newline at end of file diff --git a/sdk-tests/deploy/local-test.yml b/sdk-tests/deploy/local-test.yml index 989757a78..76626d803 100644 --- a/sdk-tests/deploy/local-test.yml +++ b/sdk-tests/deploy/local-test.yml @@ -26,3 +26,10 @@ services: image: mongo ports: - "27017:27017" + + mysql: + image: mysql + environment: + MYSQL_ROOT_PASSWORD: test + ports: + - '3306:3306' diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java index 534fd481d..e3a78c59d 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorStateIT.java @@ -56,12 +56,12 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti logger.debug("Starting actor runtime ..."); // The call below will fail if service cannot start successfully. DaprRun runtime = startDaprApp( - this.getClass().getSimpleName(), - StatefulActorService.SUCCESS_MESSAGE, - StatefulActorService.class, - true, - 60000, - serviceAppProtocol); + this.getClass().getSimpleName(), + StatefulActorService.SUCCESS_MESSAGE, + StatefulActorService.class, + true, + 60000, + serviceAppProtocol); String message = "This is a message to be saved and retrieved."; String name = "Jon Doe"; @@ -75,8 +75,8 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti ActorProxy proxy = proxyBuilder.build(actorId); // wating for actor to be activated - Thread.sleep(2000); - + Thread.sleep(2000); + // Validate conditional read works. callWithRetry(() -> { logger.debug("Invoking readMessage where data is not present yet ... "); @@ -162,7 +162,7 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti ActorProxy newProxy = proxyBuilder.build(actorId); // wating for actor to be activated - Thread.sleep(2000); + Thread.sleep(2000); callWithRetry(() -> { logger.debug("Invoking readMessage where data is not cached ... "); @@ -189,4 +189,79 @@ public void writeReadState(AppRun.AppProtocol serviceAppProtocol) throws Excepti assertArrayEquals(bytes, result); }, 5000); } + + @ParameterizedTest + @MethodSource("data") + public void stateTTL(AppRun.AppProtocol serviceAppProtocol) throws Exception { + logger.debug("Starting actor runtime ..."); + // The call below will fail if service cannot start successfully. + DaprRun runtime = startDaprApp( + this.getClass().getSimpleName(), + StatefulActorService.SUCCESS_MESSAGE, + StatefulActorService.class, + true, + 60000, + serviceAppProtocol); + + String message = "This is a message to be saved and retrieved."; + String name = "Jon Doe"; + byte[] bytes = new byte[] { 0x1 }; + ActorId actorId = new ActorId( + String.format("%d-%b-state-ttl", System.currentTimeMillis(), serviceAppProtocol)); + String actorType = "StatefulActorTest"; + logger.debug("Building proxy ..."); + ActorProxyBuilder proxyBuilder = + new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient()); + ActorProxy proxy = proxyBuilder.build(actorId); + + // wating for actor to be activated + Thread.sleep(2000); + + // Validate conditional read works. + callWithRetry(() -> { + logger.debug("Invoking readMessage where data is not present yet ... "); + String result = proxy.invokeMethod("readMessage", String.class).block(); + assertNull(result); + }, 5000); + + callWithRetry(() -> { + logger.debug("Invoking writeMessageFor1s ... "); + proxy.invokeMethod("writeMessageFor1s", message).block(); + }, 5000); + + callWithRetry(() -> { + logger.debug("Invoking readMessage where data is probably still cached ... "); + String result = proxy.invokeMethod("readMessage", String.class).block(); + assertEquals(message, result); + }, 5000); + + + logger.debug("Waiting, so actor can be deactivated ..."); + Thread.sleep(10000); + + logger.debug("Stopping service ..."); + runtime.stop(); + + logger.debug("Starting service ..."); + DaprRun run2 = startDaprApp( + this.getClass().getSimpleName(), + StatefulActorService.SUCCESS_MESSAGE, + StatefulActorService.class, + true, + 60000, + serviceAppProtocol); + + // Need new proxy builder because the proxy builder holds the channel. + proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient()); + ActorProxy newProxy = proxyBuilder.build(actorId); + + // waiting for actor to be activated + Thread.sleep(2000); + + callWithRetry(() -> { + logger.debug("Invoking readMessage where data is not cached and expired ... "); + String result = newProxy.invokeMethod("readMessage", String.class).block(); + assertNull(result); + }, 5000); + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActor.java b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActor.java index 981a84dc0..6f3d63305 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActor.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActor.java @@ -17,6 +17,8 @@ public interface StatefulActor { void writeMessage(String something); + void writeMessageFor1s(String something); + String readMessage(); void writeName(String something); diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorImpl.java b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorImpl.java index f37e69ca8..a555f7ea4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorImpl.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/services/springboot/StatefulActorImpl.java @@ -18,6 +18,8 @@ import io.dapr.actors.runtime.AbstractActor; import io.dapr.actors.runtime.ActorRuntimeContext; +import java.time.Duration; + @ActorType(name = "StatefulActorTest") public class StatefulActorImpl extends AbstractActor implements StatefulActor { @@ -30,6 +32,11 @@ public void writeMessage(String something) { super.getActorStateManager().set("message", something).block(); } + @Override + public void writeMessageFor1s(String something) { + super.getActorStateManager().set("message", something, Duration.ofSeconds(1)).block(); + } + @Override public String readMessage() { if (super.getActorStateManager().contains("message").block()) { From 6f382ffca5f95760c86c5d71691cbf8861cf096d Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 26 Jun 2024 09:08:28 -0700 Subject: [PATCH 2/4] Scope actorstatestore to actor ITs only. Signed-off-by: Artur Souza --- sdk-tests/components/actorstatestore.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdk-tests/components/actorstatestore.yaml b/sdk-tests/components/actorstatestore.yaml index 1694a7b86..a04c69929 100644 --- a/sdk-tests/components/actorstatestore.yaml +++ b/sdk-tests/components/actorstatestore.yaml @@ -14,3 +14,13 @@ spec: value: "true" scopes: - actorstateit-statefulactorservice + - activationdeactivationit-demoactorservice + - actorexceptionit-myactorservice + - actormethodnameit-myactorservice + - actorreminderfailoveritone-myactorservice + - actorreminderfailoverittwo-myactorservice + - actorreminderrecoveryit + - actorsdkresiliencytit-demoactorservice + - actorstateit-statefulactorservice + - actortimerrecoveryit + - actorturnbasedconcurrencyit-myactorservice \ No newline at end of file From b54a7bb923b0813d19e7361473da157b92638b97 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 26 Jun 2024 15:15:36 -0700 Subject: [PATCH 3/4] nit: fix year in header of new file. Signed-off-by: Artur Souza --- sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java index 17d46be39..3fde9820d 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorState.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 The Dapr Authors + * Copyright 2024 The Dapr Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at From 8dbf2752a7fad8c9a909cd96c9d9d279d1acd150 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 15 Nov 2024 13:38:57 -0800 Subject: [PATCH 4/4] Address comments Signed-off-by: Artur Souza --- .../io/dapr/actors/runtime/ActorStateManager.java | 11 +++++++---- sdk-tests/components/actorstatestore.yaml | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java index 4d6bdabb8..9b60cacf6 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorStateManager.java @@ -20,7 +20,6 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -32,6 +31,9 @@ */ public class ActorStateManager { + private static final StateChangeMetadata REMOVE_STATE_CHANGE_METADATA = + new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null); + /** * Provides states using a state store. */ @@ -206,9 +208,10 @@ public Mono set(String stateName, T value, Duration ttl) { .switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName) .map(exists -> { var expiration = buildExpiration(ttl); + ActorStateChangeKind changeKind = exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD; this.stateChangeTracker.put(stateName, new StateChangeMetadata( - exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value, expiration)); + changeKind, value, expiration)); return exists; })) .then(); @@ -238,7 +241,7 @@ public Mono remove(String stateName) { return true; } - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null)); + this.stateChangeTracker.put(stateName, REMOVE_STATE_CHANGE_METADATA); return true; } @@ -248,7 +251,7 @@ public Mono remove(String stateName) { .switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)) .filter(exists -> exists) .map(exists -> { - this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null)); + this.stateChangeTracker.put(stateName, REMOVE_STATE_CHANGE_METADATA); return exists; }) .then(); diff --git a/sdk-tests/components/actorstatestore.yaml b/sdk-tests/components/actorstatestore.yaml index a04c69929..cad867f8b 100644 --- a/sdk-tests/components/actorstatestore.yaml +++ b/sdk-tests/components/actorstatestore.yaml @@ -23,4 +23,4 @@ scopes: - actorsdkresiliencytit-demoactorservice - actorstateit-statefulactorservice - actortimerrecoveryit - - actorturnbasedconcurrencyit-myactorservice \ No newline at end of file + - actorturnbasedconcurrencyit-myactorservice