Skip to content

Commit

Permalink
Adding some generic utility classes related to asynchronous processin…
Browse files Browse the repository at this point in the history
…g, reading, writing, and iterating over items.
  • Loading branch information
nh13 committed Oct 3, 2018
1 parent 12355d8 commit 689807f
Show file tree
Hide file tree
Showing 13 changed files with 987 additions and 3 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import com.typesafe.sbt.SbtGit.GitCommand
import sbt.Keys._
import sbt._
import sbtassembly.AssemblyKeys.assembly
import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* The MIT License
*
* Copyright (c) 2018 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package com.fulcrumgenomics.commons.async

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingDeque, TimeUnit}

object AsyncIterator {
/** Builds an [[AsyncIterator]] and starts it.
*
* @param source the source iterator to consume
* @param bufferSize the maximum number of elements to buffer before the blocking when consuming the source, or None
* if unbounded.
* @tparam T the type of object to consume
*/
def apply[T](source: Iterator[T], bufferSize: Option[Int] = None): AsyncIterator[T] = {
new AsyncIterator[T](source, bufferSize).start()
}
}

/** An asynchronous wrapper for an [[Iterator]]. A separate thread will be created to consume the source iterator.
* Will buffer up to [[bufferSize]] elements before the blocking when consuming the source.
*
* @param source the source iterator to consume
* @param bufferSize the maximum number of elements to buffer before the blocking when consuming the source, or None
* if unbounded.
* @tparam T the type of object to consume
*/
class AsyncIterator[T](private val source: Iterator[T], bufferSize: Option[Int] = None) extends Iterator[T] with AsyncRunnable {
bufferSize.foreach(size => require(size > 0, s"bufferSize must be greater than zero when given, found $size"))

private var buffer: Option[T] = None

private val queue: BlockingQueue[T] = bufferSize match {
case Some(size) => new ArrayBlockingQueue[T](size)
case None => new LinkedBlockingDeque[T]()
}

protected def execute(): Unit = this.source.foreach(this.queue.put)

/** Returns true if there exists more elements, false otherwise */
def hasNext(): Boolean = {
checkAndRaise()

// Get the next item, or wait until the underlying thread is done and there are no more items in the queue
while (buffer.isEmpty && !(this.done && this.queue.isEmpty)) {
checkAndRaise() // check if hte underlying thread raised an exception
tryAndModifyInterruptedException("Interrupted waiting on taking from the queue.") {
buffer = Option(this.queue.poll(50, TimeUnit.MILLISECONDS))
}
}

// Did we get an item from the buffer?
buffer.nonEmpty
}

/** Gets the next item. */
def next(): T = {
checkAndRaise()
this.buffer match {
case None => throw new NoSuchElementException("Calling next() when hasNext() is false.")
case Some(item) =>
this.buffer = None
item
}
}
}
163 changes: 163 additions & 0 deletions src/main/scala/com/fulcrumgenomics/commons/async/AsyncRunnable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* The MIT License
*
* Copyright (c) 2018 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package com.fulcrumgenomics.commons.async

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.concurrent.{Awaitable, ExecutionContext, Future}

object AsyncRunnable {
/** A counter used for thread naming in [[AsyncRunnable]] and sub-classes */
private val threadsCreated = new AtomicInteger(0)

/** Generates a name for thread that will run the given runnable. */
private[AsyncRunnable] def nextName(asyncRunnable: AsyncRunnable): String = {
asyncRunnable.getClass.getSimpleName + this.threadsCreated.incrementAndGet
}
}

