Skip to content

Commit

Permalink
Safer tracking of issued leveldb iterators, fixes RBMHTechnology#234
Browse files Browse the repository at this point in the history
Previously, creating an iterator and tracking wasn't an atomic operation
resulting in iterator operations being run after the db had been closed.
This resulted in all kinds of crashes because of illegal native state
during the shutdown procedure.
  • Loading branch information
jrudolph committed Mar 14, 2016
1 parent 9150300 commit e80cac3
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 53 deletions.
5 changes: 5 additions & 0 deletions eventuate-log-leveldb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ eventuate {
# Delay between two tries to physically delete all requested events while
# keeping those that are not yet replicated.
deletion-retry-delay = 1m

# The maximum duration to wait for outstanding iterators being closed during
# shutdown. If there are iterators still left after this period closing will
# be aborted with an exception and the LevelDB handle will not be closed.
close-wait-max = 30s
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.rbmhtechnology.eventuate.log.leveldb
import java.io._
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.serialization.SerializationExtension
Expand All @@ -31,6 +32,7 @@ import com.typesafe.config.Config
import org.fusesource.leveldbjni.JniDBFactory._
import org.iq80.leveldb._

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable.Seq
import scala.concurrent.Future
Expand All @@ -51,6 +53,9 @@ class LeveldbEventLogSettings(config: Config) extends EventLogSettings {
val deletionBatchSize: Int =
config.getInt("eventuate.log.leveldb.deletion-batch-size")

val closeWaitMax: FiniteDuration =
config.getDuration("eventuate.log.leveldb.iterator-close-wait-max", TimeUnit.MILLISECONDS).millis

val initRetryDelay: FiniteDuration =
Duration.Zero

Expand Down Expand Up @@ -187,47 +192,25 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
}

private def withIterator[R](body: DBIterator => R): R = {
val so = snapshotOptions()
val iter = leveldb.iterator(so)
addActiveIterator(iter)
try {
body(iter)
} finally {
iter.close()
removeActiveIterator(iter)
so.snapshot().close()
}
val iter = allocateIterator()
try body(iter.iterator)
finally iter.close()
}

private def withEventIterator[R](from: Long, classifier: Int)(body: EventIterator => R): R = {
val iter = eventIterator(from, classifier)
try {
body(iter)
} finally {
iter.close()
}
try body(iter)
finally iter.close()
}

private class EventIterator(from: Long, classifier: Int) extends Iterator[DurableEvent] with Closeable {
val opts = snapshotOptions()

val iter1 = leveldb.iterator(opts)
val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue))

addActiveIterator(iter1)
iter1.seek(eventKeyBytes(classifier, from))
val iter1 = allocateIterator()
val iter2 = iter1.iterator.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue))
iter1.iterator.seek(eventKeyBytes(classifier, from))

override def hasNext: Boolean =
iter2.hasNext

override def next(): DurableEvent =
iter2.next()

override def close(): Unit = {
iter1.close()
removeActiveIterator(iter1)
opts.snapshot().close()
}
override def hasNext: Boolean = iter2.hasNext
override def next(): DurableEvent = iter2.next()
override def close(): Unit = iter1.close()
}

private def eventBytes(e: DurableEvent): Array[Byte] =
Expand All @@ -253,12 +236,9 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
}

override def postStop(): Unit = {
while (activeIterators.get.nonEmpty) {
// Wait a bit for all concurrent read iterators to be closed
// See https://github.com/RBMHTechnology/eventuate/issues/87
Thread.sleep(500)
}
waitForOutstandingIterators()
leveldb.close()

super.postStop()
}

Expand All @@ -269,26 +249,69 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
// this event log actor, mainly during integration tests.
// -------------------------------------------------------------------

import java.util.concurrent.atomic._
import java.util.function._

private val activeIterators = new AtomicReference[Set[DBIterator]](Set())

private val addAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc + u.head
private trait ManagedIterator {
def iterator: DBIterator
def close(): Unit
}

private val removeAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc - u.head
// invariants:
// - after finished changed to true, outstandingIterators will never be increased above 0 again

private case class State(finishing: Boolean, outstandingIterators: Int)
private val state = new AtomicReference[State](State(false, 0))

@tailrec private def waitForOutstandingIterators(): Unit = {
val cur = state.get
if (!state.compareAndSet(cur, cur.copy(finishing = true)))
waitForOutstandingIterators()
else {
val start = System.nanoTime()
// spin until all outstanding iterators have been given back
while (state.get.outstandingIterators > 0 && (System.nanoTime() - start) < settings.closeWaitMax.toNanos) Thread.sleep(100)
if (state.get.outstandingIterators > 0)
throw new RuntimeException("Outstanding iterators were not closed after " +
s"eventuate.log.leveldb.iterator-close-wait-max = ${settings.closeWaitMax} either because of " +
"long running operations or because iterator handles were not closed.")
}
}

def addActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), addAccumulator)
private def allocateIterator(): ManagedIterator = {
def doAllocate(): ManagedIterator = {
val opts = snapshotOptions()
val it = leveldb.iterator(opts)

new ManagedIterator {
def iterator: DBIterator = it

var closed = false
def close(): Unit =
if (!closed) {
it.close()
opts.snapshot().close()
decreaseCounter()
closed = true
}
}
}

@tailrec def decreaseCounter(): Unit = {
val cur = state.get
if (!state.compareAndSet(cur, cur.copy(outstandingIterators = cur.outstandingIterators - 1)))
decreaseCounter()
}

def removeActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), removeAccumulator)
@tailrec def rec(): ManagedIterator =
state.get match {
case s @ State(false, n) =>
if (!state.compareAndSet(s, s.copy(outstandingIterators = n + 1)))
rec()
else
doAllocate()
case State(true, _) => throw new RuntimeException("Cannot create iterators while shutting down!")
}

rec()
}
}

object LeveldbEventLog {
Expand Down

0 comments on commit e80cac3

Please sign in to comment.