Skip to content

Commit

Permalink
Delete effect in durable state, #30446 (#31529)
Browse files Browse the repository at this point in the history
* Add reset effect to the state dsl.
* The effect calls deleteObject in the durable state store.
* The effect updates state to the empty state.
* Implement DeletedDurableState for persistence query.
* Update PersistenceTestKitDurableStateStore so that deleteObject sets the Record payload to None, ie delete the payload.
* update documentation for delete effect
* increment the revision by one when deleting state
* Overload deleteObject with revision and deprecate deleteObject.
* add bin-comp exclude
  • Loading branch information
patriknw authored Sep 2, 2022
1 parent 7333594 commit 34a621a
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
package docs.akka.cluster.sharding.typed

import scala.annotation.nowarn

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.{ DeletedDurableState, Offset }
import akka.stream.scaladsl.Source

@nowarn
Expand All @@ -24,7 +23,8 @@ object DurableStateStoreQueryUsageCompileOnlySpec {
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStoreQuery[Record]](pluginId)
val source: Source[DurableStateChange[Record], NotUsed] = durableStateStoreQuery.changes("tag", offset)
source.map {
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => value
case UpdatedDurableState(persistenceId, revision, value, offset, timestamp) => Some(value)
case _: DeletedDurableState[_] => None
}
//#get-durable-state-store-query-example
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,3 @@ Java
: @@snip [DurableStateStoreQueryUsageCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/DurableStateStoreQueryUsageCompileOnlyTest.java) { #get-durable-state-store-query-example }

The @apidoc[DurableStateChange] elements can be `UpdatedDurableState` or `DeletedDurableState`.
`DeletedDurableState` is not implemented yet.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ and can be one of:
* `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing
persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already
existing record. Otherwise `persist` will fail.
* `delete` will delete the state by setting it to the empty state and the revision number will be incremented by 1.
* `none` no state to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import akka.annotation.DoNotInherit
/**
* The `DurableStateStoreQuery` stream elements for `DurableStateStoreQuery`.
*
* The implementation can be a [[UpdatedDurableState]] or a `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446
* The implementation can be a [[UpdatedDurableState]] or a [[DeletedDurableState]].
*
* Not for user extension
*
Expand Down Expand Up @@ -53,3 +52,25 @@ final class UpdatedDurableState[A](
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]

object DeletedDurableState {

def unapply[A](arg: DeletedDurableState[A]): Option[(String, Long, Offset, Long)] =
Some((arg.persistenceId, arg.revision, arg.offset, arg.timestamp))
}

/**
*
* @param persistenceId The persistence id of the origin entity.
* @param revision The revision number from the origin entity.
* @param offset The offset that can be used in next `changes` or `currentChanges` query.
* @param timestamp The time the state was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as `System.currentTimeMillis`).
* @tparam A the type of the value
*/
final class DeletedDurableState[A](
val persistenceId: String,
val revision: Long,
override val offset: Offset,
val timestamp: Long)
extends DurableStateChange[A]
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand All @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand All @@ -48,8 +48,8 @@ trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
* in quick succession are likely to be skipped, with only the last update resulting in a change from this
* source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*
* @param tag The tag to get changes for.
* @param offset The offset to get changes since. Must either be [[akka.persistence.query.NoOffset]] to get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
Expand All @@ -56,8 +56,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* This will return changes that occurred up to when the `Source` returned by this call is materialized. Changes to
* objects made since materialization are not guaranteed to be included in the results.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def currentChangesBySlices(
entityType: String,
Expand All @@ -57,8 +57,8 @@ trait DurableStateStoreBySliceQuery[A] extends DurableStateStore[A] {
* change for each object since the offset will be emitted. In particular, multiple updates to a given object in quick
* succession are likely to be skipped, with only the last update resulting in a change from this source.
*
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or `DeletedDurableState`.
* `DeletedDurableState` is not implemented yet, see issue https://github.com/akka/akka/issues/30446.
* The [[DurableStateChange]] elements can be [[akka.persistence.query.UpdatedDurableState]] or
* [[akka.persistence.query.DeletedDurableState]].
*/
def changesBySlices(
entityType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
package akka.persistence.testkit.state.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage

import java.util.concurrent.{ CompletableFuture, CompletionStage }
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

import akka.japi.Pair
import akka.{ Done, NotUsed }
import akka.persistence.query.DurableStateChange
Expand Down Expand Up @@ -37,8 +35,10 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A])
def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] =
stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava

def deleteObject(persistenceId: String): CompletionStage[Done] =
stateStore.deleteObject(persistenceId).toJava
def deleteObject(persistenceId: String): CompletionStage[Done] = CompletableFuture.completedFuture(Done)

def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
stateStore.deleteObject(persistenceId, revision).toJava

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = {
stateStore.changes(tag, offset).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import scala.concurrent.Future
import akka.{ Done, NotUsed }
import akka.actor.ExtendedActorSystem
import akka.persistence.Persistence
import akka.persistence.query.DurableStateChange
import akka.persistence.query.{
DeletedDurableState,
DurableStateChange,
NoOffset,
Offset,
Sequence,
UpdatedDurableState
}
import akka.persistence.query.scaladsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery }
import akka.persistence.query.UpdatedDurableState
import akka.persistence.query.Offset
import akka.persistence.query.NoOffset
import akka.persistence.query.Sequence
import akka.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult }
import akka.persistence.typed.PersistenceId
Expand All @@ -27,8 +30,6 @@ import akka.stream.typed.scaladsl.ActorSource
import akka.stream.OverflowStrategy
import scala.collection.immutable

