Skip to content

Commit

Permalink
Add EventLoop::Socket module (#14643)
Browse files Browse the repository at this point in the history
  • Loading branch information
straight-shoota authored Jun 2, 2024
1 parent cf570b8 commit 135e908
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 224 deletions.
9 changes: 9 additions & 0 deletions src/crystal/system/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ abstract class Crystal::EventLoop
end
end

abstract class Crystal::EventLoop
# The socket module is empty by default and filled with abstract defs when
# crystal/system/socket.cr is required.
module Socket
end

include Socket
end

{% if flag?(:wasi) %}
require "./wasi/event_loop"
{% elsif flag?(:unix) %}
Expand Down
66 changes: 66 additions & 0 deletions src/crystal/system/event_loop/socket.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# This file is only required when sockets are used (`require "./event_loop/socket"` in `src/crystal/system/socket.cr`)
#
# It fills `Crystal::EventLoop::Socket` with abstract defs.

abstract class Crystal::EventLoop
module Socket
# Reads at least one byte from the socket into *slice*.
#
# Blocks the current fiber if no data is available for reading, continuing
# when available. Otherwise returns immediately.
#
# Returns the number of bytes read (up to `slice.size`).
# Returns 0 when the socket is closed and no data available.
#
# Use `#send_to` for sending a message to a specific target address.
abstract def read(socket : ::Socket, slice : Bytes) : Int32

# Writes at least one byte from *slice* to the socket.
#
# Blocks the current fiber if the socket is not ready for writing,
# continuing when ready. Otherwise returns immediately.
#
# Returns the number of bytes written (up to `slice.size`).
#
# Use `#receive_from` for capturing the source address of a message.
abstract def write(socket : ::Socket, slice : Bytes) : Int32

# Accepts an incoming TCP connection on the socket.
#
# Blocks the current fiber if no connection is waiting, continuing when one
# becomes available. Otherwise returns immediately.
#
# Returns a handle to the socket for the new connection.
abstract def accept(socket : ::Socket) : ::Socket::Handle?

# Opens a connection on *socket* to the target *address*.
#
# Blocks the current fiber and continues when the connection is established.
#
# Returns `IO::Error` in case of an error. The caller is responsible for
# raising it as an exception if necessary.
abstract def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error?

# Sends at least one byte from *slice* to the socket with a target address
# *address*.
#
# Blocks the current fiber if the socket is not ready for writing,
# continuing when ready. Otherwise returns immediately.
#
# Returns the number of bytes sent (up to `slice.size`).
abstract def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32

# Receives at least one byte from the socket into *slice*, capturing the
# source address.
#
# Blocks the current fiber if no data is available for reading, continuing
# when available. Otherwise returns immediately.
#
# Returns a tuple containing the number of bytes received (up to `slice.size`)
# and the source address.
abstract def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)

# Closes the socket.
abstract def close(socket : ::Socket) : Nil
end
end
30 changes: 24 additions & 6 deletions src/crystal/system/socket.cr
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
require "./event_loop/socket"

module Crystal::System::Socket
# Creates a file descriptor / socket handle
# private def create_handle(family, type, protocol, blocking) : Handle

# Initializes a file descriptor / socket handle for use with Crystal Socket
# private def initialize_handle(fd)

# private def system_connect(addr, timeout = nil)
private def system_connect(addr, timeout = nil)
event_loop.connect(self, addr, timeout)
end

# Tries to bind the socket to a local address.
# Yields an `Socket::BindError` if the binding failed.
# private def system_bind(addr, addrstr)

# private def system_listen(backlog)

# private def system_accept
private def system_accept
event_loop.accept(self)
end

# private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
event_loop.send_to(self, bytes, addr)
end

# private def system_receive(bytes)
private def system_receive_from(bytes : Bytes) : Tuple(Int32, ::Socket::Address)
event_loop.receive_from(self, bytes)
end

# private def system_close_read

Expand Down Expand Up @@ -69,12 +79,20 @@ module Crystal::System::Socket

# def self.fcntl(fd, cmd, arg = 0)

# private def system_read(slice : Bytes) : Int32
private def system_read(slice : Bytes) : Int32
event_loop.read(self, slice)
end

# private def system_write(slice : Bytes) : Int32
private def system_write(slice : Bytes) : Int32
event_loop.write(self, slice)
end

# private def system_close

private def event_loop : Crystal::EventLoop::Socket
Crystal::EventLoop.current
end

# IPSocket:

# private def system_local_address
Expand Down
77 changes: 77 additions & 0 deletions src/crystal/system/unix/event_loop_libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,81 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop
end
end
end

def read(socket : ::Socket, slice : Bytes) : Int32
socket.evented_read("Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
socket.evented_write("Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0).to_i32
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
copy = sockaddr.value
copy.sa_family = socket.family
sockaddr.value = copy

addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage))

bytes_read = socket.evented_read("Error receiving datagram") do
LibC.recvfrom(socket.fd, slice, slice.size, 0, sockaddr, pointerof(addrlen))
end

{bytes_read, ::Socket::Address.from(sockaddr, addrlen)}
end

def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 13.0.0

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 14.0.0

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 15.0.6

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 16.0.3

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 17.0.6

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing

