Skip to content

Commit

Permalink
Add FileDescriptorIO timeouts.
Browse files Browse the repository at this point in the history
  New properties read_timeout, write_timeout
  TCPSocket & UDPSocket accept dns_timeout and connect_timeout in initialize.
  All IO switched to level triggered.
  • Loading branch information
technorama committed Aug 24, 2015
1 parent 2312184 commit fe3ccc5
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 94 deletions.
7 changes: 2 additions & 5 deletions spec/std/file_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,10 @@ describe "File" do
file.closed?.should be_true
end

it "raises when closing twice" do
it "should not raise when closing twice" do
file = File.new(__FILE__)
file.close

expect_raises IO::Error, /closed stream/ do
file.close
end
file.close
end

it "does to_s when closed" do
Expand Down
20 changes: 19 additions & 1 deletion spec/std/socket_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ describe "UNIXSocket" do
end
end

it "tests read and write timeouts" do
UNIXSocket.pair do |left, right|
# BUG: shrink the socket buffers first
left.write_timeout = 0.0001
right.read_timeout = 0.0001
buf = ("a" * 4096).to_slice

expect_raises(IO::Timeout, "write timed out") do
loop { left.write buf }
end

expect_raises(IO::Timeout, "read timed out") do
loop { right.read buf }
end
end
end


it "creates the socket file" do
path = "/tmp/crystal-test-unix-sock"

Expand Down Expand Up @@ -89,7 +107,7 @@ describe "TCPSocket" do
end

it "fails when host doesn't exist" do
expect_raises(SocketError, /^getaddrinfo: (.+ not known|no address .+)$/i) do
expect_raises(SocketError, /^getaddrinfo: (.+ not known|no address .+|Non-recoverable failure in name resolution)$/i) do
TCPSocket.new("localhostttttt", 12345)
end
end
Expand Down
29 changes: 10 additions & 19 deletions src/concurrent/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,33 @@ class Scheduler
event.free
end

def self.create_fd_events(io : FileDescriptorIO)
flags = LibEvent2::EventFlags::Read | LibEvent2::EventFlags::Write | LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET
event = @@eb.new_event(io.fd, flags, io) do |s, flags, data|
fd_io = data as FileDescriptorIO
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
end
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
end
end
event.add
event
end

def self.create_fd_write_event(io : FileDescriptorIO)
def self.create_fd_write_event(io : FileDescriptorIO, edge_triggered = false : Bool)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(io.fd, flags, io) do |s, flags, data|
fd_io = data as FileDescriptorIO
if flags.includes?(LibEvent2::EventFlags::Write)
fd_io.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.write_timed_out = true
fd_io.resume_write
end
end
event.add
event
end

def self.create_fd_read_event(io : FileDescriptorIO)
def self.create_fd_read_event(io : FileDescriptorIO, edge_triggered = false : Bool)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
event = @@eb.new_event(io.fd, flags, io) do |s, flags, data|
fd_io = data as FileDescriptorIO
if flags.includes?(LibEvent2::EventFlags::Read)
fd_io.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
fd_io.read_timed_out = true
fd_io.resume_read
end
end
event.add
event
end

Expand Down
16 changes: 13 additions & 3 deletions src/event/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,29 @@ module Event
# :nodoc:
struct Event
def initialize(@event)
@freed = false
end

def add
LibEvent2.event_add(@event, nil)
end

def add(timeout)
t = to_timeval(timeout)
LibEvent2.event_add(@event, pointerof(t))
if timeout
t = to_timeval(timeout)
LibEvent2.event_add(@event, pointerof(t))
else
add
end
end

def free
LibEvent2.event_free(@event)
LibEvent2.event_free(@event) unless @freed
@freed = true
end

def finalize
free
end

private def to_timeval(time : Int)
Expand Down
5 changes: 3 additions & 2 deletions src/event/lib_event2.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require "socket/addrinfo"
require "socket/libc"

@[Link("rt")] ifdef linux
@[Link("event")]
Expand Down Expand Up @@ -43,7 +43,8 @@ lib LibEvent2
fun event_new(eb : EventBase, s : EvutilSocketT, events : EventFlags, callback : Callback, data : Void*) : Event
fun event_free(event : Event)
fun event_add(event : Event, timeout : LibC::TimeVal*) : Int

fun event_add(event : Event, timeout : LibC::TimeVal*) : Int
fun event_del(event : Event) : Int

type DnsBase = Void*

Expand Down
3 changes: 3 additions & 0 deletions src/io.cr
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ lib LibC
end

module IO
class Timeout < Exception
end

def self.select(read_ios, write_ios = nil, error_ios = nil)
select(read_ios, write_ios, error_ios, nil).not_nil!
end
Expand Down
125 changes: 93 additions & 32 deletions src/io/file_descriptor_io.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,34 @@ class FileDescriptorIO
property? flush_on_newline
property? sync

def initialize(fd, blocking = false, edge_triggerable = true)
# seconds to wait when reading before raising IO::Timeout
property read_timeout
# seconds to wait when writing before raising IO::Timeout
property write_timeout
# :nodoc:
property read_timed_out, write_timed_out # only used in event callbacks