/** A trait that can be mixed in to help manage asynchronous execution while being [[Runnable]].
*
* The `execute()` method should contain the work to be performed asynchronously.
*
* The `thread()` method can be used to return a [[Thread]] that wraps this [[Runnable]].
*
* The `start()` method can be used to create and start daemon [[Thread]] that wraps this [[Runnable]].
*
* The `checkAndRaise()` method should be used by implementing classes to check if the
* `execute()` method has encountered an exception, and if so, it will rethrow the exception as an
* [[Error]] or [[RuntimeException]].
*
* The `tryAndModifyInterruptedException()` method should wrap blocking code in implementing classes
* that are not part of the `execute()` method. This method will throw a [[RuntimeException]] if interrupted while
* waiting.
*/
trait AsyncRunnable extends Runnable {

/** A reference to an exception thrown during an asynchronous operation. */
private[AsyncRunnable] val _throwable: AtomicReference[Throwable] = new AtomicReference(null)

/** This signals that the run method has started. */
private val startedLatch: CountDownLatch = new CountDownLatch(1)

/** This signals that the run method has completed, even if an exception occurred. This is needed so the close method
* can be sure that the run method is no longer executing. */
private val doneLatch: CountDownLatch = new CountDownLatch(1)

final def run(): Unit = {
startedLatch.countDown()
try {
this.execute()
} catch {
case thr: Throwable =>
this._throwable.compareAndSet(null, thr)
this.uponException()
} finally {
this.uponFinally()
this.doneLatch.countDown()
}
}

/** Returns true if the [[run()]] method has started, false otherwise. */
def started: Boolean = this.startedLatch.getCount == 0

/** Returns true if the [[run()]] method has completed, false otherwise. */
def done: Boolean = this.doneLatch.getCount == 0

/** Returns a throwable if an exception occurred in the [[run()]] method, None otherwise. */
def throwable: Option[Throwable] = Option(this._throwable.get())

/** Waits for the [[run()]] method to start. */
def awaitStart(): Unit = this.startedLatch.await()

/** Returns an [[Awaitable]] that completes when the [[run()]] method has started. Returns the throwable if an
* exception was encountered, [[None]] otherwise. */
def uponStart()(implicit ec: ExecutionContext) : Awaitable[Unit] = Future { this.awaitStart() }

/** Waits for the [[run()]] method to complete. */
def awaitDone(): Unit = this.doneLatch.await()

/** Returns an [[Awaitable]] that completes when the [[run()]] method has completed. Returns the throwable if an
* exception was encountered, [[None]] otherwise. */
def uponDone()(implicit ec: ExecutionContext) : Awaitable[Option[Throwable]] = Future { this.awaitDone(); Option(this._throwable.get()) }

/** The name of this runnable. This is used as the name of the thread in [[thread()]] as well. The name is created
* based on the class name and the number of [[AsyncRunnable]]s already created.
* */
val name: String = AsyncRunnable.nextName(this)

/** Creates a new thread wrapping this runnable; the thread is not started. */
final def thread(): Thread = {
require(!this.started, "Already started.")
new Thread(this, this.name)
}

/** Starts this [[Runnable]] in a daemon thread.
*
* @param name optionally the name of the thread, otherwise a name is created based on the class name and the number
* of threads already created.
*/
final def start(name: Option[String] = None, daemon: Boolean = true): this.type = {
require(!this.started, "Already started.")
val t = thread()
t.setDaemon(daemon)
t.start()
this.startedLatch.countDown()
this
}

/** The method that does the asynchronous work. */
protected def execute(): Unit

/** The method to execute if an exception occurs in the asynchronous thread. This should not block. */
protected def uponException(): Unit = Unit

/** The method to execute upon successfully execution of the run method or an exception occurs. This should not block. */
protected def uponFinally(): Unit = Unit

/** Checks to see if an exception has been raised by an asynchronous thread and if so rethrows it. Use this method
* before code that assumes the threads have not encountered an exception.
*/
protected final def checkAndRaise(): Unit = {
_throwable.getAndSet(null) match {
case null => Unit
case thr: Throwable => throw thr
}
}

/** Executes the given block of code. If an [[InterruptedException]] is thrown, throws a [[RuntimeException]]
* with the given message. Use this method for blocking code that when interrupted should not be recoverable.
*
* @param message the message to use if an [[InterruptedException]] is thrown by the block of code
* @param f the block of code to execute
*/
protected def tryAndModifyInterruptedException[T](message: String)(f: => T): T = {
try { f } catch {
case ie: InterruptedException => throw new RuntimeException(message, ie)
}
}
}
122 changes: 122 additions & 0 deletions src/main/scala/com/fulcrumgenomics/commons/async/AsyncSink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* The MIT License
*
* Copyright (c) 2018 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package com.fulcrumgenomics.commons.async

