Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Apr 22, 2024
1 parent 730c1f5 commit c8dcd6b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.jdbc.state.scaladsl

import java.lang.invoke.{ MethodHandles, MethodType }

import scala.util.Try

private[scaladsl] object DurableStateExceptionSupport {
private val methodHandleLookup = MethodHandles.publicLookup()

private def exceptionClassOpt: Option[Class[_]] =
Try(Class.forName(
"org.apache.pekko.persistence.state.exception.DeleteRevisionException")).toOption

private lazy val constructorOpt = exceptionClassOpt.map { clz =>
val mt = MethodType.methodType(classOf[Unit], classOf[String])
methodHandleLookup.findConstructor(clz, mt)
}

def createDeleteRevisionExceptionIfSupported(message: String): Option[Exception] =
constructorOpt.map { constructor =>
constructor.invoke(message).asInstanceOf[Exception]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
import pekko.persistence.query.{ DurableStateChange, Offset, UpdatedDurableState }
import pekko.persistence.query.{ scaladsl => queryScalaDsl }
import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.persistence.typed.state.internal.DurableStateStoreException
import pekko.serialization.Serialization
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
Expand Down Expand Up @@ -117,23 +116,17 @@ class JdbcDurableStateStore[A](
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))

override def deleteObject(persistenceId: String, revision: Long): Future[Done] = {
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision)).flatMap { count =>
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision)).map { count =>
{
if (count == 0) {
db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { state =>
state.headOption match {
case Some(row) if row.revision < revision =>
throw new DurableStateStoreException(
s"Out of date revision [$revision] for object with persistenceId [$persistenceId]", null)
case Some(_) =>
throw new DurableStateStoreException(
s"Unknown revision [$revision] for object with persistenceId [$persistenceId]", null)
case _ =>
throw new DurableStateStoreException(
s"Object with persistenceId [$persistenceId] does not exist", null)
}
}
} else Future.successful(Done)
// if you run this code with Pekko 1.0.x, no exception will be thrown here
// this matches the behavior of pekko-connectors-jdbc 1.0.x
// if you run this code with Pekko 1.1.x, a DeleteRevisionException will be thrown here
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(
s"Failed to delete object with persistenceId [$persistenceId] and revision [$revision]")
.foreach(throw _)
}
Done
}
}
}
Expand Down

0 comments on commit c8dcd6b

Please sign in to comment.