diff --git a/Library/Homebrew/cask/lib/hbc/system_command.rb b/Library/Homebrew/cask/lib/hbc/system_command.rb index 6414a9e807f1c..55a198b5d3f34 100644 --- a/Library/Homebrew/cask/lib/hbc/system_command.rb +++ b/Library/Homebrew/cask/lib/hbc/system_command.rb @@ -81,8 +81,9 @@ def each_output_line(&b) write_input_to(raw_stdin) raw_stdin.close_write - each_line_from [raw_stdout, raw_stderr], &b + ::Utils::IOSelector + .each_line_from(stdout: raw_stdout, stderr: raw_stderr, &b) @processed_status = raw_wait_thr.value end @@ -90,22 +91,6 @@ def write_input_to(raw_stdin) [*options[:input]].each { |line| raw_stdin.print line } end - def each_line_from(sources) - loop do - readable_sources = IO.select(sources)[0] - readable_sources.delete_if(&:eof?).first(1).each do |source| - type = ((source == sources[0]) ? :stdout : :stderr) - begin - yield(type, source.readline_nonblock || "") - rescue IO::WaitReadable, EOFError - next - end - end - break if readable_sources.empty? - end - sources.each(&:close_read) - end - def result Result.new(command, processed_output[:stdout], diff --git a/Library/Homebrew/test/spec_helper.rb b/Library/Homebrew/test/spec_helper.rb index 03b14720b8dfb..03425d857d9f6 100644 --- a/Library/Homebrew/test/spec_helper.rb +++ b/Library/Homebrew/test/spec_helper.rb @@ -3,6 +3,7 @@ require "rspec/its" require "rspec/wait" require "set" +require "English" if ENV["HOMEBREW_TESTS_COVERAGE"] require "simplecov" @@ -20,6 +21,7 @@ require "tap" require "test/support/helper/shutup" +require "test/support/helper/fake_curl" require "test/support/helper/fixtures" require "test/support/helper/formula" require "test/support/helper/mktmpdir" @@ -42,6 +44,7 @@ config.order = :random config.include(Test::Helper::Shutup) + config.include(Test::Helper::FakeCurl) config.include(Test::Helper::Fixtures) config.include(Test::Helper::Formula) config.include(Test::Helper::MkTmpDir) diff --git a/Library/Homebrew/test/support/fixtures/test.bin b/Library/Homebrew/test/support/fixtures/test.bin new file mode 100644 index 0000000000000..006fd44270a2f Binary files /dev/null and b/Library/Homebrew/test/support/fixtures/test.bin differ diff --git a/Library/Homebrew/test/support/helper/fake_curl.rb b/Library/Homebrew/test/support/helper/fake_curl.rb new file mode 100644 index 0000000000000..cc5aaad1a007d --- /dev/null +++ b/Library/Homebrew/test/support/helper/fake_curl.rb @@ -0,0 +1,25 @@ +require_relative "./mktmpdir" + +module Test + module Helper + module FakeCurl + def with_fake_curl(shell_command) + saved_value = ENV["HOMEBREW_CURL"] + + begin + mktmpdir do |path| + fake_curl_path = path/"fake_curl" + File.open(fake_curl_path, "w") do |file| + file.write("#! #{`which bash`}\n#{shell_command}\n") + end + FileUtils.chmod 0755, fake_curl_path + ENV["HOMEBREW_CURL"] = fake_curl_path + end + return yield + ensure + ENV["HOMEBREW_CURL"] = saved_value + end + end + end + end +end diff --git a/Library/Homebrew/test/utils/curl_spec.rb b/Library/Homebrew/test/utils/curl_spec.rb new file mode 100644 index 0000000000000..3533f2cb3fc39 --- /dev/null +++ b/Library/Homebrew/test/utils/curl_spec.rb @@ -0,0 +1,137 @@ +require "utils/curl" + +describe "in `utils/curl`" do + before(:example) { ENV["HOMEBREW_CURL"] = "" } + + describe "the `curl_args` method" do + let(:saved_travis_env) { ENV["TRAVIS"] } + before(:example) { ENV["TRAVIS"] = "1" } + + describe "given no options" do + subject { curl_args } + + it { is_expected.to be_an(Array) } + + its(:size) { is_expected.to eq(8) } + + it { is_expected.to start_with("/usr/bin/curl") } + + it { + is_expected.to contain_exactly( + "/usr/bin/curl", + "--remote-time", + "--location", + "--user-agent", + HOMEBREW_USER_AGENT_CURL, + "--progress-bar", + "--fail", + "--silent", + ) + } + end + + describe "with the `user_agent` option set to `:browser`" do + subject { curl_args(user_agent: :browser) } + + it { is_expected.to be_an(Array) } + + its(:size) { is_expected.to eq(8) } + + it { is_expected.to start_with("/usr/bin/curl") } + + it { + is_expected.to contain_exactly( + "/usr/bin/curl", + "--remote-time", + "--location", + "--user-agent", + HOMEBREW_USER_AGENT_FAKE_SAFARI, + "--progress-bar", + "--fail", + "--silent", + ) + } + end + + describe "with the `show_output` option enabled" do + subject { curl_args(show_output: true) } + + it { is_expected.to be_an(Array) } + + its(:size) { is_expected.to eq(5) } + + it { is_expected.to start_with("/usr/bin/curl") } + + it { + is_expected.to contain_exactly( + "/usr/bin/curl", + "--remote-time", + "--location", + "--user-agent", + HOMEBREW_USER_AGENT_CURL, + ) + } + end + + after(:example) { ENV["TRAVIS"] = saved_travis_env } + end + + describe "the `curl_output` method" do + describe "when curl outputs something on STDOUT only" do + subject { with_fake_curl("seq 1 5") { curl_output } } + + it { is_expected.to be_an(Array) } + its(:size) { is_expected.to eq(3) } + + its([0]) { is_expected.to eq([1, 2, 3, 4, 5, nil].join("\n")) } + its([1]) { is_expected.to be_empty } + its([2]) { is_expected.to be_a_success } + end + + describe "when curl outputs something on STDERR only" do + subject { with_fake_curl("seq 1 5 >&2") { curl_output } } + + it { is_expected.to be_an(Array) } + its(:size) { is_expected.to eq(3) } + + its([0]) { is_expected.to be_empty } + its([1]) { is_expected.to eq([1, 2, 3, 4, 5, nil].join("\n")) } + its([2]) { is_expected.to be_a_success } + end + + describe "when curl outputs something on STDOUT and STDERR" do + subject { + with_fake_curl(<<-EOF.undent) { curl_output } + for i in $(seq 1 2 5); do + echo $i; echo $(($i + 1)) >&2 + done + EOF + } + + it { is_expected.to be_an(Array) } + its(:size) { is_expected.to eq(3) } + + its([0]) { is_expected.to eq([1, 3, 5, nil].join("\n")) } + its([1]) { is_expected.to eq([2, 4, 6, nil].join("\n")) } + its([2]) { is_expected.to be_a_success } + end + + describe "with a very long STDERR output" do + let(:shell_command) { + <<-EOF.undent + for i in $(seq 1 2 100000); do + echo $i; echo $(($i + 1)) >&2 + done + EOF + } + + it "returns without deadlocking" do + wait(15).for { + with_fake_curl(shell_command) { curl_output } + }.to end_with(an_object_satisfying(&:success?)) + end + end + end + + after(:example) { ENV["HOMEBREW_CURL"] = "" } +end diff --git a/Library/Homebrew/test/utils/io_selector_spec.rb b/Library/Homebrew/test/utils/io_selector_spec.rb new file mode 100644 index 0000000000000..24207eb91aa3e --- /dev/null +++ b/Library/Homebrew/test/utils/io_selector_spec.rb @@ -0,0 +1,302 @@ +require "utils/io_selector" + +describe Utils::IOSelector do + describe "given text streams" do + let(:first_pipe) { IO.pipe } + let(:second_pipe) { IO.pipe } + + let(:first_reader) { first_pipe[0] } + let(:second_reader) { second_pipe[0] } + + let(:first_writer) { first_pipe[1] } + let(:second_writer) { second_pipe[1] } + + let(:queues) { { first: Queue.new, second: Queue.new } } + + let(:write_first!) do + thread = Thread.new(first_writer, queues[:second]) do |io, queue| + io.puts "Lorem" + wait(1).for { queue.pop }.to end_with("\n") + io.puts "dolor" + io.close + end + thread.abort_on_exception = true + thread + end + + let(:write_second!) do + thread = Thread.new(second_writer, queues[:first]) do |io, queue| + wait(1).for { queue.pop }.to end_with("\n") + io.puts "ipsum" + wait(1).for { queue.pop }.to end_with("\n") + io.puts "sit" + io.puts "amet" + io.close + end + thread.abort_on_exception = true + thread + end + + let(:queue_feeder) do + lambda do |proc_under_test, *args, &block| + proc_under_test.call(*args) do |tag, string_received| + queues[tag] << string_received + block.call(tag, string_received) + end + end + end + + before do + allow_any_instance_of(Utils::IOSelector) + .to receive(:each_line_nonblock) + .and_wrap_original do |*args, &block| + queue_feeder.call(*args, &block) + end + + write_first! + write_second! + end + + after do + write_first!.exit + write_second!.exit + end + + describe "::each_line_from" do + subject do + line_hash = { first: "", second: "", full_text: "" } + + Utils::IOSelector.each_line_from( + first: first_reader, + second: second_reader, + ) do |tag, string_received| + line_hash[tag] << string_received + line_hash[:full_text] << string_received + end + line_hash + end + + before { wait(1).for(subject) } + + its([:first]) { is_expected.to eq("Lorem\ndolor\n") } + its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") } + + its([:full_text]) { + is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n") + } + end + + describe "::new" do + let(:selector) do + Utils::IOSelector.new( + first: first_reader, + second: second_reader, + ) + end + subject { selector } + + describe "pre-read" do + its(:pending_streams) { + are_expected.to eq([first_reader, second_reader]) + } + + its(:separator) { + is_expected.to eq($INPUT_RECORD_SEPARATOR) + } + end + + describe "post-read" do + before do + wait(1).for { + subject.each_line_nonblock {} + true + }.to be true + end + + after { expect(selector.all_streams).to all be_closed } + + its(:pending_streams) { are_expected.to be_empty } + its(:separator) { + is_expected.to eq($INPUT_RECORD_SEPARATOR) + } + end + + describe "#each_line_nonblock" do + subject do + line_hash = { first: "", second: "", full_text: "" } + super().each_line_nonblock do |tag, string_received| + line_hash[tag] << string_received + line_hash[:full_text] << string_received + end + line_hash + end + + before { wait(1).for(subject) } + after { expect(selector.all_streams).to all be_closed } + + its([:first]) { is_expected.to eq("Lorem\ndolor\n") } + its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") } + + its([:full_text]) { + is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n") + } + end + + its(:all_streams) { + are_expected.to eq([first_reader, second_reader]) + } + + its(:all_tags) { are_expected.to eq([:first, :second]) } + + describe "#tag_of" do + subject do + { + "first tag" => super().tag_of(first_reader), + "second tag" => super().tag_of(second_reader), + } + end + + its(["first tag"]) { is_expected.to eq(:first) } + its(["second tag"]) { is_expected.to eq(:second) } + end + end + + describe "::new with a custom separator" do + subject do + tagged_streams = { + first: first_reader, + second: second_reader, + } + Utils::IOSelector.new(tagged_streams, ",") + end + + its(:separator) { is_expected.to eq(",") } + end + end + + describe "given binary streams" do + let(:pathname) { Pathname.new("#{TEST_FIXTURE_DIR}/test.bin") } + let(:first_reader) { File.open(pathname) } + let(:second_reader) { File.open(pathname) } + + describe "::each_chunk_from" do + subject do + blob_hash = { 0 => "".b, 1 => "".b } + merged_bytes = [] + + Utils::IOSelector.each_chunk_from( + [first_reader, second_reader], + 0x1000, + ) do |tag, string_received| + blob_hash[tag] << string_received + merged_bytes.concat(string_received.bytes) + end + + blob_hash[:merged_blob] = merged_bytes.sort!.pack("c*") + blob_hash + end + + before { wait(1).for(subject) } + + its([0]) { + is_expected + .to eq(Array.new(0x1002) { |n| n % 0x100 }.pack("c*")) + } + + its([1]) { + is_expected + .to eq(Array.new(0x1002) { |n| n % 0x100 }.pack("c*")) + } + + its([:merged_blob]) { + is_expected.to eq(0x1002.times + .flat_map { |n| [n % 0x100] * 2 } + .sort + .pack("c*")) + } + end + + describe "::new" do + let(:selector) do + Utils::IOSelector.new([first_reader, second_reader], nil) + end + subject { selector } + + describe "pre-read" do + its(:pending_streams) { + are_expected.to eq([first_reader, second_reader]) + } + + its(:separator) { is_expected.to be_nil } + end + + describe "post-read" do + before do + wait(1).for { + subject.each_chunk_nonblock(0x1234) {} + true + }.to be true + end + + after { expect(selector.all_streams).to all be_closed } + + its(:pending_streams) { are_expected.to be_empty } + its(:separator) { is_expected.to be_nil } + end + + describe "#each_chunk_nonblock" do + subject do + blob_hash = { 0 => "".b, 1 => "".b } + merged_bytes = [] + + super() + .each_chunk_nonblock(0x801) do |tag, string_received| + blob_hash[tag] << string_received + merged_bytes.concat(string_received.bytes) + end + + blob_hash[:merged_blob] = merged_bytes.sort!.pack("c*") + blob_hash + end + + before { wait(1).for(subject) } + after { expect(selector.all_streams).to all be_closed } + + its([0]) { + is_expected + .to eq(Array.new(0x1002) { |n| n % 0x100 }.pack("c*")) + } + + its([1]) { + is_expected + .to eq(Array.new(0x1002) { |n| n % 0x100 }.pack("c*")) + } + + its([:merged_blob]) { + is_expected.to eq(0x1002.times + .flat_map { |n| [n % 0x100] * 2 } + .sort + .pack("c*")) + } + end + + its(:all_streams) { + are_expected.to eq([first_reader, second_reader]) + } + + its(:all_tags) { are_expected.to eq([0, 1]) } + + describe "#tag_of" do + subject do + { + "first tag" => super().tag_of(first_reader), + "second tag" => super().tag_of(second_reader), + } + end + + its(["first tag"]) { is_expected.to eq(0) } + its(["second tag"]) { is_expected.to eq(1) } + end + end + end +end diff --git a/Library/Homebrew/utils.rb b/Library/Homebrew/utils.rb index 85305c01ae1fd..f0193b3780b0e 100644 --- a/Library/Homebrew/utils.rb +++ b/Library/Homebrew/utils.rb @@ -9,6 +9,7 @@ require "utils/github" require "utils/hash" require "utils/inreplace" +require "utils/io_selector" require "utils/link" require "utils/popen" require "utils/svn" diff --git a/Library/Homebrew/utils/curl.rb b/Library/Homebrew/utils/curl.rb index 5a40ae8469a94..83878e625bc9f 100644 --- a/Library/Homebrew/utils/curl.rb +++ b/Library/Homebrew/utils/curl.rb @@ -37,6 +37,8 @@ def curl(*args) def curl_output(*args) curl_args = curl_args(extra_args: args, show_output: true) Open3.popen3(*curl_args) do |_, stdout, stderr, wait_thread| - [stdout.read, stderr.read, wait_thread.value] + selector = Utils::IOSelector.new([stdout, stderr]) + result = selector.binread_nonblock.values + result << wait_thread.value end end diff --git a/Library/Homebrew/utils/io_selector.rb b/Library/Homebrew/utils/io_selector.rb new file mode 100644 index 0000000000000..7c1e4f11e75d2 --- /dev/null +++ b/Library/Homebrew/utils/io_selector.rb @@ -0,0 +1,117 @@ +require "delegate" +require "English" + +require "extend/io" + +module Utils + # + # The class `IOSelector` is a wrapper for `IO::select` with the + # added benefit that it spans the streams' lifetimes. + # + # The class accepts multiple IOs which must be open for reading. + # It then notifies the client as data becomes available + # per-stream. + # + # Its main use is to allow a client to read both `stdout` and + # `stderr` of a subprocess in a way that avoids buffer-related + # deadlocks. + # + # For a more in-depth explanation, see: + # https://github.com/Homebrew/brew/pull/2466 + # + class IOSelector < DelegateClass(Hash) + DEFAULT_BUFFER_SIZE = 0x1000 + + attr_reader :separator + + alias all_streams keys + alias all_tags values + alias tag_of fetch + + def self.binread_nonblock_from(streams, + buffer_size = DEFAULT_BUFFER_SIZE) + new(streams, nil).binread_nonblock(buffer_size) + end + + def self.each_line_from(streams = {}, + separator = $INPUT_RECORD_SEPARATOR, &block) + new(streams, separator).each_line_nonblock(&block) + end + + def self.each_chunk_from(streams, maxlen, outbuf = nil, &block) + selector = new(streams, nil) + selector.each_chunk_nonblock(maxlen, outbuf, &block) + end + + def initialize(streams = {}, + separator = $INPUT_RECORD_SEPARATOR) + unless streams.is_a?(Hash) + streams = Hash[streams.each_with_index.to_a.map(&:reverse)] + end + super(streams.invert.compare_by_identity) + @separator = separator + end + + def binread_nonblock(buffer_size = DEFAULT_BUFFER_SIZE) + chunk_buffer = "".b + with_tagged_buffers("".b) do |result_buffers| + each_chunk_nonblock(buffer_size, chunk_buffer) do |tag| + result_buffers[tag] << chunk_buffer + end + end + end + + def each_line_nonblock + each_readable_stream_until_eof do |stream| + line = stream.readline_nonblock(separator) || "" + yield(tag_of(stream), line) + end + close_streams + end + + def each_chunk_nonblock(maxlen, outbuf = nil) + each_readable_stream_until_eof do |stream| + chunk = stream.read_nonblock(maxlen, outbuf) || "".b + yield(tag_of(stream), chunk) + end + close_streams + end + + def pending_streams + @pending_streams ||= all_streams.dup + end + + private + + def each_readable_stream_until_eof(&block) + loop do + readable_streams.each do |stream| + pending_streams.delete(stream) if stream.eof? + yield_gracefully(stream, &block) + end + break if pending_streams.empty? + end + end + + def readable_streams + IO.select(pending_streams)[0] + end + + def yield_gracefully(stream) + yield(stream) + rescue IO::WaitReadable, IO::WaitWritable, EOFError + # We'll be back until/unless EOF + return + end + + def with_tagged_buffers(value) + tagged_buffers = Hash[all_tags.map { |tag| [tag, value.dup] }] + yield(tagged_buffers) + tagged_buffers + end + + def close_streams + all_streams.each(&:close_read) + end + end +end