Skip to content

Commit

Permalink
Remove unneeded values from state when expected types are specified.
Browse files Browse the repository at this point in the history
Optimize reduce graph algorithm.
Add benchmarks for core functions
Small fixes
  • Loading branch information
jan-goral committed Jun 13, 2021
1 parent d5b5d54 commit 5d56150
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ object Parallel {
*/
data class Task<R : Any>(
val signature: Signature<R>,
val execute: ExecuteTask<R>
val execute: ExecuteTask<R>,
val expected: Boolean = true,
) {
/**
* The task signature.
Expand All @@ -78,7 +79,7 @@ object Parallel {
/**
* Parameterized factory for creating task functions in scope of [X].
*/
class Function<X : Context>(override val context: () -> X) : ContextProvider<X>
class Function<X : Context>(override val context: () -> X) : ContextProvider<X>()

data class Event internal constructor(
val type: Type<*>,
Expand Down Expand Up @@ -115,6 +116,11 @@ typealias Output = Any.() -> Unit
*/
typealias ParallelState = Map<Parallel.Type<*>, Any>

/**
* Immutable state for parallel execution.
*/
typealias Property = Pair<Parallel.Type<*>, Any>

/**
* Type for group of parallel tasks. Each task must be unique in group.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package flank.exection.parallel

import flank.exection.parallel.internal.initialValidators
import flank.exection.parallel.internal.contextValidators
import flank.exection.parallel.internal.reduceTo

/**
* Reduce given [Tasks] by [select] types to remove unneeded tasks from the graph.
* Reduce given [Tasks] by [expected] types to remove unneeded tasks from the graph.
* The returned graph will hold only tasks that are returning selected types, their dependencies and derived dependencies.
* Additionally this is keeping also the validators for initial state.
*
* @return Reduced [Tasks]
*/
operator fun Tasks.invoke(
select: Set<Parallel.Type<*>>
expected: Set<Parallel.Type<*>>
): Tasks =
reduceTo(select + initialValidators)
reduceTo(expected + contextValidators())

/**
* Shortcut for tasks reducing.
*/
operator fun Tasks.invoke(
vararg returns: Parallel.Type<*>
): Tasks = invoke(returns.toSet())
vararg expected: Parallel.Type<*>
): Tasks = invoke(expected.toSet())
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package flank.exection.parallel.internal

import flank.exection.parallel.ExecuteTask
import flank.exection.parallel.Parallel
import flank.exection.parallel.validator

/**
* Abstract factory for creating task function.
*/
internal interface ContextProvider<X : Parallel.Context> {
val context: () -> X
abstract class ContextProvider<X : Parallel.Context> {
protected abstract val context: () -> X

operator fun <R> invoke(body: suspend X.() -> R): ExecuteTask<R> =
{ context().also { it.state = this }.body() }

val validator by lazy { validator(context) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import flank.exection.parallel.Parallel.Logger
import flank.exection.parallel.Parallel.Task
import flank.exection.parallel.Parallel.Type
import flank.exection.parallel.ParallelState
import flank.exection.parallel.Property
import flank.exection.parallel.Tasks
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand All @@ -19,6 +20,7 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger

/**
* Invoke the given [Execution] in parallel.
Expand All @@ -30,7 +32,7 @@ internal operator fun Execution.invoke(): Flow<ParallelState> =
.onEach { (type, value) -> if (value is Throwable && isNotClosed()) abortBy(type, value) }

// Accumulate each received value in state.
.scan(initial) { state, value -> state + value }
.scan(initial, updateState())

// Handle state changes.
.onEach { state: ParallelState ->
Expand Down Expand Up @@ -90,13 +92,21 @@ internal class Execution(
/**
* The set of value types required to complete the execution.
*/
val required = tasks.map(Task<*>::type).toSet()
val required = tasks.filter(Task<*>::expected).map(Task<*>::type).toSet()

/**
* Map of remaining tasks for run grouped by arguments.
*/
val remaining = tasks.groupBy(Task<*>::args).toMutableMap()

/**
* Reference counter for state types marked as not expected.
* Values of types that are not expected but required as dependencies,
* can be removed when there are no remaining tasks depending on them.
*/
val references = tasks.flatMap(Task<*>::args).groupBy { it }
.minus(required).mapValues { (_, refs) -> AtomicInteger(refs.size) }

/**
* Reference to optional output for structural logs.
*/
Expand Down Expand Up @@ -130,6 +140,17 @@ private suspend fun Execution.abortBy(type: Type<*>, cause: Throwable) {
channel.close()
}

/**
* Create function for updating state depending on reference counter state.
*/
private fun Execution.updateState(): suspend (ParallelState, Property) -> ParallelState =
if (references.isEmpty()) ParallelState::plus
else { state, property ->
references.filterValues { counter -> counter.compareAndSet(0, 0) }
.map { (type, _) -> type }
.let { junks -> state + property - junks }
}

/**
* The execution is complete when all required types was accumulated to state.
*/
Expand Down Expand Up @@ -157,7 +178,7 @@ private fun Execution.filterTasksFor(state: ParallelState): Map<Set<Type<*>>, Li
channel.isEmpty.not() || // some values are waiting in the channel queue.
throw DeadlockError(state, jobs, remaining)

// Remove from queue the tasks for current iteration.
// Remove from queue tasks for current iteration.
remaining -= keys
}

Expand Down Expand Up @@ -185,6 +206,11 @@ private fun Execution.execute(
task: Task<*>,
args: Map<Type<*>, Any>,
) {
// Decrement references to arguments types
if (references.isNotEmpty()) task.args.forEach { type ->
references[type]?.getAndUpdate { count -> count - 1 }
}

// Obtain type of current task.
val type: Type<*> = task.type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ import flank.exection.parallel.Tasks
* Get initial state validators.
* This is necessary to perform validations of initial state before the execution.
*/
internal val Tasks.initialValidators: List<Parallel.Context>
get() = mapNotNull { task -> task.type as? Parallel.Context }
internal fun Tasks.contextValidators(): List<Parallel.Context> =
mapNotNull { task -> task.type as? Parallel.Context }
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import flank.exection.parallel.ParallelState
* Factory function for lazy property delegate.
*/
internal fun <T : Any> Parallel.Context.lazyProperty(type: Parallel.Type<T>) = lazy {
@Suppress("UNCHECKED_CAST")
state[type] as T
}

Expand All @@ -16,19 +17,19 @@ internal fun <T : Any> Parallel.Context.lazyProperty(type: Parallel.Type<T>) = l
class EagerProperties(
private val state: () -> ParallelState
) {
private val set = mutableSetOf<Lazy<*>>()
private val props = mutableSetOf<Lazy<*>>()

/**
* Initialize eager properties, this performs also validation.
*/
operator fun invoke(): Unit = set.forEach { prop -> println(prop.value) }
operator fun invoke(): Unit = props.forEach { prop -> prop.value }

/**
* Register new parallel type. Inline modifier is necessary to perform real type check
*/
inline operator fun <reified T : Any> invoke(type: Parallel.Type<T>): Lazy<T> = lazy { type.value() as T }.append()

// local helpers need to be public because of inlined invoke
fun <T> Lazy<T>.append(): Lazy<T> = apply { set.plusAssign(this) }
fun <T> Lazy<T>.append(): Lazy<T> = apply { props += this }
fun <T : Any> Parallel.Type<T>.value(): Any? = state()[this]
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,22 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Parallel
import flank.exection.parallel.Parallel.Task
import flank.exection.parallel.Parallel.Type
import flank.exection.parallel.Tasks
import flank.exection.parallel.internal.graph.findDependenciesIn

internal infix fun Tasks.reduceTo(
selectedTypes: Set<Parallel.Type<*>>
): Tasks =
filter { task -> task.type in selectedTypes }
.toSet()
.apply {
val notFound = selectedTypes - map(Parallel.Task<*>::type)
if (notFound.isNotEmpty()) throw Exception("Cannot reduce find tasks for the following types: $notFound")
expected: Set<Type<*>>
): Tasks {
val notFound = expected - map(Task<*>::type)
if (notFound.isNotEmpty()) throw Exception("Cannot find tasks for the following types: $notFound")
val graph: Map<Type<*>, Set<Type<*>>> = associate { task -> task.type to task.args }
val dependencies = expected.findDependenciesIn(graph)
return mapNotNull { task ->
when (task.type) {
in expected -> task
in dependencies -> task.copy(expected = false)
else -> null
}
.reduce(this)

/**
* Reduce [all] steps to given receiver steps and their dependencies.
*
* @receiver The task selector for current reducing step.
* @param all The list of all tasks that are under reducing.
* @param acc Currently accumulated tasks.
* @return Accumulated tasks if selector is empty.
*/
private tailrec fun Tasks.reduce(
all: Tasks,
acc: Tasks =
if (isEmpty()) all
else emptySet(),
): Tasks =
when {
isEmpty() -> acc
else -> flatMap(Parallel.Task<*>::args)
.mapNotNull(all::findByType)
.toSet()
.reduce(all, acc + this)
}

private fun Tasks.findByType(
type: Parallel.Type<*>
): Parallel.Task<*>? =
find { task -> task.type == type }
}.toSet()
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package flank.exection.parallel.internal.graph

tailrec fun <T : Any> Map<T, Set<T>>.findCycles(

remaining: List<T> = toList()
.sortedByDescending { (_, edges) -> edges.size }
.map { (vertices, _) -> vertices }
.toMutableList(),
/**
* Find cycles in given graph.
*
* @receiver Graph to search.
* @return List of cycles. Each cycle is a list of nodes.
*/
internal tailrec fun <T : Any> Map<T, Set<T>>.findCycles(

remaining: Set<T> = toList()
.sortedByDescending { (_, children) -> children.size }
.map { (parent, _) -> parent }
.toSet(),

queue: List<T> = emptyList(),

Expand All @@ -28,7 +34,6 @@ tailrec fun <T : Any> Map<T, Set<T>>.findCycles(
val cycle = current in path + next

// println("$cycle R:$remaining Q:$queue N:$next C:$current P:$path V:$visited")
// println()

return findCycles(
remaining =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package flank.exection.parallel.internal.graph

/**
* Find dependencies for given nodes in [graph].
*
* @receiver Expected elements for current iteration.
* @param graph Graph to search.
* @param acc Currently accumulated elements.
* @return Set expected elements along with dependencies.
*/
internal tailrec fun <T> Set<T>.findDependenciesIn(
graph: Map<T, Set<T>>,
acc: Set<T> =
if (isEmpty()) graph.keys
else emptySet(),
): Set<T> =
when {
isEmpty() -> acc // No more elements, so return all accumulated.
else -> flatMap(graph::getValue).toSet() // Map each expected element to its dependencies.
.minus(acc) // Remove already accumulated elements to optimize calculations.
.findDependenciesIn(graph, acc + this) // Accumulate current dependencies and run next iteration.
}
Loading

0 comments on commit 5d56150

Please sign in to comment.