diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala index dc37094e059..c3836a0c189 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlySpec.scala @@ -5,10 +5,9 @@ package docs.akka.cluster.sharding.typed import scala.annotation.nowarn - import akka.NotUsed import akka.actor.ActorSystem -import akka.persistence.query.Offset +import akka.persistence.query.{ DeletedDurableState, Offset } import akka.stream.scaladsl.Source @nowarn @@ -24,7 +23,8 @@ object DurableStateStoreQueryUsageCompileOnlySpec { DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId) val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset) source.map { - case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value + case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value) + case _: DeletedDurableState[_] => None } //#get-durable-state-store-query-example } diff --git a/akka-docs/src/main/paradox/durable-state/persistence-query.md b/akka-docs/src/main/paradox/durable-state/persistence-query.md index a31c953d606..f79ac56c396 100644 --- a/akka-docs/src/main/paradox/durable-state/persistence-query.md +++ b/akka-docs/src/main/paradox/durable-state/persistence-query.md @@ -46,4 +46,3 @@ Java : @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example } The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`. -`DeletedDurableState` is not implemented yet. diff --git a/akka-docs/src/main/paradox/typed/durable-state/persistence.md b/akka-docs/src/main/paradox/typed/durable-state/persistence.md index 878929ffc55..ffee804cc9c 100644 --- a/akka-docs/src/main/paradox/typed/durable-state/persistence.md +++ b/akka-docs/src/main/paradox/typed/durable-state/persistence.md @@ -152,6 +152,7 @@ and can be one of: * `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already existing record. Otherwise `persist` will fail. +* `delete` will delete the state by setting it to the empty state and the revision number will be incremented by 1. * `none` no state to be persisted, for example a read-only command * `unhandled` the command is unhandled (not supported) in current state * `stop` stop this actor diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala index 9f158c2de0c..1ac15f1212f 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/DurableStateChange.scala @@ -9,8 +9,7 @@ import akka.annotation.DoNotInherit /** * The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`. * - * The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446 + * The implementation can be a [[UpdatedDurableState]] or a [[DeletedDurableState]]. * * Not for user extension * @@ -53,3 +52,25 @@ final class UpdatedDurableState[A]( override val offset: Offset, val timestamp: Long) extends DurableStateChange[A] + +object DeletedDurableState { + + def unapply[A](arg: DeletedDurableState[A]): Option[(String, Long, Offset, Long)] = + Some((arg.persistenceId, arg.revision, arg.offset, arg.timestamp)) +} + +/** + * + * @param persistenceId The persistence id of the origin entity. + * @param revision The revision number from the origin entity. + * @param offset The offset that can be used in next `changes` or `currentChanges` query. + * @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC + * (same as `System.currentTimeMillis`). + * @tparam A the type of the value + */ +final class DeletedDurableState[A]( + val persistenceId: String, + val revision: Long, + override val offset: Offset, + val timestamp: Long) + extends DurableStateChange[A] diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala index 76d72db3e6b..44eddea035c 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/javadsl/DurableStateStoreQuery.scala @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * in quick succession are likely to be skipped, with only the last update resulting in a change from this * source. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala index c82c2f75389..74dbc6ecfb8 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/scaladsl/DurableStateStoreQuery.scala @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] { * in quick succession are likely to be skipped, with only the last update resulting in a change from this * source. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. * * @param tag The tag to get changes for. * @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/typed/javadsl/DurableStateStoreBySliceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/typed/javadsl/DurableStateStoreBySliceQuery.scala index 4aa44303b98..eb3d318ddb8 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/typed/javadsl/DurableStateStoreBySliceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/typed/javadsl/DurableStateStoreBySliceQuery.scala @@ -34,8 +34,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. */ def currentChangesBySlices( entityType: String, @@ -56,8 +56,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] { * change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick * succession are likely to be skipped, with only the last update resulting in a change from this source. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. */ def changesBySlices( entityType: String, diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/typed/scaladsl/DurableStateStoreBySliceQuery.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/typed/scaladsl/DurableStateStoreBySliceQuery.scala index e0310d550cc..8bb0e15ee32 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/typed/scaladsl/DurableStateStoreBySliceQuery.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/typed/scaladsl/DurableStateStoreBySliceQuery.scala @@ -35,8 +35,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] { * This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to * objects made since materialization are not guaranteed to be included in the results. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. */ def currentChangesBySlices( entityType: String, @@ -57,8 +57,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] { * change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick * succession are likely to be skipped, with only the last update resulting in a change from this source. * - * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`. - * `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446. + * The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or + * [[akka.persistence.query.DeletedDurableState]]. */ def changesBySlices( entityType: String, diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala index 3de8ed8e278..f984f7f3656 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/javadsl/PersistenceTestKitDurableStateStore.scala @@ -5,11 +5,9 @@ package akka.persistence.testkit.state.javadsl import java.util.Optional -import java.util.concurrent.CompletionStage - +import java.util.concurrent.{ CompletableFuture, CompletionStage } import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ - import akka.japi.Pair import akka.{ Done, NotUsed } import akka.persistence.query.DurableStateChange @@ -37,8 +35,10 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A]) def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] = stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava - def deleteObject(persistenceId: String): CompletionStage[Done] = - stateStore.deleteObject(persistenceId).toJava + def deleteObject(persistenceId: String): CompletionStage[Done] = CompletableFuture.completedFuture(Done) + + def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = + stateStore.deleteObject(persistenceId, revision).toJava def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = { stateStore.changes(tag, offset).asJava diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala index 56ecf047021..5939f9b1f86 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStore.scala @@ -11,12 +11,15 @@ import scala.concurrent.Future import akka.{ Done, NotUsed } import akka.actor.ExtendedActorSystem import akka.persistence.Persistence -import akka.persistence.query.DurableStateChange +import akka.persistence.query.{ + DeletedDurableState, + DurableStateChange, + NoOffset, + Offset, + Sequence, + UpdatedDurableState +} import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery } -import akka.persistence.query.UpdatedDurableState -import akka.persistence.query.Offset -import akka.persistence.query.NoOffset -import akka.persistence.query.Sequence import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult } import akka.persistence.typed.PersistenceId @@ -27,8 +30,6 @@ import akka.stream.typed.scaladsl.ActorSource import akka.stream.OverflowStrategy import scala.collection.immutable -import akka.persistence.testkit.internal.CurrentTime - object PersistenceTestKitDurableStateStore { val Identifier = "akka.persistence.testkit.state" } @@ -54,22 +55,29 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem) override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized { Future.successful(store.get(persistenceId) match { - case Some(record) => GetObjectResult(Some(record.value), record.revision) - case None => GetObjectResult(None, 0) + case Some(Record(_, _, revision, Some(value), _, _)) => GetObjectResult(Some(value), revision) + case Some(Record(_, _, revision, None, _, _)) => GetObjectResult(None, revision) + case None => GetObjectResult(None, 0) }) } override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = this.synchronized { val globalOffset = lastGlobalOffset.incrementAndGet() - val record = Record(globalOffset, persistenceId, revision, value, tag) + val record = Record(globalOffset, persistenceId, revision, Some(value), tag) store = store + (persistenceId -> record) publisher ! record Future.successful(Done) } - override def deleteObject(persistenceId: String): Future[Done] = this.synchronized { - store = store - persistenceId + override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done) + + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = this.synchronized { + store = store.get(persistenceId) match { + case Some(record) => store + (persistenceId -> record.copy(value = None, revision = revision)) + case None => store + } + Future.successful(Done) } @@ -191,9 +199,15 @@ private final case class Record[A]( globalOffset: Long, persistenceId: String, revision: Long, - value: A, + value: Option[A], tag: String, - timestamp: Long = CurrentTime.now()) { - def toDurableStateChange: DurableStateChange[A] = - new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp) + timestamp: Long = System.currentTimeMillis) { + def toDurableStateChange: DurableStateChange[A] = { + value match { + case Some(v) => + new UpdatedDurableState(persistenceId, revision, v, Sequence(globalOffset), timestamp) + case None => + new DeletedDurableState(persistenceId, revision, Sequence(globalOffset), timestamp) + } + } } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala index 9a714ddf441..2abaab4bdd8 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala @@ -31,6 +31,7 @@ object DurableStateBehaviorReplySpec { final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done] final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done] final case class GetValue(replyTo: ActorRef[State]) extends Command[State] + final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done] case object Increment extends Command[Nothing] case class IncrementBy(by: Int) extends Command[Nothing] @@ -61,6 +62,9 @@ object DurableStateBehaviorReplySpec { case GetValue(replyTo) => Effect.reply(replyTo)(state) + case DeleteWithConfirmation(replyTo) => + Effect.delete[State]().thenReply(replyTo)(_ => Done) + case _ => ??? }) @@ -108,5 +112,20 @@ class DurableStateBehaviorReplySpec c ! GetValue(queryProbe.ref) queryProbe.expectMessage(State(1)) } + + "delete state thenReply" in { + val c = spawn(counter(nextPid())) + val updateProbe = TestProbe[Done]() + c ! IncrementWithConfirmation(updateProbe.ref) + updateProbe.expectMessage(Done) + + val deleteProbe = TestProbe[Done]() + c ! DeleteWithConfirmation(deleteProbe.ref) + deleteProbe.expectMessage(Done) + + val queryProbe = TestProbe[State]() + c ! GetValue(queryProbe.ref) + queryProbe.expectMessage(State(0)) + } } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala index 5fbd13463b4..65bd02d983e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateBehaviorImpl.scala @@ -176,6 +176,8 @@ private[akka] final case class DurableStateBehaviorImpl[Command, State]( final case class GetFailure(cause: Throwable) extends InternalProtocol case object UpsertSuccess extends InternalProtocol final case class UpsertFailure(cause: Throwable) extends InternalProtocol + case object DeleteSuccess extends InternalProtocol + final case class DeleteFailure(cause: Throwable) extends InternalProtocol case object RecoveryTimeout extends InternalProtocol final case class IncomingCommand[C](c: C) extends InternalProtocol diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala index 71fe6c784c0..1dcf8227bb1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/DurableStateStoreInteractions.scala @@ -56,10 +56,30 @@ private[akka] trait DurableStateStoreInteractions[C, S] { newRunningState } + protected def internalDelete( + ctx: ActorContext[InternalProtocol], + cmd: Any, + state: Running.RunningState[S]): Running.RunningState[S] = { + + val newRunningState = state.nextRevision().copy(state = setup.emptyState) + val persistenceId = setup.persistenceId.id + + onDeleteInitiated(ctx, cmd) + + ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId, newRunningState.revision)) { + case Success(_) => InternalProtocol.DeleteSuccess + case Failure(cause) => InternalProtocol.DeleteFailure(cause) + } + + newRunningState + } + // FIXME These hook methods are for Telemetry. What more parameters are needed? persistenceId? @InternalStableApi private[akka] def onWriteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = () + private[akka] def onDeleteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = () + protected def requestRecoveryPermit(): Unit = { setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfClassic) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/EffectImpl.scala index 4a8ac9b55cc..af7c5d1fc9a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/EffectImpl.scala @@ -71,6 +71,10 @@ private[akka] final case class Persist[State](newState: State) extends EffectImp override def toString: String = s"Persist(${newState.getClass.getName})" } +/** INTERNAL API */ +@InternalApi +private[akka] case class Delete[State]() extends EffectImpl[State] + /** INTERNAL API */ @InternalApi private[akka] case object Unhandled extends EffectImpl[Nothing] diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala index 530d068520b..c895acf171d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala @@ -88,6 +88,8 @@ private[akka] class Recovering[C, S]( case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit case UpsertSuccess => Behaviors.unhandled case _: UpsertFailure => Behaviors.unhandled + case DeleteSuccess => Behaviors.unhandled + case _: DeleteFailure => Behaviors.unhandled } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala index 2dbb5bc6783..467c2ff61bd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala @@ -141,6 +141,10 @@ private[akka] object Running { case _: PersistNothing.type => (applySideEffects(sideEffects, state), true) + case _: Delete[_] => + val nextState = internalDelete(setup.context, msg, state) + (applySideEffects(sideEffects, nextState), true) + case _: Unhandled.type => import akka.actor.typed.scaladsl.adapter._ setup.context.system.toClassic.eventStream @@ -194,6 +198,8 @@ private[akka] object Running { case RecoveryPermitGranted => Behaviors.unhandled case _: GetSuccess[_] => Behaviors.unhandled case _: GetFailure => Behaviors.unhandled + case DeleteSuccess => Behaviors.unhandled + case DeleteFailure(_) => Behaviors.unhandled } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/Effect.scala index 28a49508712..f7725e1e588 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/scaladsl/Effect.scala @@ -25,7 +25,12 @@ object Effect { */ def persist[State](state: State): EffectBuilder[State] = Persist(state) - // FIXME add delete effect + /** + * Delete the persisted state. + * + * Side effects can be chained with `thenRun` + */ + def delete[State](): EffectBuilder[State] = Delete() /** * Do not persist anything diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala index 6709986e965..bd3d6585505 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala @@ -30,6 +30,7 @@ object DurableStatePersistentBehaviorCompileOnly { final case object Increment extends Command[Nothing] final case class IncrementBy(value: Int) extends Command[Nothing] final case class GetValue(replyTo: ActorRef[State]) extends Command[State] + final case object Delete extends Command[Nothing] //#command //#state @@ -44,6 +45,7 @@ object DurableStatePersistentBehaviorCompileOnly { case Increment => Effect.persist(state.copy(value = state.value + 1)) case IncrementBy(by) => Effect.persist(state.copy(value = state.value + by)) case GetValue(replyTo) => Effect.reply(replyTo)(state) + case Delete => Effect.delete[State]() } //#command-handler diff --git a/akka-persistence/src/main/mima-filters/2.6.19.backwards.excludes/30446-deleteObject.excludes b/akka-persistence/src/main/mima-filters/2.6.19.backwards.excludes/30446-deleteObject.excludes new file mode 100644 index 00000000000..e2bb7a9f767 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.6.19.backwards.excludes/30446-deleteObject.excludes @@ -0,0 +1,3 @@ +# #30446 Addition of deleteObject overloaded method +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.state.javadsl.DurableStateUpdateStore.deleteObject") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.state.scaladsl.DurableStateUpdateStore.deleteObject") \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala index c8cd610ffc5..19179e42ba7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/javadsl/DurableStateUpdateStore.scala @@ -20,5 +20,8 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] { */ def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "2.6.20") def deleteObject(persistenceId: String): CompletionStage[Done] + + def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] } diff --git a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala index 9b99be64ba2..755d665fa2b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/state/scaladsl/DurableStateUpdateStore.scala @@ -20,6 +20,8 @@ trait DurableStateUpdateStore[A] extends DurableStateStore[A] { */ def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] + @deprecated(message = "Use the deleteObject overload with revision instead.", since = "2.6.20") def deleteObject(persistenceId: String): Future[Done] + def deleteObject(persistenceId: String, revision: Long): Future[Done] }