Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement tool for parallel execution #2009

Merged
merged 29 commits into from
Jun 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e5d362f
Add parallel execution module
jan-goral Jun 3, 2021
9d649ee
Add subgraph selection
jan-goral Jun 3, 2021
c952d1d
Update
jan-goral Jun 7, 2021
e18d089
Add unit tests
jan-goral Jun 7, 2021
8afc135
Add unfinished doc
jan-goral Jun 9, 2021
181841e
Make context dsl protected
jan-goral Jun 9, 2021
aa66515
Update
jan-goral Jun 9, 2021
fdfe182
Update
jan-goral Jun 9, 2021
bc0e85d
Update
jan-goral Jun 9, 2021
90e95af
Improve graph validation
jan-goral Jun 10, 2021
71c4277
Remove unneeded error type
jan-goral Jun 11, 2021
23d972f
Rollback gradle version
jan-goral Jun 11, 2021
c202cdf
Format kotlin
jan-goral Jun 11, 2021
9fafbef
Remove wildcards
jan-goral Jun 11, 2021
d7bf537
Update doc
jan-goral Jun 11, 2021
6d50223
Add example graph
jan-goral Jun 11, 2021
37bdcf2
Update README.md
jan-goral Jun 11, 2021
2fbf097
Update README.md
jan-goral Jun 11, 2021
f6dfad9
Update README.md
jan-goral Jun 11, 2021
54e4159
Update references
jan-goral Jun 11, 2021
169f98c
Update parallel-execution-api-structures.puml
jan-goral Jun 11, 2021
76e1225
Update parallel-execution-api-structures.puml
jan-goral Jun 11, 2021
72245e8
Fix flaky test
jan-goral Jun 11, 2021
2214312
Update parallel-execution-api-functions.puml
jan-goral Jun 11, 2021
5bd3d32
Small fix
jan-goral Jun 11, 2021
4926c25
Remove unneeded values from state when expected types are specified.
jan-goral Jun 13, 2021
cbb0a0b
Update diagram
jan-goral Jun 14, 2021
1838aa7
Fix gradle version
jan-goral Jun 14, 2021
7a6bcda
Apply suggestions from code review
jan-goral Jun 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions docs/hld/parallel-example-graph.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
@startuml
'https://plantuml.com/component-diagram
skinparam componentStyle rectangle

component Finalize

component NotifyRemoteServices
component GenerateReport

component CalculateResults

component PerformExecution

component CollectRemoteData1
component FetchPreviousResults

component InitialCalculations
component RunRemoteProcess1
component RunRemoteProcess2

component CollectLocalData2
component CollectLocalData1
component Authorize

Finalize *--> GenerateReport
Finalize *--> NotifyRemoteServices

GenerateReport *--> CalculateResults
NotifyRemoteServices *--> CalculateResults

CalculateResults *--> PerformExecution
CalculateResults *--> FetchPreviousResults

InitialCalculations *--> CollectLocalData1
InitialCalculations *--> CollectLocalData2

PerformExecution *--> CollectRemoteData1
PerformExecution *---> InitialCalculations

FetchPreviousResults *--> Authorize

InitialCalculations -[hidden]r- RunRemoteProcess1

CollectRemoteData1 *--> RunRemoteProcess1
CollectRemoteData1 *--> RunRemoteProcess2

RunRemoteProcess1 *--> Authorize
RunRemoteProcess2 *--> Authorize
RunRemoteProcess2 -[hidden]r- FetchPreviousResults


