From e80cac3332e338d555861fab3b159c405d327691 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 14 Mar 2016 10:33:34 +0100 Subject: [PATCH] Safer tracking of issued leveldb iterators, fixes #234 Previously, creating an iterator and tracking it wasn't an atomic operation, resulting in iterator operations being run after the db had been closed. This resulted in all kinds of crashes because of illegal native state during the shutdown procedure. --- .../src/main/resources/reference.conf | 5 + .../log/leveldb/LeveldbEventLog.scala | 129 +++++++++++------- 2 files changed, 81 insertions(+), 53 deletions(-) diff --git a/eventuate-log-leveldb/src/main/resources/reference.conf b/eventuate-log-leveldb/src/main/resources/reference.conf index d4886903..7cacd315 100644 --- a/eventuate-log-leveldb/src/main/resources/reference.conf +++ b/eventuate-log-leveldb/src/main/resources/reference.conf @@ -18,5 +18,10 @@ eventuate { # Delay between two tries to physically delete all requested events while # keeping those that are not yet replicated. deletion-retry-delay = 1m + + # The maximum duration to wait for outstanding iterators to be closed during + # shutdown. If there are iterators still left after this period, closing will + # be aborted with an exception and the LevelDB handle will not be closed. 
+ iterator-close-wait-max = 30s } } diff --git a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala index 376cdb62..37e80c04 100644 --- a/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala +++ b/eventuate-log-leveldb/src/main/scala/com/rbmhtechnology/eventuate/log/leveldb/LeveldbEventLog.scala @@ -19,6 +19,7 @@ package com.rbmhtechnology.eventuate.log.leveldb import java.io._ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import akka.actor._ import akka.serialization.SerializationExtension @@ -31,6 +32,7 @@ import com.typesafe.config.Config import org.fusesource.leveldbjni.JniDBFactory._ import org.iq80.leveldb._ +import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.immutable.Seq import scala.concurrent.Future @@ -51,6 +53,9 @@ class LeveldbEventLogSettings(config: Config) extends EventLogSettings { val deletionBatchSize: Int = config.getInt("eventuate.log.leveldb.deletion-batch-size") + val closeWaitMax: FiniteDuration = + config.getDuration("eventuate.log.leveldb.iterator-close-wait-max", TimeUnit.MILLISECONDS).millis + val initRetryDelay: FiniteDuration = Duration.Zero @@ -187,47 +192,25 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With } private def withIterator[R](body: DBIterator => R): R = { - val so = snapshotOptions() - val iter = leveldb.iterator(so) - addActiveIterator(iter) - try { - body(iter) - } finally { - iter.close() - removeActiveIterator(iter) - so.snapshot().close() - } + val iter = allocateIterator() + try body(iter.iterator) + finally iter.close() } private def withEventIterator[R](from: Long, classifier: Int)(body: EventIterator => R): R = { val iter = eventIterator(from, classifier) - try { - body(iter) - } 
finally { - iter.close() - } + try body(iter) + finally iter.close() } private class EventIterator(from: Long, classifier: Int) extends Iterator[DurableEvent] with Closeable { - val opts = snapshotOptions() - - val iter1 = leveldb.iterator(opts) - val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue)) - - addActiveIterator(iter1) - iter1.seek(eventKeyBytes(classifier, from)) + val iter1 = allocateIterator() + val iter2 = iter1.iterator.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue)) + iter1.iterator.seek(eventKeyBytes(classifier, from)) - override def hasNext: Boolean = - iter2.hasNext - - override def next(): DurableEvent = - iter2.next() - - override def close(): Unit = { - iter1.close() - removeActiveIterator(iter1) - opts.snapshot().close() - } + override def hasNext: Boolean = iter2.hasNext + override def next(): DurableEvent = iter2.next() + override def close(): Unit = iter1.close() } private def eventBytes(e: DurableEvent): Array[Byte] = @@ -253,12 +236,9 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With } override def postStop(): Unit = { - while (activeIterators.get.nonEmpty) { - // Wait a bit for all concurrent read iterators to be closed - // See https://github.com/RBMHTechnology/eventuate/issues/87 - Thread.sleep(500) - } + waitForOutstandingIterators() leveldb.close() + super.postStop() } @@ -269,26 +249,69 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With // this event log actor, mainly during integration tests. 
// ------------------------------------------------------------------- - import java.util.concurrent.atomic._ - import java.util.function._ - - private val activeIterators = new AtomicReference[Set[DBIterator]](Set()) - - private val addAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc + u.head + private trait ManagedIterator { + def iterator: DBIterator + def close(): Unit } - private val removeAccumulator = new BinaryOperator[Set[DBIterator]] { - override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] = - acc - u.head + // invariants: + // - once finishing has been set to true, outstandingIterators is never increased again (so once it reaches 0 it stays at 0) + + private case class State(finishing: Boolean, outstandingIterators: Int) + private val state = new AtomicReference[State](State(false, 0)) + + @tailrec private def waitForOutstandingIterators(): Unit = { + val cur = state.get + if (!state.compareAndSet(cur, cur.copy(finishing = true))) + waitForOutstandingIterators() + else { + val start = System.nanoTime() + // spin until all outstanding iterators have been given back + while (state.get.outstandingIterators > 0 && (System.nanoTime() - start) < settings.closeWaitMax.toNanos) Thread.sleep(100) + if (state.get.outstandingIterators > 0) + throw new RuntimeException("Outstanding iterators were not closed after " + + s"eventuate.log.leveldb.iterator-close-wait-max = ${settings.closeWaitMax} either because of " + + "long running operations or because iterator handles were not closed.") + } } - def addActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), addAccumulator) + private def allocateIterator(): ManagedIterator = { + def doAllocate(): ManagedIterator = { + val opts = snapshotOptions() + val it = leveldb.iterator(opts) + + new ManagedIterator { + def iterator: DBIterator = it + + var closed = false + def close(): Unit = + if (!closed) { + 
it.close() + opts.snapshot().close() + decreaseCounter() + closed = true + } + } + } + + @tailrec def decreaseCounter(): Unit = { + val cur = state.get + if (!state.compareAndSet(cur, cur.copy(outstandingIterators = cur.outstandingIterators - 1))) + decreaseCounter() + } - def removeActiveIterator(iter: DBIterator): Unit = - activeIterators.accumulateAndGet(Set(iter), removeAccumulator) + @tailrec def rec(): ManagedIterator = + state.get match { + case s @ State(false, n) => + if (!state.compareAndSet(s, s.copy(outstandingIterators = n + 1))) + rec() + else + doAllocate() + case State(true, _) => throw new RuntimeException("Cannot create iterators while shutting down!") + } + + rec() + } } object LeveldbEventLog {