Skip to content

Commit

Permalink
Merge pull request #664 from soutaro/fork-if-available
Browse files Browse the repository at this point in the history
Fork to start worker processes
  • Loading branch information
soutaro authored Oct 24, 2022
2 parents 3b58aea + f9ebea4 commit cef6d63
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 60 deletions.
2 changes: 1 addition & 1 deletion lib/steep/drivers/check.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def run
server_reader = LanguageServer::Protocol::Transport::Io::Reader.new(server_read)
server_writer = LanguageServer::Protocol::Transport::Io::Writer.new(server_write)

typecheck_workers = Server::WorkerProcess.spawn_typecheck_workers(
typecheck_workers = Server::WorkerProcess.start_typecheck_workers(
steepfile: project.steepfile_path,
args: command_line_patterns,
delay_shutdown: true,
Expand Down
2 changes: 1 addition & 1 deletion lib/steep/drivers/checkfile.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def run

Steep.logger.info { "Starting #{count} workers for #{files.size} files..." }

typecheck_workers = Server::WorkerProcess.spawn_typecheck_workers(
typecheck_workers = Server::WorkerProcess.start_typecheck_workers(
steepfile: project.steepfile_path,
args: [],
delay_shutdown: true,
Expand Down
4 changes: 2 additions & 2 deletions lib/steep/drivers/langserver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def project
def run
@project = load_config()

interaction_worker = Server::WorkerProcess.spawn_worker(:interaction, name: "interaction", steepfile: project.steepfile_path, steep_command: jobs_option.steep_command_value)
typecheck_workers = Server::WorkerProcess.spawn_typecheck_workers(steepfile: project.steepfile_path, args: [], steep_command: jobs_option.steep_command_value, count: jobs_option.jobs_count_value)
interaction_worker = Server::WorkerProcess.start_worker(:interaction, name: "interaction", steepfile: project.steepfile_path, steep_command: jobs_option.steep_command_value)
typecheck_workers = Server::WorkerProcess.start_typecheck_workers(steepfile: project.steepfile_path, args: [], steep_command: jobs_option.steep_command_value, count: jobs_option.jobs_count_value)

master = Server::Master.new(
project: project,
Expand Down
2 changes: 1 addition & 1 deletion lib/steep/drivers/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def run
server_reader = LanguageServer::Protocol::Transport::Io::Reader.new(server_read)
server_writer = LanguageServer::Protocol::Transport::Io::Writer.new(server_write)

typecheck_workers = Server::WorkerProcess.spawn_typecheck_workers(
typecheck_workers = Server::WorkerProcess.start_typecheck_workers(
steepfile: project.steepfile_path,
delay_shutdown: true,
args: command_line_patterns,
Expand Down
2 changes: 1 addition & 1 deletion lib/steep/drivers/watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run()
server_reader = LanguageServer::Protocol::Transport::Io::Reader.new(server_read)
server_writer = LanguageServer::Protocol::Transport::Io::Writer.new(server_write)

typecheck_workers = Server::WorkerProcess.spawn_typecheck_workers(steepfile: project.steepfile_path, args: dirs.map(&:to_s), steep_command: jobs_option.steep_command_value, count: jobs_option.jobs_count_value)
typecheck_workers = Server::WorkerProcess.start_typecheck_workers(steepfile: project.steepfile_path, args: dirs.map(&:to_s), steep_command: jobs_option.steep_command_value, count: jobs_option.jobs_count_value)

master = Server::Master.new(
project: project,
Expand Down
1 change: 0 additions & 1 deletion lib/steep/drivers/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module Drivers
class Worker
attr_reader :stdout, :stderr, :stdin

attr_accessor :steepfile_path
attr_accessor :worker_type
attr_accessor :worker_name
attr_accessor :delay_shutdown
Expand Down
5 changes: 4 additions & 1 deletion lib/steep/server/master.rb
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,10 @@ def start_type_check(request, last_request:, start_progress:)

Steep.logger.info "Sending $/typecheck/start notifications"
typecheck_workers.each do |worker|
assignment = Services::PathAssignment.new(max_index: typecheck_workers.size, index: worker.index)
assignment = Services::PathAssignment.new(
max_index: typecheck_workers.size,
index: worker.index || raise
)

job_queue << SendMessageJob.to_worker(
worker,
Expand Down
100 changes: 86 additions & 14 deletions lib/steep/server/worker_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,120 @@ def initialize(reader:, writer:, stderr:, wait_thread:, name:, index: nil)
@index = index
end

def self.spawn_worker(type, name:, steepfile:, steep_command: "steep", options: [], delay_shutdown: false, index: nil)
def self.start_worker(type, name:, steepfile:, steep_command: "steep", index: nil, delay_shutdown: false, patterns: [])
begin
fork_worker(
type,
name: name,
steepfile: steepfile,
index: index,
delay_shutdown: delay_shutdown,
patterns: patterns
)
rescue NotImplementedError
spawn_worker(
type,
name: name,
steepfile: steepfile,
steep_command: steep_command,
index: index,
delay_shutdown: delay_shutdown,
patterns: patterns
)
end
end

def self.fork_worker(type, name:, steepfile:, index:, delay_shutdown:, patterns:)
stdin_in, stdin_out = IO.pipe
stdout_in, stdout_out = IO.pipe

worker = Drivers::Worker.new(stdout: stdout_out, stdin: stdin_in, stderr: STDERR)

worker.steepfile = steepfile
worker.worker_type = type
worker.worker_name = name
worker.delay_shutdown = delay_shutdown
if (max, this = index)
worker.max_index = max
worker.index = this
end
worker.commandline_args = patterns

pid = fork do
worker.run()
end

pid or raise

writer = LanguageServer::Protocol::Transport::Io::Writer.new(stdin_out)
reader = LanguageServer::Protocol::Transport::Io::Reader.new(stdout_in)

# @type var wait_thread: Thread & _ProcessWaitThread
wait_thread = _ = Thread.new { Process.waitpid(pid) }
wait_thread.define_singleton_method(:pid) { pid }

stdin_in.close
stdout_out.close

new(
reader: reader,
writer: writer,
stderr: STDERR,
wait_thread: wait_thread,
name: name,
index: index&.[](1)
)
end

def self.spawn_worker(type, name:, steepfile:, steep_command:, index:, delay_shutdown:, patterns:)
args = ["--name=#{name}", "--steepfile=#{steepfile}"]
args << (%w(debug info warn error fatal unknown)[Steep.logger.level].yield_self {|log_level| "--log-level=#{log_level}" })

if Steep.log_output.is_a?(String)
args << "--log-output=#{Steep.log_output}"
end

if (max, this = index)
args << "--max-index=#{max}"
args << "--index=#{this}"
end

if delay_shutdown
args << "--delay-shutdown"
end

command = case type
when :interaction
[steep_command, "worker", "--interaction", *args, *options]
[steep_command, "worker", "--interaction", *args, *patterns]
when :typecheck
[steep_command, "worker", "--typecheck", *args, *options]
[steep_command, "worker", "--typecheck", *args, *patterns]
else
raise "Unknown type: #{type}"
end

if delay_shutdown
command << "--delay-shutdown"
end

stdin, stdout, thread = if Gem.win_platform?
Open3.popen2(*command, new_pgroup: true)
__skip__ = Open3.popen2(*command, new_pgroup: true)
else
Open3.popen2(*command, pgroup: true)
__skip__ = Open3.popen2(*command, pgroup: true)
end
stderr = nil

writer = LanguageServer::Protocol::Transport::Io::Writer.new(stdin)
reader = LanguageServer::Protocol::Transport::Io::Reader.new(stdout)

new(reader: reader, writer: writer, stderr: stderr, wait_thread: thread, name: name, index: index)
new(reader: reader, writer: writer, stderr: stderr, wait_thread: thread, name: name, index: index&.[](1))
end

def self.spawn_typecheck_workers(steepfile:, args:, steep_command: "steep", count: [Etc.nprocessors - 1, 1].max, delay_shutdown: false)
def self.start_typecheck_workers(steepfile:, args:, steep_command: "steep", count: [Etc.nprocessors - 1, 1].max, delay_shutdown: false)
count.times.map do |i|
spawn_worker(
start_worker(
:typecheck,
name: "typecheck@#{i}",
steepfile: steepfile,
steep_command: steep_command,
options: ["--max-index=#{count}", "--index=#{i}", *args],
index: [count, i],
patterns: args,
delay_shutdown: delay_shutdown,
index: i
)
end
end
Expand Down
24 changes: 20 additions & 4 deletions rbs_collection.steep.lock.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ gems:
version: '0'
source:
type: stdlib
- name: rbs
version: 2.7.0
source:
type: rubygems
- name: dbm
version: '0'
source:
Expand Down Expand Up @@ -98,6 +102,22 @@ gems:
version: '0'
source:
type: stdlib
- name: pathname
version: '0'
source:
type: stdlib
- name: optparse
version: '0'
source:
type: stdlib
- name: tsort
version: '0'
source:
type: stdlib
- name: rdoc
version: '0'
source:
type: stdlib
- name: monitor
version: '0'
source:
Expand All @@ -118,10 +138,6 @@ gems:
version: '0'
source:
type: stdlib
- name: pathname
version: '0'
source:
type: stdlib
- name: forwardable
version: '0'
source:
Expand Down
1 change: 0 additions & 1 deletion rbs_collection.steep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ gems:
ignore: true
- name: set
- name: rbs
ignore: true
- name: with_steep_types
ignore: true
- name: dbm
24 changes: 11 additions & 13 deletions sig/steep/drivers/worker.rbs
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
module Steep
module Drivers
class Worker
attr_reader stdout: untyped
attr_reader stdout: IO

attr_reader stderr: untyped
attr_reader stderr: IO

attr_reader stdin: untyped
attr_reader stdin: IO

attr_accessor steepfile_path: untyped
attr_accessor worker_type: Symbol

attr_accessor worker_type: untyped
attr_accessor worker_name: String

attr_accessor worker_name: untyped
attr_accessor delay_shutdown: bool

attr_accessor delay_shutdown: untyped
attr_accessor max_index: Integer

attr_accessor max_index: untyped
attr_accessor index: Integer

attr_accessor index: untyped

attr_accessor commandline_args: untyped
attr_accessor commandline_args: Array[String]

include Utils::DriverHelper

def initialize: (stdout: untyped, stderr: untyped, stdin: untyped) -> void
def initialize: (stdout: IO, stderr: IO, stdin: IO) -> void

def run: () -> 0
def run: () -> Integer
end
end
end
Loading

0 comments on commit cef6d63

Please sign in to comment.