@enduml
59 changes: 59 additions & 0 deletions docs/hld/parallel-execution-api-functions.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
@startuml
'left to right direction
package "parallel" {

object "Parallel.Function.invoke()" as Function {
creates
}

class "Parallel.Task" as Task<R>
class "Parallel.Task.Signature" as Task_Signature<R>
interface "Parallel.Type<R>" as Type
interface "Parallel.Type<R>" as Type2
interface ExecuteTask<R> << (T, orchid) >>
interface ParallelState << (T, orchid) >>

object "using" as usingType {
creates
}
object "using" as usingSignature {
creates
}
object from {
creates
}

object "invoke()" as invokeExecution {
executes
}
object "invoke()" as reduce {
reduces
}

ExecuteTask <. "1" Function

ExecuteTask <--o "1" usingSignature
usingSignature "1" #--> Task_Signature
usingSignature "1" ..> Task
usingSignature -right-* usingType

Type <--# "1" usingType
ExecuteTask <--o "1" usingType
usingType "1" ..> Task
usingType *-left- from

Type <--# "1" from
Type <--o "*" from
from "1" ..> Task_Signature

Type2 <--o "*" reduce
Task <. "*" reduce
Task <-# "*" reduce

Task <--# "*" invokeExecution

invokeExecution "*" .left.> ParallelState
invokeExecution "1" o-left-> ParallelState

}
@enduml
56 changes: 56 additions & 0 deletions docs/hld/parallel-execution-api-structures.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
@startuml
package "parallel" {
class "Parallel.Context" as Context {
out: Output?
}
class "Parallel.Task" as Task<R> {
signature: Signature<R>
execute: ExecuteTask<R>
}
class "Parallel.Task.Signature" as Signature<R>{
returns: Type<R>
args: Set<Type<*>>
}
class "Parallel.Function" as Function<X: ParallelContext> {
context: () -> X
= suspend <R> (ExecuteTaskInContext) -> ExecuteTask
}
interface "Any data" as Data << (R, yellow) >>
interface ExecuteTask<R> << (T, orchid) >> {
= suspend ParallelState.() -> R
}
interface ExecuteTaskInContext<X: Parallel.Context> << (T, orchid) >> {
= suspend X.() -> R
}
interface "Parallel.Type" as Type <T>
interface ParallelState << (T, orchid) >> {
= Map<Parallel.Type<*>, Any>
}
interface Output << (T, orchid) >> {
= Any.() -> Unit
}

Function ..> ExecuteTask
Function #.. ExecuteTaskInContext

ExecuteTaskInContext #.. Context
ExecuteTaskInContext .> Data

Data <. ExecuteTask
Data "*" --o "1" ParallelState

ExecuteTask "1" -* "1" Task
ExecuteTask #.. ParallelState

Context --|> Type
Context "0..1" o..> "1" Output

ParallelState "1" -o "1" Context
ParallelState "1" o- "*" Type

Task "1" *-- "1" Signature

Type "1" -* "1" Signature
Type "*" -o "1" Signature

@enduml
65 changes: 65 additions & 0 deletions docs/hld/parallel-execution.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
@startuml
skinparam ConditionEndStyle hline

|Invoke execution|
split
-[hidden]->
start
:arguments;
:initial state;
repeat :Handle state}
-[dotted]->
if (is channel closed for receive?) then
stop
else (no)
endif
:next result<
if (on first error) then (true)
:Abort execution}
else (false)
endif
repeat while (accumulate result)
detach

|Handle state|
split again
-[hidden]->
if (is complete) then (yes)
:close channel/
elseif (is finished) then (yes)
:nothing to do;
detach
else
:filter tasks for current state;
:group them by arguments;
repeat :prepare properties|
repeat :Execute task}
repeat while (for each task)
repeat while (for each group)
endif
detach

|Execute task|
split again
-[hidden]->
partition async {
:log start;
:prepare context;
if (execute task in context) then (try)
:log success;
else (catch)
:log exception;
endif
:send result]
detach
}

|Abort execution|
split again
-[hidden]->
:cancel scope;
repeat :send "not started"]
repeat while (for each remaining task)
:close channel/
detach
@enduml
119 changes: 119 additions & 0 deletions docs/hld/task-graph.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
@startuml
package "Valid graph" {
map H1 #lightgreen {
}
map I1 #lightgreen {
}
map F1 #lightgreen {
I1 *--> I1
}
map E1 #lightgreen {
I1 *--> I1
}
map D1 #lightgreen {
I1 *--> I1
}
map C1 #lightgreen {
D1 *--> D1
}
map B1 #lightgreen {
C1 *-> C1
E1 *--> E1
H1*---> H1
}
map A1 #lightgreen {
B1 *--> B1
F1 *--> F1
C1 *--> C1
I1 *--> I1
}
}
package "Selected graph" {
map H2 #lightgreen {
}
map I2 #lightgreen {
}
map F2 #lightgrey {
I2 *--> I2
}
map E2 #lightgreen {
I2 *--> I2
}
map D2 #lightgreen {
I2 *--> I2
}
map C2 #lightgreen {
D2 *--> D2
}
map B2 #lightblue {
C2 *-> C2
E2 *--> E2
H2*---> H2
}
map A2 #lightgrey {
B2 *--> B2
F2 *--> F2
C2 *--> C2
I2 *--> I2
}
}
package "Broken graph" {
map H3 #lightgreen {
}
map I3 #black {
}
map F3 #red {
I3 *--> I3
}
map E3 #red {
I3 *--> I3
}
map D3 #red {
I3 *--> I3
}
map C3 #yellow {
D3 *--> D3
}
map B3 #yellow {
C3 *-> C3
E3 *--> E3
H3*---> H3
}
map A3 #yellow {
B3 *--> B3
F3 *--> F3
C3 *--> C3
I3 *--> I3
}
}

package "Failed graph" {
map H4 #lightgreen {
}
map I4 #lightgreen {
}
map F4 #red {
I4 *--> I4
}
map E4 #yellow {
I4 *--> I4
}
map D4 #yellow {
I4 *--> I4
}
map C4 #lightgrey {
D4 *--> D4
}
map B4 #lightgrey {
C4 *-> C4
E4 *--> E4
H4*---> H4
}
map A4 #lightgrey {
B4 *--> B4
F4 *--> F4
C4 *--> C4
I4 *--> I4
}
}
@enduml
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ include(
":tool:junit",
":tool:log",
":tool:log:format",
":tool:execution:parallel",
":tool:execution:synchronized",
)

plugins {
Expand Down
Loading