Check warning on line 107 in src/crystal/system/unix/event_loop_libevent.cr

View workflow job for this annotation

GitHub Actions / LLVM 18.1.4

positional parameter 'addr' corresponds to parameter 'address' of the overridden method Crystal::EventLoop::Socket#send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address), which has a different name and may affect named argument passing
bytes_sent = LibC.sendto(socket.fd, slice.to_unsafe.as(Void*), slice.size, 0, addr, addr.size)
raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1
# to_i32 is fine because string/slice sizes are an Int32
bytes_sent.to_i32
end

def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error?
loop do
if LibC.connect(socket.fd, address, address.size) == 0
return
end
case Errno.value
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
socket.wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
return ::Socket::ConnectError.from_errno("connect")
end
end
end

def accept(socket : ::Socket) : ::Socket::Handle?
loop do
client_fd = LibC.accept(socket.fd, nil, nil)
if client_fd == -1
if socket.closed?
return
elsif Errno.value == Errno::EAGAIN
socket.wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
return if socket.closed?
else
raise ::Socket::Error.from_errno("accept")
end
else
return client_fd
end
end
end

def close(socket : ::Socket) : Nil
socket.evented_close
end
end
80 changes: 1 addition & 79 deletions src/crystal/system/unix/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,6 @@ module Crystal::System::Socket
{% end %}
end

private def system_connect(addr, timeout = nil)
timeout = timeout.seconds unless timeout.is_a? ::Time::Span | Nil
loop do
if LibC.connect(fd, addr, addr.size) == 0
return
end
case Errno.value
when Errno::EISCONN
return
when Errno::EINPROGRESS, Errno::EALREADY
wait_writable(timeout: timeout) do
return IO::TimeoutError.new("connect timed out")
end
else
return ::Socket::ConnectError.from_errno("connect")
end
end
end

# Tries to bind the socket to a local address.
# Yields an `Socket::BindError` if the binding failed.
private def system_bind(addr, addrstr, &)
Expand All @@ -59,53 +40,6 @@ module Crystal::System::Socket
end
end

private def system_accept
loop do
client_fd = LibC.accept(fd, nil, nil)
if client_fd == -1
if closed?
return
elsif Errno.value == Errno::EAGAIN
wait_acceptable
return if closed?
else
raise ::Socket::Error.from_errno("accept")
end
else
return client_fd
end
end
end

private def wait_acceptable
wait_readable(raise_if_closed: false) do
raise IO::TimeoutError.new("Accept timed out")
end
end

private def system_send_to(bytes : Bytes, addr : ::Socket::Address)
bytes_sent = LibC.sendto(fd, bytes.to_unsafe.as(Void*), bytes.size, 0, addr, addr.size)
raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1
# to_i32 is fine because string/slice sizes are an Int32
bytes_sent.to_i32
end

private def system_receive(bytes)
sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*)
# initialize sockaddr with the initialized family of the socket
copy = sockaddr.value
copy.sa_family = family
sockaddr.value = copy

addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage))

bytes_read = evented_read("Error receiving datagram") do
LibC.recvfrom(fd, bytes, bytes.size, 0, sockaddr, pointerof(addrlen))
end

{bytes_read, ::Socket::Address.from(sockaddr, addrlen)}
end

private def system_close_read
if LibC.shutdown(fd, LibC::SHUT_RD) != 0
raise ::Socket::Error.from_errno("shutdown read")
Expand Down Expand Up @@ -248,23 +182,11 @@ module Crystal::System::Socket
LibC.isatty(fd) == 1
end

private def system_read(slice : Bytes) : Int32
evented_read("Error reading socket") do
LibC.recv(fd, slice, slice.size, 0).to_i32
end
end

private def system_write(slice : Bytes) : Int32
evented_write("Error writing to socket") do
LibC.send(fd, slice, slice.size, 0)
end
end

private def system_close
# Perform libevent cleanup before LibC.close.
# Using a file descriptor after it has been closed is never defined and can
# always lead to undefined results. This is not specific to libevent.
evented_close
event_loop.close(self)

# Clear the @volatile_fd before actually closing it in order to
# reduce the chance of reading an outdated fd value
Expand Down
32 changes: 32 additions & 0 deletions src/crystal/system/wasi/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,38 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop
def create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::EventLoop::Event
raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event")
end

def read(socket : ::Socket, slice : Bytes) : Int32
socket.evented_read("Error reading socket") do
LibC.recv(socket.fd, slice, slice.size, 0).to_i32
end
end

def write(socket : ::Socket, slice : Bytes) : Int32
socket.evented_write("Error writing to socket") do
LibC.send(socket.fd, slice, slice.size, 0)
end
end

def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address)
raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from"
end

def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32
raise NotImplementedError.new "Crystal::Wasi::EventLoop#send_to"
end

def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span | ::Nil) : IO::Error?
raise NotImplementedError.new "Crystal::Wasi::EventLoop#connect"
end

def accept(socket : ::Socket) : ::Socket::Handle?
raise NotImplementedError.new "Crystal::Wasi::EventLoop#accept"
end

def close(socket : ::Socket) : Nil
socket.evented_close
end
end

struct Crystal::Wasi::Event
Expand Down
Loading

0 comments on commit 135e908

Please sign in to comment.