From 7d64756f30f10c7dcda3cd72f39621f0ff1c2930 Mon Sep 17 00:00:00 2001 From: Florin Lipan Date: Fri, 6 Apr 2018 19:56:50 +0300 Subject: [PATCH] Re-raise exceptions in parallel macro (#5726) --- spec/std/concurrent_spec.cr | 73 +++++++++++++++++++++++++------------ src/concurrent.cr | 62 +++++++++++++++++++++++++++++-- 2 files changed, 109 insertions(+), 26 deletions(-) diff --git a/spec/std/concurrent_spec.cr b/spec/std/concurrent_spec.cr index 1fd0a67c1799..f8d6a44e58f7 100644 --- a/spec/std/concurrent_spec.cr +++ b/spec/std/concurrent_spec.cr @@ -8,38 +8,65 @@ private def method_named(expected_named) Fiber.current.name.should eq(expected_named) end +class SomeParallelJobException < Exception +end + +private def raising_job : String + raise SomeParallelJobException.new("boom") + "result" +end + describe "concurrent" do - it "does four things concurrently" do - a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil) - a.should eq(3) - b.should eq(5) - c.should eq(4) - d.should be_nil - end + describe "parallel" do + it "does four things concurrently" do + a, b, c, d = parallel(1 + 2, "hello".size, [1, 2, 3, 4].size, nil) + a.should eq(3) + b.should eq(5) + c.should eq(4) + d.should be_nil + end - it "uses spawn macro" do - chan = Channel(Int32).new + it "re-raises errors from Fibers as ConcurrentExecutionException" do + exception = expect_raises(ConcurrentExecutionException) do + a, b = parallel(raising_job, raising_job) + end - spawn method_with_named_args(chan) - chan.receive.should eq(3) + exception.cause.should be_a(SomeParallelJobException) + end - spawn method_with_named_args(chan, y: 20) - chan.receive.should eq(21) + it "is strict about the return value type" do + a, b = parallel(1 + 2, "hello") - spawn method_with_named_args(chan, x: 10, y: 20) - chan.receive.should eq(30) + typeof(a).should eq(Int32) + typeof(b).should eq(String) + end end - it "spawns named" do - spawn(name: "sub") do - Fiber.current.name.should eq("sub") + describe "spawn" do + it "uses spawn macro" do + chan = Channel(Int32).new + + spawn method_with_named_args(chan) + chan.receive.should eq(3) + + spawn method_with_named_args(chan, y: 20) + chan.receive.should eq(21) + + spawn method_with_named_args(chan, x: 10, y: 20) + chan.receive.should eq(30) end - Fiber.yield - end - it "spawns named with macro" do - spawn method_named("foo"), name: "foo" - Fiber.yield + it "spawns named" do + spawn(name: "sub") do + Fiber.current.name.should eq("sub") + end + Fiber.yield + end + + it "spawns named with macro" do + spawn method_named("foo"), name: "foo" + Fiber.yield + end end it "accepts method call with receiver" do diff --git a/src/concurrent.cr b/src/concurrent.cr index 20680ccefb29..4482507d803b 100644 --- a/src/concurrent.cr +++ b/src/concurrent.cr @@ -128,21 +128,77 @@ macro spawn(call, *, name = nil) {% end %} end +# Wraps around exceptions re-raised from concurrent calls. +# The original exception can be accessed via `#cause`. +class ConcurrentExecutionException < Exception +end + +# Runs the commands passed as arguments concurrently (in Fibers) and waits +# for them to finish. +# +# ``` +# def say(word) +# puts word +# end +# +# # Will print out the three words concurrently +# parallel( +# say("concurrency"), +# say("is"), +# say("easy") +# ) +# ``` +# +# Can also be used to conveniently collect the return values of the +# concurrent operations. +# +# ``` +# def concurrent_job(word) +# word +# end +# +# a, b, c = +# parallel( +# concurrent_job("concurrency"), +# concurrent_job("is"), +# concurrent_job("easy") +# ) +# +# puts a # => "concurrency" +# puts b # => "is" +# puts c # => "easy" +# ``` +# +# Due to the concurrent nature of this macro, it is highly recommended +# to handle any exceptions within the concurrent calls. Unhandled +# exceptions raised within the concurrent operations will be re-raised +# inside the parent fiber as `ConcurrentExecutionException`, with the +# `cause` attribute set to the original exception. macro parallel(*jobs) - %channel = Channel(Nil).new + %channel = Channel(Exception | Nil).new {% for job, i in jobs %} %ret{i} = uninitialized typeof({{job}}) spawn do begin %ret{i} = {{job}} - ensure + rescue e : Exception + %channel.send e + else %channel.send nil end end {% end %} - {{ jobs.size }}.times { %channel.receive } + {{ jobs.size }}.times do + %value = %channel.receive + if %value.is_a?(Exception) + raise ConcurrentExecutionException.new( + "An unhandled error occured inside a `parallel` call", + cause: %value + ) + end + end { {% for job, i in jobs %}