def initialize(fd, blocking = false, edge_triggerable = false)
@edge_triggerable = !!edge_triggerable
@flush_on_newline = false
@sync = false
@closed = false
@read_timed_out = false
@write_timed_out = false
@fd = fd
@in_buffer_rem = Slice.new(Pointer(UInt8).null, 0)
@out_count = 0
@read_timeout = nil
@write_timeout = nil
@readers = [] of Fiber
@writers = [] of Fiber

unless blocking
self.blocking = false
if @edge_triggerable
@event = Scheduler.create_fd_events(self)
@read_event = Scheduler.create_fd_read_event(self, @edge_triggerable)
@write_event = Scheduler.create_fd_write_event(self, @edge_triggerable)
end
@readers = [] of Fiber
@writers = [] of Fiber
end
end

Expand Down Expand Up @@ -64,12 +76,14 @@ class FileDescriptorIO
self.class.fcntl @fd, cmd, arg
end

# :nodoc:
def resume_read
if reader = readers.pop?
reader.resume
end
end

# :nodoc:
def resume_write
if writer = writers.pop?
writer.resume
Expand Down Expand Up @@ -102,6 +116,21 @@ class FileDescriptorIO
close rescue nil
end

def close
return if closed?

super()

@read_event.try &.free
@read_event = nil
@write_event.try &.free
@write_event = nil
Scheduler.enqueue @readers
@readers.clear
Scheduler.enqueue @writers
@writers.clear
end

def closed?
@closed
end
Expand All @@ -128,56 +157,88 @@ class FileDescriptorIO
private def unbuffered_read(slice : Slice(UInt8), count)
loop do
bytes_read = LibC.read(@fd, slice.pointer(count), LibC::SizeT.cast(count))
if bytes_read == -1
if LibC.errno == Errno::EAGAIN
wait_readable
else
raise Errno.new "Error reading file"
end
else
if bytes_read != -1
return bytes_read
end
end
end

private def wait_readable
readers << Fiber.current
if @edge_triggerable
Scheduler.reschedule
else
event = Scheduler.create_fd_read_event(self)
Scheduler.reschedule
event.free
if LibC.errno == Errno::EAGAIN
wait_readable
else
raise Errno.new "Error reading file"
end
end
ensure
add_read_event unless readers.empty?
end

private def unbuffered_write(slice : Slice(UInt8), count)
total = count
loop do
bytes_written = LibC.write(@fd, slice.pointer(count), LibC::SizeT.cast(count))
if bytes_written == -1
if bytes_written != -1
count -= bytes_written
return total if count == 0
slice += bytes_written
else
if LibC.errno == Errno::EAGAIN
wait_writable
next
else
raise Errno.new "Error writing file"
end
end
count -= bytes_written
return total if count == 0
slice += bytes_written
end
ensure
add_write_event unless writers.empty?
end

private def wait_readable
wait_readable { |err| raise err }
end

private def wait_readable
readers << Fiber.current
add_read_event
Scheduler.reschedule

if @read_timed_out
@read_timed_out = false
yield Timeout.new("read timed out")
end

nil
end

private def wait_writable
private def add_read_event
return if @edge_triggerable
event = @read_event ||= Scheduler.create_fd_read_event(self)
event.add @read_timeout
nil
end

private def wait_writable timeout = @write_timeout
wait_writable(timeout: timeout) { |err| raise err }
end

# msg/timeout are overridden in nonblock_connect
private def wait_writable msg = "write timed out", timeout = @write_timeout
writers << Fiber.current
if @edge_triggerable
Scheduler.reschedule
else
event = Scheduler.create_fd_write_event(self)
Scheduler.reschedule
event.free
add_write_event timeout
Scheduler.reschedule

if @write_timed_out
@write_timed_out = false
yield Timeout.new(msg)
end

nil
end

private def add_write_event timeout = @write_timeout
return if @edge_triggerable
event = @write_event ||= Scheduler.create_fd_write_event(self)
event.add timeout
nil
end

private def unbuffered_rewind
Expand Down
6 changes: 3 additions & 3 deletions src/kernel.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
STDIN = FileDescriptorIO.new(0, blocking: LibC.isatty(0) == 0, edge_triggerable: ifdef darwin; false; else; true; end)
STDOUT = (FileDescriptorIO.new(1, blocking: LibC.isatty(1) == 0, edge_triggerable: ifdef darwin; false; else; true; end)).tap { |f| f.flush_on_newline = true }
STDERR = FileDescriptorIO.new(2, blocking: LibC.isatty(2) == 0, edge_triggerable: ifdef darwin; false; else; true; end)
STDIN = FileDescriptorIO.new(0, blocking: LibC.isatty(0) == 0)
STDOUT = (FileDescriptorIO.new(1, blocking: LibC.isatty(1) == 0)).tap { |f| f.flush_on_newline = true }
STDERR = FileDescriptorIO.new(2, blocking: LibC.isatty(2) == 0)

PROGRAM_NAME = String.new(ARGV_UNSAFE.value)
ARGV = (ARGV_UNSAFE + 1).to_slice(ARGC_UNSAFE - 1).map { |c_str| String.new(c_str) }
Expand Down
Loading

0 comments on commit fe3ccc5

Please sign in to comment.