import akka.persistence.testkit.internal.CurrentTime

object PersistenceTestKitDurableStateStore {
val Identifier = "akka.persistence.testkit.state"
}
Expand All @@ -54,22 +55,29 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)

override def getObject(persistenceId: String): Future[GetObjectResult[A]] = this.synchronized {
Future.successful(store.get(persistenceId) match {
case Some(record) => GetObjectResult(Some(record.value), record.revision)
case None => GetObjectResult(None, 0)
case Some(Record(_, _, revision, Some(value), _, _)) => GetObjectResult(Some(value), revision)
case Some(Record(_, _, revision, None, _, _)) => GetObjectResult(None, revision)
case None => GetObjectResult(None, 0)
})
}

override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] =
this.synchronized {
val globalOffset = lastGlobalOffset.incrementAndGet()
val record = Record(globalOffset, persistenceId, revision, value, tag)
val record = Record(globalOffset, persistenceId, revision, Some(value), tag)
store = store + (persistenceId -> record)
publisher ! record
Future.successful(Done)
}

override def deleteObject(persistenceId: String): Future[Done] = this.synchronized {
store = store - persistenceId
override def deleteObject(persistenceId: String): Future[Done] = Future.successful(Done)

override def deleteObject(persistenceId: String, revision: Long): Future[Done] = this.synchronized {
store = store.get(persistenceId) match {
case Some(record) => store + (persistenceId -> record.copy(value = None, revision = revision))
case None => store
}

Future.successful(Done)
}

Expand Down Expand Up @@ -191,9 +199,15 @@ private final case class Record[A](
globalOffset: Long,
persistenceId: String,
revision: Long,
value: A,
value: Option[A],
tag: String,
timestamp: Long = CurrentTime.now()) {
def toDurableStateChange: DurableStateChange[A] =
new UpdatedDurableState(persistenceId, revision, value, Sequence(globalOffset), timestamp)
timestamp: Long = System.currentTimeMillis) {
def toDurableStateChange: DurableStateChange[A] = {
value match {
case Some(v) =>
new UpdatedDurableState(persistenceId, revision, v, Sequence(globalOffset), timestamp)
case None =>
new DeletedDurableState(persistenceId, revision, Sequence(globalOffset), timestamp)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ object DurableStateBehaviorReplySpec {
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case class DeleteWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
case object Increment extends Command[Nothing]
case class IncrementBy(by: Int) extends Command[Nothing]

Expand Down Expand Up @@ -61,6 +62,9 @@ object DurableStateBehaviorReplySpec {
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)

case DeleteWithConfirmation(replyTo) =>
Effect.delete[State]().thenReply(replyTo)(_ => Done)

case _ => ???

})
Expand Down Expand Up @@ -108,5 +112,20 @@ class DurableStateBehaviorReplySpec
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1))
}

