Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric builder and sorter #826

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
113 changes: 89 additions & 24 deletions src/main/scala/com/fulcrumgenomics/util/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,36 @@ object Metric extends LazyLogging {
}
}

/** Builds a metric of type `T` by populating each of the builder's arguments from its string form.
  *
  * For every field name in `Metric.names[T]`, the matching builder argument is looked up, the string returned by
  * `toArg` for that index is converted to the argument's type, and the result is stored on the argument.  An empty
  * string for an `Option` argument is replaced by `ReflectionUtil.SpecialEmptyOrNoneToken` before conversion.
  *
  * NOTE(review): the indices handed to `toArg` follow the order of `Metric.names[T]`, not the order of any file
  * header; confirm that callers supply their values in that same order.
  *
  * @param reflectiveBuilder the builder whose arguments are populated and which then constructs the metric
  * @param toArg returns the string value for the field at the given index into `Metric.names[T]`
  * @param fail called with a message and an optional cause when a field cannot be found or converted; it is
  *             expected to throw, since otherwise its (unit) result is what gets stored for the field
  * @return the metric built from the populated arguments
  */
private[util] def build[T <: Metric](reflectiveBuilder: ReflectiveBuilder[T],
                                     toArg: Int => String,
                                     fail: (String, Option[Throwable]) => Unit
                                    ): T = {
  val names = Metric.names[T]
  forloop(from = 0, until = names.length) { i =>
    reflectiveBuilder.argumentLookup.forField(names(i)) match {
      case Some(arg) =>
        val value = {
          val tmp = toArg(i)
          // An empty string for an Option-typed argument is swapped for the special empty/none token
          if (tmp.isEmpty && arg.argumentType == classOf[Option[_]]) ReflectionUtil.SpecialEmptyOrNoneToken else tmp
        }

        val argumentValue = ReflectionUtil.constructFromString(arg.argumentType, arg.unitType, value) match {
          case Success(v) => v
          case Failure(thr) =>
            fail(s"Could not construct value for column '${arg.name}' of type '${arg.typeDescription}' from '$value'", Some(thr))
        }
        arg.value = argumentValue
      case None =>
        fail(s"Did not have a field with name '${names(i)}'.", None)
    }
  }

  // build it. NB: if arguments are missing values, then an exception will be thrown here
  // Also, we don't use the default "build()" method since if a collection or option is empty, it will be treated as
  // missing.
  reflectiveBuilder.build(reflectiveBuilder.argumentLookup.ordered.map(arg => arg.value getOrElse unreachable(s"Arguments not set: ${arg.name}")))
}

