diff --git a/eventuate-log-leveldb/src/main/resources/reference.conf b/eventuate-log-leveldb/src/main/resources/reference.conf index d4886903..ab6ab72d 100644 --- a/eventuate-log-leveldb/src/main/resources/reference.conf +++ b/eventuate-log-leveldb/src/main/resources/reference.conf @@ -18,5 +18,10 @@ eventuate { # Delay between two tries to physically delete all requested events while # keeping those that are not yet replicated. deletion-retry-delay = 1m + + # The maximum duration to wait for outstanding iterators being closed during + # shutdown. If there are iterators still left after this period closing will + # be aborted with an exception and the LevelDB handle will not be closed. + iterator-close-wait-max = 30s } } diff --git a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala index 376cdb62..37e80c04 100644 --- a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala +++ b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala @@ -19,6 +19,7 @@ package com.rbmhtechnology.eventuate.log.leveldb import java.io._ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.serialization.SerializationExtension @@ -31,6 +32,7 @@ import com.typesafe.config.Config import org.fusesource.leveldbjni.JniDBFactory._ import org.iq80.leveldb._ +import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.immutable.Seq import scala.concurrent.Future @@ -51,6 +53,9 @@ class LeveldbEventLogSettings(config: Config) extends EventLogSettings { val deletionBatchSize: Int = config.getInt("eventuate.log.leveldb.deletion-batch-size") + val closeWaitMax: FiniteDuration = + config.getDuration("eventuate.log.leveldb.iterator-close-wait-max", TimeUnit.MILLISECONDS).millis + val initRetryDelay: FiniteDuration = Duration.Zero @@ -187,47 +192,25 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With } private def withIterator[R](body: DBIterator => R): R = { - val so = snapshotOptions() - val iter = leveldb.iterator(so) - addActiveIterator(iter) - try { - body(iter) - } finally { - iter.close() - removeActiveIterator(iter) - so.snapshot().close() - } + val iter = allocateIterator() + try body(iter.iterator) + finally iter.close() } private def withEventIterator[R](from: Long, classifier: Int)(body: EventIterator => R): R = { val iter = eventIterator(from, classifier) - try { - body(iter) - } finally { - iter.close() - } + try body(iter) + finally iter.close() } private class EventIterator(from: Long, classifier: Int) extends Iterator[DurableEvent] with Closeable { - val opts = snapshotOptions() - - val iter1 = leveldb.iterator(opts) - val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue)) - - addActiveIterator(iter1) - iter1.seek(eventKeyBytes(classifier, from)) + val iter1 = allocateIterator() + val iter2 = iter1.iterator.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue)) + iter1.iterator.seek(eventKeyBytes(classifier, from)) - override def hasNext: Boolean = - iter2.hasNext - - override def next(): DurableEvent = - iter2.next() - - override def close(): Unit = { - iter1.close() - removeActiveIterator(iter1) - opts.snapshot().close() - } + override def hasNext: Boolean = iter2.hasNext + override def next(): DurableEvent = iter2.next() + override def close(): Unit = iter1.close() } private def eventBytes(e: DurableEvent): Array[Byte] = @@ -253,12 +236,9 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With } override def postStop(): Unit = { - while (activeIterators.get.nonEmpty) { - // Wait a bit for all concurrent read iterators to be closed - // See https://github.com/RBMHTechnology/eventuate/issues/87 - Thread.sleep(500) - } + waitForOutstandingIterators() leveldb.close() + super.postStop() } @@ -269,26 +249,69 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With // this event log actor, mainly during integration tests. // ------------------------------------------------------------------- - import java.util.concurrent.atomic._ - import java.util.function._ - - private val activeIterators = new AtomicReference[Set[DBIterator]](Set()) - - private val addAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc + u.head + private trait ManagedIterator { + def iterator: DBIterator + def close(): Unit } - private val removeAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc - u.head + // invariants: + // - after finished changed to true, outstandingIterators will never be increased above 0 again + + private case class State(finishing: Boolean, outstandingIterators: Int) + private val state = new AtomicReference[State](State(false, 0)) + + @tailrec private def waitForOutstandingIterators(): Unit = { + val cur = state.get + if (!state.compareAndSet(cur, cur.copy(finishing = true))) + waitForOutstandingIterators() + else { + val start = System.nanoTime() + // spin until all outstanding iterators have been given back + while (state.get.outstandingIterators > 0 && (System.nanoTime() - start) < settings.closeWaitMax.toNanos) Thread.sleep(100) + if (state.get.outstandingIterators > 0) + throw new RuntimeException("Outstanding iterators were not closed after " + + s"eventuate.log.leveldb.iterator-close-wait-max = ${settings.closeWaitMax} either because of " + + "long running operations or because iterator handles were not closed.") + } } - def addActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), addAccumulator) + private def allocateIterator(): ManagedIterator = { + def doAllocate(): ManagedIterator = { + val opts = snapshotOptions() + val it = leveldb.iterator(opts) + + new ManagedIterator { + def iterator: DBIterator = it + + var closed = false + def close(): Unit = + if (!closed) { + it.close() + opts.snapshot().close() + decreaseCounter() + closed = true + } + } + } + + @tailrec def decreaseCounter(): Unit = { + val cur = state.get + if (!state.compareAndSet(cur, cur.copy(outstandingIterators = cur.outstandingIterators - 1))) + decreaseCounter() + } - def removeActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), removeAccumulator) + @tailrec def rec(): ManagedIterator = + state.get match { + case s @ State(false, n) => + if (!state.compareAndSet(s, s.copy(outstandingIterators = n + 1))) + rec() + else + doAllocate() + case State(true, _) => throw new RuntimeException("Cannot create iterators while shutting down!") + } + + rec() + } } object LeveldbEventLog {