Skip to content

Commit

Permalink
Merge pull request #16 from nicklewis/support-large-results
Browse files Browse the repository at this point in the history
(maint) Avoid deadlock with large results
  • Loading branch information
samwoods1 authored Feb 3, 2017
2 parents 72d3263 + a5a9c17 commit 4d644d8
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
58 changes: 31 additions & 27 deletions lib/in_parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,37 @@ def self.wait_for_processes(proxy = self, binding = nil, timeout = nil, kill_all
kill_child_processes
raise_error = ::RuntimeError.new("Child process ran longer than timeout of #{timeout}")
end
@@process_infos.each do |process_info|
# wait up to half a second for each thread to see if it is complete, if not, check the next thread.
# returns immediately if the process has completed.
thr = process_info[:wait_thread].join(0.5)
unless thr.nil?
# the process completed, get the result and rethrow on error.
begin
# Print the STDOUT and STDERR for each process with signals for start and end
@@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
# Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
# So don't use logger, just use puts.
puts " " + File.new(process_info[:std_out], 'r').readlines.join(" ")
@@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
result = process_info[:result].read
marshalled_result = (result.nil? || result.empty?) ? result : Marshal.load(result)
# Kill all other processes and let them log their stdout before re-raising
# if a child process raised an error.
if marshalled_result.is_a?(Exception)
raise_error = marshalled_result.dup
kill_child_processes if kill_all_on_error
marshalled_result = nil

if result = IO.select(@@process_infos.map {|p| p[:result]}, nil, nil, 0.5)
read_ios = result.first
read_ios.each do |reader|
process_info = @@process_infos.find {|p| p[:result] == reader}
process_info[:result_buffer] << reader.read
if reader.eof?
result = process_info[:result_buffer].string
# the process completed, get the result and rethrow on error.
begin
# Print the STDOUT and STDERR for each process with signals for start and end
@@logger.info "------ Begin output for #{process_info[:method_sym]} - #{process_info[:pid]}"
# Content from the other thread will already be pre-pended with log stuff (info, warn, date/time, etc)
# So don't use logger, just use puts.
puts " " + File.new(process_info[:std_out], 'r').readlines.join(" ")
@@logger.info "------ Completed output for #{process_info[:method_sym]} - #{process_info[:pid]}"
marshalled_result = (result.nil? || result.empty?) ? result : Marshal.load(result)
# Kill all other processes and let them log their stdout before re-raising
# if a child process raised an error.
if marshalled_result.is_a?(Exception)
raise_error = marshalled_result.dup
kill_child_processes if kill_all_on_error
marshalled_result = nil
end
results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
ensure
File.delete(process_info[:std_out]) if File.exists?(process_info[:std_out])
# close the read end pipe
process_info[:result].close unless process_info[:result].closed?
@@process_infos.delete(process_info)
end
results_map[process_info[:index]] = { process_info[:tmp_result] => marshalled_result }
ensure
File.delete(process_info[:std_out]) if File.exists?(process_info[:std_out])
# close the read end pipe
process_info[:result].close unless process_info[:result].closed?
@@process_infos.delete(process_info)
end
end
end
Expand Down Expand Up @@ -235,6 +238,7 @@ def self._execute_in_parallel(method_sym, obj = self, &block)
:std_out => "tmp/pp_#{pid}",
:result => read_result,
:tmp_result => "unresolved_parallel_result_#{@@result_id}",
:result_buffer => StringIO.new,
:index => @@process_infos.count }
@@process_infos.push(process_info)
@@result_id += 1
Expand Down
13 changes: 13 additions & 0 deletions spec/in-paralell_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ def raise_an_error
expect(@result_2).to eq({ :foo => "bar" })
end

it "should return large results" do
# 2**16 = 64k is typical buffer size
long_string = 'a' * (2**16+1)

expect do
run_in_parallel(timeout=1) do
@result = method_with_param(long_string)
end
end.not_to raise_error

expect(@result).to eq "bar + #{long_string}"
end

it "should return a singleton class value" do

run_in_parallel { @result = get_singleton_class }
Expand Down

0 comments on commit 4d644d8

Please sign in to comment.