/** Reads metrics from a set of lines. The first line should be the header with the field names. Each subsequent
* line should be a single metric. */
def iterator[T <: Metric](lines: Iterator[String], source: Option[String] = None)(implicit tt: ru.TypeTag[T]): Iterator[T] = {
Expand All @@ -122,33 +152,14 @@ object Metric extends LazyLogging {

if (lines.isEmpty) fail(lineNumber=1, message="No header found")
val parser = new DelimitedDataParser(lines=lines, delimiter=Delimiter, ignoreBlankLines=false, trimFields=true)
val names = parser.headers.toIndexedSeq
val reflectiveBuilder = new ReflectiveBuilder(clazz)

parser.zipWithIndex.map { case (row, rowIndex) =>
forloop(from = 0, until = names.length) { i =>
reflectiveBuilder.argumentLookup.forField(names(i)) match {
case Some(arg) =>
val value = {
val tmp = row[String](i)
if (tmp.isEmpty && arg.argumentType == classOf[Option[_]]) ReflectionUtil.SpecialEmptyOrNoneToken else tmp
}

val argumentValue = ReflectionUtil.constructFromString(arg.argumentType, arg.unitType, value) match {
case Success(v) => v
case Failure(thr) =>
fail(lineNumber=rowIndex+2, message=s"Could not construct value for column '${arg.name}' of type '${arg.typeDescription}' from '$value'", Some(thr))
}
arg.value = argumentValue
case None =>
fail(lineNumber=rowIndex+2, message=s"Did not have a field with name '${names(i)}'.")
}
}

// build it. NB: if arguments are missing values, then an exception will be thrown here
// Also, we don't use the default "build()" method since if a collection or option is empty, it will be treated as
// missing.
reflectiveBuilder.build(reflectiveBuilder.argumentLookup.ordered.map(arg => arg.value getOrElse unreachable(s"Arguments not set: ${arg.name}")))
build(
reflectiveBuilder = reflectiveBuilder,
toArg = i => row[String](i),
fail = (message, throwable) => fail(rowIndex+2, message, throwable)
)
}
}

Expand Down Expand Up @@ -205,6 +216,60 @@ object Metric extends LazyLogging {
def writer[T <: Metric](writer: Writer)(implicit tt: ru.TypeTag[T]): MetricWriter[T] = {
  // Wrap the supplied java.io-style writer in a metric writer for type T
  new MetricWriter[T](writer)
}
}


/** A [[Sorter]] specialised for metrics: records are serialised with [[MetricSorter.MetricSorterCodec]] and ordered
  * by the key that `keyfunc` extracts from each metric.
  *
  * @param maxObjectsInRam passed through to the sorter; defaults to [[MetricSorter.MaxInMemory]]
  * @param keyfunc produces the sort key for a metric
  * @param tmpDir passed through to the sorter; defaults to `Io.tmpDir`
  * @tparam Key the type of the sort key, which must be ordered against itself
  * @tparam T the type of metric being sorted
  */
class MetricSorter[Key <: Ordered[Key], T <: Metric](
  maxObjectsInRam: Int = MetricSorter.MaxInMemory,
  keyfunc: T => Key,
  tmpDir: DirPath = Io.tmpDir
)(implicit tt: ru.TypeTag[T])
  extends Sorter[T, Key](
    maxObjectsInRam = maxObjectsInRam,
    codec           = new MetricSorter.MetricSorterCodec[T](),
    keyfunc         = keyfunc,
    tmpDir          = tmpDir
  )

object MetricSorter {
  /** The default maximum # of records to keep and sort in memory. */
  val MaxInMemory: Int = 1e6.toInt

  /** A sorter codec that serialises a metric as its delimited values and rebuilds it via `Metric.build`.
    *
    * Bytes are always written and read as UTF-8 so that the round trip does not depend on the platform default
    * charset.
    */
  class MetricSorterCodec[T <: Metric]()(implicit tt: ru.TypeTag[T])
    extends Sorter.Codec[T] with LazyLogging {
    private val clazz: Class[T] = ReflectionUtil.typeTagToClass[T]
    private val reflectiveBuilder = new ReflectiveBuilder(clazz)

    /** Logs the stack trace of the cause (if any) at debug level and then throws a `FailureException`.
      *
      * @param message the description of the failure; prefixed with the metric's simple class name
      * @param throwable the underlying cause, if any
      */
    private def fail(message: String,
                     throwable: Option[Throwable] = None): Nothing = {
      val fullMessage = s"For metric '${clazz.getSimpleName}'\n$message"
      throwable.foreach { thr =>
        val stringWriter = new StringWriter
        thr.printStackTrace(new PrintWriter(stringWriter))
        val banner = "#" * 80
        logger.debug(banner)
        logger.debug(stringWriter.toString)
        logger.debug(banner)
      }
      throw FailureException(message=Some(fullMessage))
    }

    /** Encode the metric into an array of bytes. */
    def encode(metric: T): Array[Byte] = {
      metric.values.mkString(Metric.DelimiterAsString).getBytes(java.nio.charset.StandardCharsets.UTF_8)
    }

    /** Decode a metric from an array of bytes.
      *
      * @param bs the buffer holding the encoded metric
      * @param start the offset of the first encoded byte within `bs`
      * @param length the number of encoded bytes
      */
    def decode(bs: Array[Byte], start: Int, length: Int): T = {
      // Decode straight from the buffer (no intermediate copy).  The limit of -1 keeps trailing empty fields, which
      // a plain split would drop, leaving fewer fields than the metric has when its last value(s) are empty.
      val fields = new String(bs, start, length, java.nio.charset.StandardCharsets.UTF_8)
        .split(Metric.DelimiterAsString, -1)

      // Metric.build both populates the arguments and constructs the metric, so its result is returned directly.
      Metric.build(
        reflectiveBuilder = reflectiveBuilder,
        toArg             = i => fields(i),
        fail              = (message, cause) => fail(message, cause)
      )
    }
  }
}

/**
* Base trait for metrics.
*
Expand Down