Avoid LevelDB shutdown while iterators are in use
- Use iterators in child actors and rely on Akka to first stop these child actors before stopping the parent event log actor that shuts down LevelDB.
krasserm committed Mar 18, 2016
1 parent 41997bf commit e99bd08
Showing 1 changed file with 24 additions and 40 deletions.
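
The commit message relies on Akka's stop ordering: when an actor is stopped, Akka stops its children first and only invokes the actor's own postStop after all of them have terminated. The sketch below illustrates that ordering with made-up names (ShutdownOrderSketch, Reader, EventLogLike are not part of this commit); it is a minimal illustration of the idea, not the committed code.

```scala
import akka.actor._

object ShutdownOrderSketch extends App {
  // One-shot reader, analogous to the EventReader introduced in this commit:
  // it answers a single request and then stops itself.
  class Reader extends Actor {
    def receive = {
      case query: String =>
        sender() ! s"result for: $query" // a real reader would use a LevelDB iterator here
        context.stop(self)
    }
  }

  // Parent that owns the shared resource and closes it in postStop.
  class EventLogLike extends Actor {
    def receive = {
      case query: String =>
        // run each read in a child actor and let it reply to the original sender
        context.actorOf(Props(new Reader)).forward(query)
    }

    override def postStop(): Unit = {
      // Akka has already stopped and awaited all Reader children at this point,
      // so releasing the shared resource (leveldb.close() in the real code) is safe.
      println("closing shared resource")
    }
  }

  val system = ActorSystem("sketch")
  val log = system.actorOf(Props(new EventLogLike), "log")
  log ! "events 1..100"
  Thread.sleep(500)  // give the read a moment to complete
  system.terminate() // stops "log": children first, then its postStop
}
```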
@@ -21,7 +21,9 @@ import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit

import akka.actor._
import akka.pattern.ask
import akka.serialization.SerializationExtension
import akka.util.Timeout

import com.rbmhtechnology.eventuate.DurableEvent
import com.rbmhtechnology.eventuate.log._
@@ -39,6 +41,9 @@ import scala.concurrent.duration._
import scala.util._

class LeveldbEventLogSettings(config: Config) extends EventLogSettings {
val readTimeout: FiniteDuration =
config.getDuration("eventuate.log.read-timeout", TimeUnit.MILLISECONDS).millis

val rootDir: String =
config.getString("eventuate.log.leveldb.dir")
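
The new readTimeout setting is parsed from eventuate.log.read-timeout as a millisecond duration. A small sketch of how such a value could be supplied and read, assuming Typesafe Config's HOCON duration syntax and an illustrative value of 10s (only the key name and the getDuration call come from the diff):

```scala
import java.util.concurrent.TimeUnit

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

object ReadTimeoutSketch extends App {
  // "10s" is an assumed example value, not taken from the commit.
  val config = ConfigFactory.parseString("eventuate.log.read-timeout = 10s")

  // Same call chain as LeveldbEventLogSettings: Long milliseconds -> FiniteDuration.
  val readTimeout: FiniteDuration =
    config.getDuration("eventuate.log.read-timeout", TimeUnit.MILLISECONDS).millis

  println(readTimeout) // 10000 milliseconds
}
```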

@@ -106,22 +111,23 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
private def eventIterator(from: Long, classifier: Int): EventIterator =
new EventIterator(from, classifier)

private def eventReader(): ActorRef =
context.actorOf(Props(new EventReader).withDispatcher("eventuate.log.dispatchers.read-dispatcher"))

override def readReplicationProgresses: Future[Map[String, Long]] =
completed(withIterator(iter => replicationProgressMap.readReplicationProgresses(iter)))

override def readReplicationProgress(logId: String): Future[Long] =
completed(withIterator(iter => replicationProgressMap.readReplicationProgress(logId)))

override def replicationRead(fromSequenceNr: Long, toSequenceNr: Long, max: Int, filter: DurableEvent => Boolean): Future[BatchReadResult] =
Future(readSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, filter))(services.readDispatcher)
eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, filter))(settings.readTimeout, self).mapTo[BatchReadResult]

override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int): Future[BatchReadResult] =
Future(readSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, _ => true))(services.readDispatcher)
eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, EventKey.DefaultClassifier, max, _ => true))(settings.readTimeout, self).mapTo[BatchReadResult]

override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] = {
val numericId = aggregateIdMap.numericId(aggregateId)
Future(readSync(fromSequenceNr, toSequenceNr, numericId, max, _ => true))(services.readDispatcher)
}
override def read(fromSequenceNr: Long, toSequenceNr: Long, max: Int, aggregateId: String): Future[BatchReadResult] =
eventReader().ask(EventReader.ReadSync(fromSequenceNr, toSequenceNr, aggregateIdMap.numericId(aggregateId), max, _ => true))(settings.readTimeout, self).mapTo[BatchReadResult]

override def recoverClock: Future[EventLogClock] = completed {
val snap = readEventLogClockSnapshot
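
The read methods in the hunk above no longer call readSync on a shared dispatcher; each request is sent to a fresh reader actor with ask and an explicit timeout, and the reply is turned into a typed Future via mapTo. The following standalone sketch shows that pattern under assumed names (EchoReader, the query string and the 10-second timeout are illustrative, not from the commit):

```scala
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

// A reader that answers one request and stops itself.
class EchoReader extends Actor {
  def receive = {
    case query: String =>
      sender() ! s"read result for: $query"
      context.stop(self) // one reader actor per request
  }
}

object AskSketch {
  // Returns a typed Future, in the same style as replicationRead/read above:
  // ask with an explicit (timeout, sender) argument list, then mapTo.
  def read(system: ActorSystem, query: String): Future[String] = {
    val reader = system.actorOf(Props(new EchoReader))
    reader.ask(query)(Timeout(10.seconds), ActorRef.noSender).mapTo[String]
  }
}
```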
@@ -189,12 +195,10 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
private def withIterator[R](body: DBIterator => R): R = {
val so = snapshotOptions()
val iter = leveldb.iterator(so)
addActiveIterator(iter)
try {
body(iter)
} finally {
iter.close()
removeActiveIterator(iter)
so.snapshot().close()
}
}
@@ -214,7 +218,6 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
val iter1 = leveldb.iterator(opts)
val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue))

addActiveIterator(iter1)
iter1.seek(eventKeyBytes(classifier, from))

override def hasNext: Boolean =
@@ -225,7 +228,6 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With

override def close(): Unit = {
iter1.close()
removeActiveIterator(iter1)
opts.snapshot().close()
}
}
@@ -253,46 +255,29 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
}

override def postStop(): Unit = {
while (activeIterators.get.nonEmpty) {
// Wait a bit for all concurrent read iterators to be closed
// See https://github.com/RBMHTechnology/eventuate/issues/87
Thread.sleep(500)
}
leveldb.close()
super.postStop()
}

// -------------------------------------------------------------------
// Support for tracking active iterators used by concurrent readers.
// It helps to avoid `pthread lock: invalid argument` errors raised
// by native code when closing the leveldb instance maintained by
// this event log actor, mainly during integration tests.
// -------------------------------------------------------------------

import java.util.concurrent.atomic._
import java.util.function._

private val activeIterators = new AtomicReference[Set[DBIterator]](Set())
private class EventReader() extends Actor {
import EventReader._

private val addAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc + u.head
def receive = {
case ReadSync(from, to, classifier, max, filter) =>
Try(readSync(from, to, classifier, max, filter)) match {
case Success(r) => sender() ! r
case Failure(e) => sender() ! Status.Failure(e)
}
context.stop(self)
}
}

private val removeAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc - u.head
private object EventReader {
case class ReadSync(fromSequenceNr: Long, toSequenceNr: Long, classifier: Int, max: Int, filter: DurableEvent => Boolean)
}

def addActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), addAccumulator)

def removeActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), removeAccumulator)
}

object LeveldbEventLog {

private[leveldb]type CloseableIterator[A] = Iterator[A] with Closeable

private[leveldb] case class EventKey(classifier: Int, sequenceNr: Long)
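
The EventReader introduced above answers exactly one ReadSync request and then stops itself; on failure it replies with Status.Failure, which is how an ask-based Future gets completed with the exception. A minimal sketch of that failure path under assumed names (FailingReader and its error message are illustrative, not from the commit):

```scala
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.duration._
import scala.util._

// Mirrors the Try/Success/Failure reply shape used by EventReader above.
class FailingReader extends Actor {
  def receive = {
    case _ =>
      Try(sys.error("read failed")) match {
        case Success(r) => sender() ! r
        case Failure(e) => sender() ! Status.Failure(e) // completes the ask Future with e
      }
      context.stop(self)
  }
}

object FailureSketch extends App {
  implicit val timeout: Timeout = Timeout(3.seconds)
  val system = ActorSystem("sketch")
  import system.dispatcher

  (system.actorOf(Props(new FailingReader)) ? "read").onComplete { result =>
    println(result) // Failure(java.lang.RuntimeException: read failed)
    system.terminate()
  }
}
```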
@@ -332,7 +317,6 @@ object LeveldbEventLog {
Future.fromTry(Try(body))

private[leveldb] trait WithBatch {

protected def leveldb: DB
protected def leveldbWriteOptions: WriteOptions
