Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seperate Socket and FileDescriptor IO #4707

Merged
merged 3 commits into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions src/concurrent/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.write_timed_out = true
fd_io.resume_write
fd_io.resume_write(timed_out: true)
end
end
event
end

def self.create_fd_write_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Write)
sock_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_write(timed_out: true)
end
end
event
Expand All @@ -51,8 +64,21 @@ class Scheduler
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.read_timed_out = true
fd_io.resume_read
fd_io.resume_read(timed_out: true)
end
end
event
end

def self.create_fd_read_event(sock : Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(Socket)
if flags.includes?(LibEvent2::EventFlags::Read)
sock_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_read(timed_out: true)
end
end
event
Expand Down
36 changes: 10 additions & 26 deletions src/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,25 @@ module Event
@freed = false
end

def add
LibEvent2.event_add(@event, nil)
end

def add(timeout)
def add(timeout : LibC::Timeval? = nil)
if timeout
t = to_timeval(timeout)
LibEvent2.event_add(@event, pointerof(t))
timeout_copy = timeout
LibEvent2.event_add(@event, pointerof(timeout_copy))
else
add
LibEvent2.event_add(@event, nil)
end
end

def add(timeout : Time::Span)
seconds, remainder_ticks = timeout.ticks.divmod(Time::Span::TicksPerSecond)
timeval = LibC::Timeval.new(tv_sec: seconds, tv_usec: remainder_ticks / Time::Span::TicksPerMicrosecond)
add(timeval)
end

def free
LibEvent2.event_free(@event) unless @freed
@freed = true
end

private def to_timeval(time : Int)
t = uninitialized LibC::Timeval
t.tv_sec = typeof(t.tv_sec).new(time)
t.tv_usec = typeof(t.tv_usec).new(0)
t
end

private def to_timeval(time : Float)
t = uninitialized LibC::Timeval

seconds = typeof(t.tv_sec).new(time)
useconds = typeof(t.tv_usec).new((time - seconds) * 1e6)

t.tv_sec = seconds
t.tv_usec = useconds
t
end
end

# :nodoc:
Expand Down
6 changes: 5 additions & 1 deletion src/fiber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,16 @@ class Fiber
{% end %}
end

def sleep(time)
def sleep(time : Time::Span)
event = @resume_event ||= Scheduler.create_resume_event(self)
event.add(time)
Scheduler.reschedule
end

def sleep(time : Number)
sleep(time.seconds)
end

def yield
sleep(0)
end
Expand Down
157 changes: 13 additions & 144 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
require "./syscall"
require "c/fcntl"

# An `IO` over a file descriptor.
class IO::FileDescriptor
include Buffered
include IO::Buffered
include IO::Syscall

@read_timeout : Float64?
@write_timeout : Float64?
@read_event : Event::Event?
@write_event : Event::Event?

# :nodoc:
property read_timed_out : Bool
property write_timed_out : Bool

def initialize(@fd : Int32, blocking = false, edge_triggerable = false)
@edge_triggerable = !!edge_triggerable
def initialize(@fd : Int32, blocking = false)
@closed = false
@read_timed_out = false
@write_timed_out = false
@fd = fd

unless blocking
self.blocking = false
if @edge_triggerable
@read_event = Scheduler.create_fd_read_event(self, @edge_triggerable)
@write_event = Scheduler.create_fd_write_event(self, @edge_triggerable)
end
end
end

# Set the number of seconds to wait when reading before raising an `IO::Timeout`.
def read_timeout=(read_timeout : Number)
@read_timeout = read_timeout.to_f
end

# ditto
def read_timeout=(read_timeout : Time::Span)
self.read_timeout = read_timeout.total_seconds
end

# Sets no timeout on read operations, so an `IO::Timeout` will never be raised.
def read_timeout=(read_timeout : Nil)
@read_timeout = nil
end

# Set the number of seconds to wait when writing before raising an `IO::Timeout`.
def write_timeout=(write_timeout : Number)
@write_timeout = write_timeout.to_f
end

# ditto
def write_timeout=(write_timeout : Time::Span)
self.write_timeout = write_timeout.total_seconds
end

# Sets no timeout on write operations, so an `IO::Timeout` will never be raised.
def write_timeout=(write_timeout : Nil)
@write_timeout = nil
end

def blocking
fcntl(LibC::F_GETFL) & LibC::O_NONBLOCK == 0
end
Expand Down Expand Up @@ -93,20 +51,6 @@ class IO::FileDescriptor
self.class.fcntl @fd, cmd, arg
end

# :nodoc:
def resume_read
if reader = @readers.try &.shift?
reader.resume
end
end

# :nodoc:
def resume_write
if writer = @writers.try &.shift?
writer.resume
end
end

def stat
if LibC.fstat(@fd, out stat) != 0
raise Errno.new("Unable to get stat")
Expand Down Expand Up @@ -238,97 +182,29 @@ class IO::FileDescriptor
end

private def unbuffered_read(slice : Bytes)
count = slice.size
loop do
bytes_read = LibC.read(@fd, slice.pointer(count).as(Void*), count)
if bytes_read != -1
return bytes_read
end

if Errno.value == Errno::EAGAIN
wait_readable
else
raise Errno.new "Error reading file"
end
end
ensure
if (readers = @readers) && !readers.empty?
add_read_event
read_syscall_helper(slice, "Error reading file") do
# `to_i32` is acceptable because `Slice#size` is a Int32
LibC.read(@fd, slice, slice.size).to_i32

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this explicit conversion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because read_syscall_helper is explicitly annotated to return i32.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but read_syscall_helper didn't exist before this refactor, so why does it need to be explicitly annotated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because IO#read should always return typeof(Bytes.new.size). That wasn't the case before.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it :). Thanks for bearing with me!

