Skip to content

Commit

Permalink
temporarily throw DurableStateStoreException
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Apr 13, 2024
1 parent c0a1c28 commit 25eeb7a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ import slick.jdbc.{ JdbcBackend, JdbcProfile }
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
import pekko.annotation.ApiMayChange
import pekko.pattern.ask
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.state.DurableStateQueries
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax }
import pekko.persistence.query.{ DurableStateChange, Offset }
import pekko.persistence.jdbc.journal.dao.FlowControl
import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.persistence.query.{ DurableStateChange, Offset, UpdatedDurableState }
import pekko.persistence.query.{ scaladsl => queryScalaDsl }
import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.persistence.typed.state.internal.DurableStateStoreException
import pekko.serialization.Serialization
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.util.Timeout
import OffsetSyntax._
import pekko.annotation.ApiMayChange
import pekko.persistence.query.UpdatedDurableState

object JdbcDurableStateStore {
val Identifier = "jdbc-durable-state-store"
Expand Down Expand Up @@ -117,7 +117,14 @@ class JdbcDurableStateStore[A](
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))

override def deleteObject(persistenceId: String, revision: Long): Future[Done] =
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision).map(_ => Done))
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision).map {
count => {
if (count == 0)
throw new DurableStateStoreException(
s"Object with persistenceId [$persistenceId] and revision [$revision] does not exist", null)
else Done
}
})

override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
Source
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ object Dependencies {

val Libraries: Seq[ModuleID] = Seq(
"org.apache.pekko" %% "pekko-persistence-query" % PekkoVersion,
"org.apache.pekko" %% "pekko-persistence-typed" % PekkoVersion,
"com.typesafe.slick" %% "slick" % SlickVersion,
"com.typesafe.slick" %% "slick-hikaricp" % SlickVersion,
"ch.qos.logback" % "logback-classic" % LogbackVersion % Test,
Expand Down

0 comments on commit 25eeb7a

Please sign in to comment.