diff --git a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala index 376cdb62..ca137ad4 100644 --- a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala +++ b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala @@ -21,7 +21,9 @@ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit import akka.actor._ +import akka.pattern.ask import akka.serialization.SerializationExtension +import akka.util.Timeout import com.rbmhtechnology.eventuate.DurableEvent import com.rbmhtechnology.eventuate.log._ @@ -39,6 +41,9 @@ import scala.concurrent.duration._ import scala.util._ class LeveldbEventLogSettings(config: Config) extends EventLogSettings { + val readTimeout: FiniteDuration = + config.getDuration("eventuate.log.read-timeout", TimeUnit.MILLISECONDS).millis + val rootDir: String = config.getString("eventuate.log.leveldb.dir") @@ -106,6 +111,9 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With private def eventIterator(from: Long, classifier: Int): EventIterator = new EventIterator(from, classifier) + private def eventReader(): ActorRef = + context.actorOf(Props(new EventReader).withDispatcher("eventuate.log.dispatchers.read-dispatcher")) + override def readReplicationProgresses: Future[Map[String, Long]] = completed(withIterator(iter => replicationProgressMap.readReplicationProgresses(iter))) @@ -113,15 +121,13 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With completed(withIterator(iter => replicationProgressMap.readReplicationProgress(logId))) override def replicationRead(fromSequenceNr: Long, toSequenceNr: Long, max: Int, filter: DurableEvent => Boolean): Future[BatchReadResult] = - Future(readSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, filter))(services.readDispatcher) + eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, filter))(settings.readTimeout, self).mapTo[BatchReadResult] override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int): Future[BatchReadResult] = - Future(readSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, _ => true))(services.readDispatcher) + eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, _ => true))(settings.readTimeout, self).mapTo[BatchReadResult] - override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] = { - val numericId = aggregateIdMap.numericId(aggregateId) - Future(readSync(fromSequenceNr, toSequenceNr, numericId, max, _ => true))(services.readDispatcher) - } + override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] = + eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, aggregateIdMap.numericId(aggregateId), max, _ => true))(settings.readTimeout, self).mapTo[BatchReadResult] override def recoverClock: Future[EventLogClock] = completed { val snap = readEventLogClockSnapshot @@ -189,12 +195,10 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With private def withIterator[R](body: DBIterator => R): R = { val so = snapshotOptions() val iter = leveldb.iterator(so) - addActiveIterator(iter) try { body(iter) } finally { iter.close() - removeActiveIterator(iter) so.snapshot().close() } } @@ -214,7 +218,6 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With val iter1 = leveldb.iterator(opts) val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue)) - addActiveIterator(iter1) iter1.seek(eventKeyBytes(classifier, from)) override def hasNext: Boolean = @@ -225,7 +228,6 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With override def close(): Unit = { iter1.close() - removeActiveIterator(iter1) opts.snapshot().close() } } @@ -253,46 +255,29 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With } override def postStop(): Unit = { - while (activeIterators.get.nonEmpty) { - // Wait a bit for all concurrent read iterators to be closed - // See https://github.com/RBMHTechnology/eventuate/issues/87 - Thread.sleep(500) - } leveldb.close() super.postStop() } - // ------------------------------------------------------------------- - // Support for tracking active iterators used by concurrent readers. - // It helps to avoid `pthread lock: invalid argument` errors raised - // by native code when closing the leveldb instance maintained by - // this event log actor, mainly during integration tests. - // ------------------------------------------------------------------- - - import java.util.concurrent.atomic._ - import java.util.function._ - - private val activeIterators = new AtomicReference[Set[DBIterator]](Set()) + private class EventReader() extends Actor { + import EventReader._ - private val addAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc + u.head + def receive = { + case ReadSync(from, to, classifier, max, filter) => + Try(readSync(from, to, classifier, max, filter)) match { + case Success(r) => sender() ! r + case Failure(e) => sender() ! Status.Failure(e) + } + context.stop(self) + } } - private val removeAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc - u.head + private object EventReader { + case class ReadSync(fromSequenceNr: Long, toSequenceNr: Long, classifier: Int, max: Int, filter: DurableEvent => Boolean) } - - def addActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), addAccumulator) - - def removeActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), removeAccumulator) } object LeveldbEventLog { - private[leveldb]type CloseableIterator[A] = Iterator[A] with Closeable private[leveldb] case class EventKey(classifier: Int, sequenceNr: Long) @@ -332,7 +317,6 @@ object LeveldbEventLog { Future.fromTry(Try(body)) private[leveldb] trait WithBatch { - protected def leveldb: DB protected def leveldbWriteOptions: WriteOptions