Skip to content

Commit

Permalink
Start worker process by fork if available
Browse files Browse the repository at this point in the history
  • Loading branch information
soutaro committed Oct 24, 2022
1 parent 30a4baf commit f9ebea4
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 13 deletions.
5 changes: 4 additions & 1 deletion lib/steep/server/master.rb
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,10 @@ def start_type_check(request, last_request:, start_progress:)

Steep.logger.info "Sending $/typecheck/start notifications"
typecheck_workers.each do |worker|
assignment = Services::PathAssignment.new(max_index: typecheck_workers.size, index: worker.index)
assignment = Services::PathAssignment.new(
max_index: typecheck_workers.size,
index: worker.index || raise
)

job_queue << SendMessageJob.to_worker(
worker,
Expand Down
92 changes: 82 additions & 10 deletions lib/steep/server/worker_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,97 @@ def initialize(reader:, writer:, stderr:, wait_thread:, name:, index: nil)
@index = index
end

def self.start_worker(type, name:, steepfile:, steep_command: "steep", options: [], delay_shutdown: false, index: nil)
def self.start_worker(type, name:, steepfile:, steep_command: "steep", index: nil, delay_shutdown: false, patterns: [])
begin
fork_worker(
type,
name: name,
steepfile: steepfile,
index: index,
delay_shutdown: delay_shutdown,
patterns: patterns
)
rescue NotImplementedError
spawn_worker(
type,
name: name,
steepfile: steepfile,
steep_command: steep_command,
index: index,
delay_shutdown: delay_shutdown,
patterns: patterns
)
end
end

def self.fork_worker(type, name:, steepfile:, index:, delay_shutdown:, patterns:)
stdin_in, stdin_out = IO.pipe
stdout_in, stdout_out = IO.pipe

worker = Drivers::Worker.new(stdout: stdout_out, stdin: stdin_in, stderr: STDERR)

worker.steepfile = steepfile
worker.worker_type = type
worker.worker_name = name
worker.delay_shutdown = delay_shutdown
if (max, this = index)
worker.max_index = max
worker.index = this
end
worker.commandline_args = patterns

pid = fork do
worker.run()
end

pid or raise

writer = LanguageServer::Protocol::Transport::Io::Writer.new(stdin_out)
reader = LanguageServer::Protocol::Transport::Io::Reader.new(stdout_in)

# @type var wait_thread: Thread & _ProcessWaitThread
wait_thread = _ = Thread.new { Process.waitpid(pid) }
wait_thread.define_singleton_method(:pid) { pid }

stdin_in.close
stdout_out.close

new(
reader: reader,
writer: writer,
stderr: STDERR,
wait_thread: wait_thread,
name: name,
index: index&.[](1)
)
end

def self.spawn_worker(type, name:, steepfile:, steep_command:, index:, delay_shutdown:, patterns:)
args = ["--name=#{name}", "--steepfile=#{steepfile}"]
args << (%w(debug info warn error fatal unknown)[Steep.logger.level].yield_self {|log_level| "--log-level=#{log_level}" })

if Steep.log_output.is_a?(String)
args << "--log-output=#{Steep.log_output}"
end

if (max, this = index)
args << "--max-index=#{max}"
args << "--index=#{this}"
end

if delay_shutdown
args << "--delay-shutdown"
end

command = case type
when :interaction
[steep_command, "worker", "--interaction", *args, *options]
[steep_command, "worker", "--interaction", *args, *patterns]
when :typecheck
[steep_command, "worker", "--typecheck", *args, *options]
[steep_command, "worker", "--typecheck", *args, *patterns]
else
raise "Unknown type: #{type}"
end

if delay_shutdown
command << "--delay-shutdown"
end

stdin, stdout, thread = if Gem.win_platform?
__skip__ = Open3.popen2(*command, new_pgroup: true)
else
Expand All @@ -47,7 +119,7 @@ def self.start_worker(type, name:, steepfile:, steep_command: "steep", options:
writer = LanguageServer::Protocol::Transport::Io::Writer.new(stdin)
reader = LanguageServer::Protocol::Transport::Io::Reader.new(stdout)

new(reader: reader, writer: writer, stderr: stderr, wait_thread: thread, name: name, index: index)
new(reader: reader, writer: writer, stderr: stderr, wait_thread: thread, name: name, index: index&.[](1))
end

def self.start_typecheck_workers(steepfile:, args:, steep_command: "steep", count: [Etc.nprocessors - 1, 1].max, delay_shutdown: false)
Expand All @@ -57,9 +129,9 @@ def self.start_typecheck_workers(steepfile:, args:, steep_command: "steep", coun
name: "typecheck@#{i}",
steepfile: steepfile,
steep_command: steep_command,
options: ["--max-index=#{count}", "--index=#{i}", *args],
index: [count, i],
patterns: args,
delay_shutdown: delay_shutdown,
index: i
)
end
end
Expand Down
23 changes: 21 additions & 2 deletions sig/steep/server/worker_process.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,28 @@ module Steep
name: String,
steepfile: Pathname,
?steep_command: ::String,
?options: Array[String],
?patterns: Array[String],
?delay_shutdown: bool,
?index: Integer?
?index: [Integer, Integer]?
) -> WorkerProcess

def self.fork_worker: (
worker_type `type`,
name: String,
steepfile: Pathname,
patterns: Array[String],
delay_shutdown: bool,
index: [Integer, Integer]?
) -> WorkerProcess

def self.spawn_worker: (
worker_type `type`,
name: String,
steepfile: Pathname,
steep_command: ::String,
patterns: Array[String],
delay_shutdown: bool,
index: [Integer, Integer]?
) -> WorkerProcess

def self.start_typecheck_workers: (
Expand Down

0 comments on commit f9ebea4

Please sign in to comment.