Copy link
Contributor Author

@RX14 RX14 Aug 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be fantastic if we could force return types from an abstract declaration. Returning anything other than i32 from read doesn't make sense, and a return value from write is similarly useless. But that's (literally) another issue.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we were discussing exactly that with @matiasgarciaisaia in relation to #4864).

end
end

private def unbuffered_write(slice : Bytes)
count = slice.size
total = count
loop do
bytes_written = LibC.write(@fd, slice.pointer(count).as(Void*), count)
if bytes_written != -1
count -= bytes_written
return total if count == 0
slice += bytes_written
else
if Errno.value == Errno::EAGAIN
wait_writable
next
elsif Errno.value == Errno::EBADF
write_syscall_helper(slice, "Error writing file") do |slice|
LibC.write(@fd, slice, slice.size).tap do |return_code|
if return_code == -1 && Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise Errno.new "Error writing file"
end
end
end
ensure
if (writers = @writers) && !writers.empty?
add_write_event
end
end

private def wait_readable
wait_readable { |err| raise err }
end

private def wait_readable
readers = (@readers ||= Deque(Fiber).new)
readers << Fiber.current
add_read_event
Scheduler.reschedule

if @read_timed_out
@read_timed_out = false
yield Timeout.new("Read timed out")
end

nil
end

private def add_read_event
return if @edge_triggerable
private def add_read_event(timeout = @read_timeout)
event = @read_event ||= Scheduler.create_fd_read_event(self)
event.add @read_timeout
nil
end

private def wait_writable(timeout = @write_timeout)
wait_writable(timeout: timeout) { |err| raise err }
end

# msg/timeout are overridden in nonblock_connect
private def wait_writable(msg = "Write timed out", timeout = @write_timeout)
writers = (@writers ||= Deque(Fiber).new)
writers << Fiber.current
add_write_event timeout
Scheduler.reschedule

if @write_timed_out
@write_timed_out = false
yield Timeout.new(msg)
end

event.add timeout
nil
end

private def add_write_event(timeout = @write_timeout)
return if @edge_triggerable
event = @write_event ||= Scheduler.create_fd_write_event(self)
event.add timeout
nil
Expand Down Expand Up @@ -358,15 +234,8 @@ class IO::FileDescriptor
@read_event = nil
@write_event.try &.free
@write_event = nil
if readers = @readers
Scheduler.enqueue readers
readers.clear
end

if writers = @writers
Scheduler.enqueue writers
writers.clear
end
reschedule_waiting

raise err if err
end
Expand Down
Loading