import java.io.Closeable
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean

object AsyncSink {
/** Builds an [[AsyncSink]] and starts it.
*
* @param sink the method to invoke to process an object of type [[T]].
* @param bufferSize the number of elements to buffer before blocking when processing the elements, or None if
* unbounded.
* @param source the optional source to close when this sink is closed.
* @tparam T the type of object to process
*/
def apply[T](sink: T => Unit, bufferSize: Option[Int] = None, source: Option[{ def close(): Unit }] = None): AsyncSink[T] = {
new AsyncSink[T](sink, bufferSize, source).start()
}
}



/** Asynchronous wrapper for applying a `sink` method to items of type [T]. Items can be added to this sink (with
* `add()`) until no more items will be added (indicated by calling the `close()` method), ensuring all items added to
* the sink will have the `sink` method applied to them.
*
* NOTE: Any exception thrown by the [[sink]] method will be propagated back to the caller
* during the next available call to [[add()]] or [[close()]]. After the exception
* has been thrown to the caller, it is not safe to attempt further operations on the instance.
*
* NOTE: [[add()]] or [[close()]] are not thread-safe, so there must be only one thread that calls them.
*
* @param sink the method to invoke to process an object of type [[T]].
* @param bufferSize the number of elements to buffer before blocking when processing the elements, or None if
* unbounded.
* @param source the optional source to close when this sink is closed.
* @tparam T the type of object to process
*/
class AsyncSink[T](val sink: T => Unit, bufferSize: Option[Int] = None, val source: Option[{ def close(): Unit }] = None)
extends Closeable with AsyncRunnable {
bufferSize.foreach(size => require(size > 0, s"bufferSize must be greater than zero when given, found $size"))

private val isClosed: AtomicBoolean = new AtomicBoolean(false)

private val queue: BlockingQueue[T] = bufferSize match {
case Some(size) => new ArrayBlockingQueue[T](size)
case None => new LinkedBlockingDeque[T]()
}

/** Adds the item to the queue to be processed by [[sink]].
*
* @param item the item to queue
*/
def add(item: T): Unit = {
if (this.isClosed.get()) throw new RuntimeException("Attempt to add record to closed sink.")
checkAndRaise()
tryAndModifyInterruptedException("Interrupted queueing item.") { this.queue.put(item) }
checkAndRaise()
}

/** Attempts to finish draining the queue and then calls `close()` on the source to allow implementation
* to do any one time clean up.
*/
def close(): Unit = {
require(this.started, "Attempting to close a sink that hasn't been started.")
checkAndRaise()
if (this.isClosed.compareAndSet(false, true)) {
// Wait for the execute method to complete
awaitDone()
// The queue should be empty but if it's not, we'll drain it here to protect against any lost data.
// There's no need to timeout on poll because take is called only when queue is not empty and
// at this point the thread is definitely dead and no-one is removing items from the queue.
// The item pulled will never be null (same reasoning).
while (!this.queue.isEmpty) {
sink(this.queue.take())
}
this.source.foreach(_.close())
checkAndRaise()
}
}

protected def execute(): Unit = {
// The order of the two conditions is important, because we want to make sure that emptiness status of the
// queue does not change after we have evaluated isClosed as it is now (isClosed checked before
// queue.isEmpty), the two operations are effectively atomic if isClosed returns true
while (!this.isClosed.get() || !this.queue.isEmpty) {
val item = this.queue.poll(50, TimeUnit.MILLISECONDS)
if (item != null) this.sink(item)
}
}

override protected def uponException(): Unit = {
// In case the sink was blocking on a full queue before ex has been set, clear the queue
// so that the sink will no longer be blocked so that it can see the exception.
this.queue.clear()
}
}
Loading

0 comments on commit 689807f

Please sign in to comment.