"delete state thenReply" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]()
c ! IncrementWithConfirmation(updateProbe.ref)
updateProbe.expectMessage(Done)

val deleteProbe = TestProbe[Done]()
c ! DeleteWithConfirmation(deleteProbe.ref)
deleteProbe.expectMessage(Done)

val queryProbe = TestProbe[State]()
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(0))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ private[akka] final case class DurableStateBehaviorImpl[Command, State](
final case class GetFailure(cause: Throwable) extends InternalProtocol
case object UpsertSuccess extends InternalProtocol
final case class UpsertFailure(cause: Throwable) extends InternalProtocol
case object DeleteSuccess extends InternalProtocol
final case class DeleteFailure(cause: Throwable) extends InternalProtocol
case object RecoveryTimeout extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,30 @@ private[akka] trait DurableStateStoreInteractions[C, S] {
newRunningState
}

protected def internalDelete(
ctx: ActorContext[InternalProtocol],
cmd: Any,
state: Running.RunningState[S]): Running.RunningState[S] = {

val newRunningState = state.nextRevision().copy(state = setup.emptyState)
val persistenceId = setup.persistenceId.id

onDeleteInitiated(ctx, cmd)

ctx.pipeToSelf[Done](setup.durableStateStore.deleteObject(persistenceId, newRunningState.revision)) {
case Success(_) => InternalProtocol.DeleteSuccess
case Failure(cause) => InternalProtocol.DeleteFailure(cause)
}

newRunningState
}

// FIXME These hook methods are for Telemetry. What more parameters are needed? persistenceId?
@InternalStableApi
private[akka] def onWriteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()

private[akka] def onDeleteInitiated(@unused ctx: ActorContext[_], @unused cmd: Any): Unit = ()

protected def requestRecoveryPermit(): Unit = {
setup.persistence.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, setup.selfClassic)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ private[akka] final case class Persist[State](newState: State) extends EffectImp
override def toString: String = s"Persist(${newState.getClass.getName})"
}

/** INTERNAL API */
@InternalApi
private[akka] case class Delete[State]() extends EffectImpl[State]

/** INTERNAL API */
@InternalApi
private[akka] case object Unhandled extends EffectImpl[Nothing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ private[akka] class Recovering[C, S](
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case UpsertSuccess => Behaviors.unhandled
case _: UpsertFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case _: DeleteFailure => Behaviors.unhandled
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ private[akka] object Running {
case _: PersistNothing.type =>
(applySideEffects(sideEffects, state), true)

case _: Delete[_] =>
val nextState = internalDelete(setup.context, msg, state)
(applySideEffects(sideEffects, nextState), true)

case _: Unhandled.type =>
import akka.actor.typed.scaladsl.adapter._
setup.context.system.toClassic.eventStream
Expand Down Expand Up @@ -194,6 +198,8 @@ private[akka] object Running {
case RecoveryPermitGranted => Behaviors.unhandled
case _: GetSuccess[_] => Behaviors.unhandled
case _: GetFailure => Behaviors.unhandled
case DeleteSuccess => Behaviors.unhandled
case DeleteFailure(_) => Behaviors.unhandled
}
}

Expand Down
Loading

0 comments on commit 34a621a

Please sign in to comment.