Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

safer tracking of issued leveldb iterators #231

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions eventuate-log-leveldb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ eventuate {
# Delay between two tries to physically delete all requested events while
# keeping those that are not yet replicated.
deletion-retry-delay = 1m

# The maximum duration to wait for outstanding iterators being closed during
# shutdown. If there are iterators still left after this period closing will
# be aborted with an exception and the LevelDB handle will not be closed.
iterator-close-wait-max = 30s
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.rbmhtechnology.eventuate.log.leveldb
import java.io._
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import akka.actor._
import akka.serialization.SerializationExtension
Expand All @@ -31,6 +32,7 @@ import com.typesafe.config.Config
import org.fusesource.leveldbjni.JniDBFactory._
import org.iq80.leveldb._

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable.Seq
import scala.concurrent.Future
Expand All @@ -51,6 +53,9 @@ class LeveldbEventLogSettings(config: Config) extends EventLogSettings {
val deletionBatchSize: Int =
config.getInt("eventuate.log.leveldb.deletion-batch-size")

val closeWaitMax: FiniteDuration =
config.getDuration("eventuate.log.leveldb.iterator-close-wait-max", TimeUnit.MILLISECONDS).millis

val initRetryDelay: FiniteDuration =
Duration.Zero

Expand Down Expand Up @@ -187,47 +192,25 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
}

private def withIterator[R](body: DBIterator => R): R = {
val so = snapshotOptions()
val iter = leveldb.iterator(so)
addActiveIterator(iter)
try {
body(iter)
} finally {
iter.close()
removeActiveIterator(iter)
so.snapshot().close()
}
val iter = allocateIterator()
try body(iter.iterator)
finally iter.close()
}

private def withEventIterator[R](from: Long, classifier: Int)(body: EventIterator => R): R = {
val iter = eventIterator(from, classifier)
try {
body(iter)
} finally {
iter.close()
}
try body(iter)
finally iter.close()
}

private class EventIterator(from: Long, classifier: Int) extends Iterator[DurableEvent] with Closeable {
val opts = snapshotOptions()

val iter1 = leveldb.iterator(opts)
val iter2 = iter1.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue))

addActiveIterator(iter1)
iter1.seek(eventKeyBytes(classifier, from))
val iter1 = allocateIterator()
val iter2 = iter1.iterator.asScala.takeWhile(entry => eventKey(entry.getKey).classifier == classifier).map(entry => event(entry.getValue))
iter1.iterator.seek(eventKeyBytes(classifier, from))

override def hasNext: Boolean =
iter2.hasNext

override def next(): DurableEvent =
iter2.next()

override def close(): Unit = {
iter1.close()
removeActiveIterator(iter1)
opts.snapshot().close()
}
override def hasNext: Boolean = iter2.hasNext
override def next(): DurableEvent = iter2.next()
override def close(): Unit = iter1.close()
}

private def eventBytes(e: DurableEvent): Array[Byte] =
Expand All @@ -253,12 +236,9 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
}

override def postStop(): Unit = {
while (activeIterators.get.nonEmpty) {
// Wait a bit for all concurrent read iterators to be closed
// See https://github.com/RBMHTechnology/eventuate/issues/87
Thread.sleep(500)
}
waitForOutstandingIterators()
leveldb.close()

super.postStop()
}

Expand All @@ -269,26 +249,69 @@ class LeveldbEventLog(id: String, prefix: String) extends EventLog(id) with With
// this event log actor, mainly during integration tests.
// -------------------------------------------------------------------

import java.util.concurrent.atomic._
import java.util.function._

private val activeIterators = new AtomicReference[Set[DBIterator]](Set())

private val addAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc + u.head
private trait ManagedIterator {
def iterator: DBIterator
def close(): Unit
}

private val removeAccumulator = new BinaryOperator[Set[DBIterator]] {
override def apply(acc: Set[DBIterator], u: Set[DBIterator]): Set[DBIterator] =
acc - u.head
// invariants:
// - after finished changed to true, outstandingIterators will never be increased above 0 again

private case class State(finishing: Boolean, outstandingIterators: Int)
private val state = new AtomicReference[State](State(false, 0))

@tailrec private def waitForOutstandingIterators(): Unit = {
val cur = state.get
if (!state.compareAndSet(cur, cur.copy(finishing = true)))
waitForOutstandingIterators()
else {
val start = System.nanoTime()
// spin until all outstanding iterators have been given back
while (state.get.outstandingIterators > 0 && (System.nanoTime() - start) < settings.closeWaitMax.toNanos) Thread.sleep(100)
if (state.get.outstandingIterators > 0)
throw new RuntimeException("Outstanding iterators were not closed after " +
s"eventuate.log.leveldb.iterator-close-wait-max = ${settings.closeWaitMax} either because of " +
"long running operations or because iterator handles were not closed.")
}
}

def addActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), addAccumulator)
private def allocateIterator(): ManagedIterator = {
def doAllocate(): ManagedIterator = {
val opts = snapshotOptions()
val it = leveldb.iterator(opts)

new ManagedIterator {
def iterator: DBIterator = it

var closed = false
def close(): Unit =
if (!closed) {
it.close()
opts.snapshot().close()
decreaseCounter()
closed = true
}
}
}

@tailrec def decreaseCounter(): Unit = {
val cur = state.get
if (!state.compareAndSet(cur, cur.copy(outstandingIterators = cur.outstandingIterators - 1)))
decreaseCounter()
}

def removeActiveIterator(iter: DBIterator): Unit =
activeIterators.accumulateAndGet(Set(iter), removeAccumulator)
@tailrec def rec(): ManagedIterator =
state.get match {
case s @ State(false, n) =>
if (!state.compareAndSet(s, s.copy(outstandingIterators = n + 1)))
rec()
else
doAllocate()
case State(true, _) => throw new RuntimeException("Cannot create iterators while shutting down!")
}

rec()
}
}

object LeveldbEventLog {
Expand Down