diff --git a/src/crystal/system/event_loop.cr b/src/crystal/system/event_loop.cr index b8697025d2fb..b1b0f4907ec2 100644 --- a/src/crystal/system/event_loop.cr +++ b/src/crystal/system/event_loop.cr @@ -45,6 +45,15 @@ abstract class Crystal::EventLoop end end +abstract class Crystal::EventLoop + # The socket module is empty by default and filled with abstract defs when + # crystal/system/socket.cr is required. + module Socket + end + + include Socket +end + {% if flag?(:wasi) %} require "./wasi/event_loop" {% elsif flag?(:unix) %} diff --git a/src/crystal/system/event_loop/socket.cr b/src/crystal/system/event_loop/socket.cr new file mode 100644 index 000000000000..e6f35478b487 --- /dev/null +++ b/src/crystal/system/event_loop/socket.cr @@ -0,0 +1,66 @@ +# This file is only required when sockets are used (`require "./event_loop/socket"` in `src/crystal/system/socket.cr`) +# +# It fills `Crystal::EventLoop::Socket` with abstract defs. + +abstract class Crystal::EventLoop + module Socket + # Reads at least one byte from the socket into *slice*. + # + # Blocks the current fiber if no data is available for reading, continuing + # when available. Otherwise returns immediately. + # + # Returns the number of bytes read (up to `slice.size`). + # Returns 0 when the socket is closed and no data available. + # + # Use `#send_to` for sending a message to a specific target address. + abstract def read(socket : ::Socket, slice : Bytes) : Int32 + + # Writes at least one byte from *slice* to the socket. + # + # Blocks the current fiber if the socket is not ready for writing, + # continuing when ready. Otherwise returns immediately. + # + # Returns the number of bytes written (up to `slice.size`). + # + # Use `#receive_from` for capturing the source address of a message. + abstract def write(socket : ::Socket, slice : Bytes) : Int32 + + # Accepts an incoming TCP connection on the socket. + # + # Blocks the current fiber if no connection is waiting, continuing when one + # becomes available. Otherwise returns immediately. + # + # Returns a handle to the socket for the new connection. + abstract def accept(socket : ::Socket) : ::Socket::Handle? + + # Opens a connection on *socket* to the target *address*. + # + # Blocks the current fiber and continues when the connection is established. + # + # Returns `IO::Error` in case of an error. The caller is responsible for + # raising it as an exception if necessary. + abstract def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error? + + # Sends at least one byte from *slice* to the socket with a target address + # *address*. + # + # Blocks the current fiber if the socket is not ready for writing, + # continuing when ready. Otherwise returns immediately. + # + # Returns the number of bytes sent (up to `slice.size`). + abstract def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32 + + # Receives at least one byte from the socket into *slice*, capturing the + # source address. + # + # Blocks the current fiber if no data is available for reading, continuing + # when available. Otherwise returns immediately. + # + # Returns a tuple containing the number of bytes received (up to `slice.size`) + # and the source address. + abstract def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) + + # Closes the socket. + abstract def close(socket : ::Socket) : Nil + end +end diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index 03c6e4930291..7e7b939fbeae 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -1,3 +1,5 @@ +require "./event_loop/socket" + module Crystal::System::Socket # Creates a file descriptor / socket handle # private def create_handle(family, type, protocol, blocking) : Handle @@ -5,7 +7,9 @@ module Crystal::System::Socket # Initializes a file descriptor / socket handle for use with Crystal Socket # private def initialize_handle(fd) - # private def system_connect(addr, timeout = nil) + private def system_connect(addr, timeout = nil) + event_loop.connect(self, addr, timeout) + end # Tries to bind the socket to a local address. # Yields an `Socket::BindError` if the binding failed. @@ -13,11 +17,17 @@ module Crystal::System::Socket # private def system_listen(backlog) - # private def system_accept + private def system_accept + event_loop.accept(self) + end - # private def system_send_to(bytes : Bytes, addr : ::Socket::Address) + private def system_send_to(bytes : Bytes, addr : ::Socket::Address) + event_loop.send_to(self, bytes, addr) + end - # private def system_receive(bytes) + private def system_receive_from(bytes : Bytes) : Tuple(Int32, ::Socket::Address) + event_loop.receive_from(self, bytes) + end # private def system_close_read @@ -69,12 +79,20 @@ module Crystal::System::Socket # def self.fcntl(fd, cmd, arg = 0) - # private def system_read(slice : Bytes) : Int32 + private def system_read(slice : Bytes) : Int32 + event_loop.read(self, slice) + end - # private def system_write(slice : Bytes) : Int32 + private def system_write(slice : Bytes) : Int32 + event_loop.write(self, slice) + end # private def system_close + private def event_loop : Crystal::EventLoop::Socket + Crystal::EventLoop.current + end + # IPSocket: # private def system_local_address diff --git a/src/crystal/system/unix/event_loop_libevent.cr b/src/crystal/system/unix/event_loop_libevent.cr index fe95ec0c8a3e..15d5e8b8787a 100644 --- a/src/crystal/system/unix/event_loop_libevent.cr +++ b/src/crystal/system/unix/event_loop_libevent.cr @@ -75,4 +75,81 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop end end end + + def read(socket : ::Socket, slice : Bytes) : Int32 + socket.evented_read("Error reading socket") do + LibC.recv(socket.fd, slice, slice.size, 0).to_i32 + end + end + + def write(socket : ::Socket, slice : Bytes) : Int32 + socket.evented_write("Error writing to socket") do + LibC.send(socket.fd, slice, slice.size, 0).to_i32 + end + end + + def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) + sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) + # initialize sockaddr with the initialized family of the socket + copy = sockaddr.value + copy.sa_family = socket.family + sockaddr.value = copy + + addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage)) + + bytes_read = socket.evented_read("Error receiving datagram") do + LibC.recvfrom(socket.fd, slice, slice.size, 0, sockaddr, pointerof(addrlen)) + end + + {bytes_read, ::Socket::Address.from(sockaddr, addrlen)} + end + + def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32 + bytes_sent = LibC.sendto(socket.fd, slice.to_unsafe.as(Void*), slice.size, 0, addr, addr.size) + raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1 + # to_i32 is fine because string/slice sizes are an Int32 + bytes_sent.to_i32 + end + + def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error? + loop do + if LibC.connect(socket.fd, address, address.size) == 0 + return + end + case Errno.value + when Errno::EISCONN + return + when Errno::EINPROGRESS, Errno::EALREADY + socket.wait_writable(timeout: timeout) do + return IO::TimeoutError.new("connect timed out") + end + else + return ::Socket::ConnectError.from_errno("connect") + end + end + end + + def accept(socket : ::Socket) : ::Socket::Handle? + loop do + client_fd = LibC.accept(socket.fd, nil, nil) + if client_fd == -1 + if socket.closed? + return + elsif Errno.value == Errno::EAGAIN + socket.wait_readable(raise_if_closed: false) do + raise IO::TimeoutError.new("Accept timed out") + end + return if socket.closed? + else + raise ::Socket::Error.from_errno("accept") + end + else + return client_fd + end + end + end + + def close(socket : ::Socket) : Nil + socket.evented_close + end end diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 1a5f91b86998..a263e7742301 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -26,25 +26,6 @@ module Crystal::System::Socket {% end %} end - private def system_connect(addr, timeout = nil) - timeout = timeout.seconds unless timeout.is_a? ::Time::Span | Nil - loop do - if LibC.connect(fd, addr, addr.size) == 0 - return - end - case Errno.value - when Errno::EISCONN - return - when Errno::EINPROGRESS, Errno::EALREADY - wait_writable(timeout: timeout) do - return IO::TimeoutError.new("connect timed out") - end - else - return ::Socket::ConnectError.from_errno("connect") - end - end - end - # Tries to bind the socket to a local address. # Yields an `Socket::BindError` if the binding failed. private def system_bind(addr, addrstr, &) @@ -59,53 +40,6 @@ module Crystal::System::Socket end end - private def system_accept - loop do - client_fd = LibC.accept(fd, nil, nil) - if client_fd == -1 - if closed? - return - elsif Errno.value == Errno::EAGAIN - wait_acceptable - return if closed? - else - raise ::Socket::Error.from_errno("accept") - end - else - return client_fd - end - end - end - - private def wait_acceptable - wait_readable(raise_if_closed: false) do - raise IO::TimeoutError.new("Accept timed out") - end - end - - private def system_send_to(bytes : Bytes, addr : ::Socket::Address) - bytes_sent = LibC.sendto(fd, bytes.to_unsafe.as(Void*), bytes.size, 0, addr, addr.size) - raise ::Socket::Error.from_errno("Error sending datagram to #{addr}") if bytes_sent == -1 - # to_i32 is fine because string/slice sizes are an Int32 - bytes_sent.to_i32 - end - - private def system_receive(bytes) - sockaddr = Pointer(LibC::SockaddrStorage).malloc.as(LibC::Sockaddr*) - # initialize sockaddr with the initialized family of the socket - copy = sockaddr.value - copy.sa_family = family - sockaddr.value = copy - - addrlen = LibC::SocklenT.new(sizeof(LibC::SockaddrStorage)) - - bytes_read = evented_read("Error receiving datagram") do - LibC.recvfrom(fd, bytes, bytes.size, 0, sockaddr, pointerof(addrlen)) - end - - {bytes_read, ::Socket::Address.from(sockaddr, addrlen)} - end - private def system_close_read if LibC.shutdown(fd, LibC::SHUT_RD) != 0 raise ::Socket::Error.from_errno("shutdown read") @@ -248,23 +182,11 @@ module Crystal::System::Socket LibC.isatty(fd) == 1 end - private def system_read(slice : Bytes) : Int32 - evented_read("Error reading socket") do - LibC.recv(fd, slice, slice.size, 0).to_i32 - end - end - - private def system_write(slice : Bytes) : Int32 - evented_write("Error writing to socket") do - LibC.send(fd, slice, slice.size, 0) - end - end - private def system_close # Perform libevent cleanup before LibC.close. # Using a file descriptor after it has been closed is never defined and can # always lead to undefined results. This is not specific to libevent. - evented_close + event_loop.close(self) # Clear the @volatile_fd before actually closing it in order to # reduce the chance of reading an outdated fd value diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/system/wasi/event_loop.cr index e1c2fc2166f1..01094ee99a40 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/system/wasi/event_loop.cr @@ -35,6 +35,38 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop def create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false) : Crystal::EventLoop::Event raise NotImplementedError.new("Crystal::Wasi::EventLoop.create_fd_read_event") end + + def read(socket : ::Socket, slice : Bytes) : Int32 + socket.evented_read("Error reading socket") do + LibC.recv(socket.fd, slice, slice.size, 0).to_i32 + end + end + + def write(socket : ::Socket, slice : Bytes) : Int32 + socket.evented_write("Error writing to socket") do + LibC.send(socket.fd, slice, slice.size, 0) + end + end + + def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) + raise NotImplementedError.new "Crystal::Wasi::EventLoop#receive_from" + end + + def send_to(socket : ::Socket, slice : Bytes, addr : ::Socket::Address) : Int32 + raise NotImplementedError.new "Crystal::Wasi::EventLoop#send_to" + end + + def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span | ::Nil) : IO::Error? + raise NotImplementedError.new "Crystal::Wasi::EventLoop#connect" + end + + def accept(socket : ::Socket) : ::Socket::Handle? + raise NotImplementedError.new "Crystal::Wasi::EventLoop#accept" + end + + def close(socket : ::Socket) : Nil + socket.evented_close + end end struct Crystal::Wasi::Event diff --git a/src/crystal/system/wasi/socket.cr b/src/crystal/system/wasi/socket.cr index ecdd20f3a2f9..901e8a4db1cb 100644 --- a/src/crystal/system/wasi/socket.cr +++ b/src/crystal/system/wasi/socket.cr @@ -15,10 +15,6 @@ module Crystal::System::Socket private def initialize_handle(fd) end - private def system_connect(addr, timeout = nil) - raise NotImplementedError.new "Crystal::System::Socket#system_connect" - end - # Tries to bind the socket to a local address. # Yields an `Socket::BindError` if the binding failed. private def system_bind(addr, addrstr, &) @@ -29,18 +25,6 @@ module Crystal::System::Socket raise NotImplementedError.new "Crystal::System::Socket#system_listen" end - private def system_accept - (raise NotImplementedError.new "Crystal::System::Socket#system_accept").as(Int32) - end - - private def system_send_to(bytes : Bytes, addr : ::Socket::Address) - raise NotImplementedError.new "Crystal::System::Socket#system_send_to" - end - - private def system_receive(bytes) - raise NotImplementedError.new "Crystal::System::Socket#system_receive" - end - private def system_close_read if LibC.shutdown(fd, LibC::SHUT_RD) != 0 raise ::Socket::Error.from_errno("shutdown read") @@ -155,23 +139,11 @@ module Crystal::System::Socket LibC.isatty(fd) == 1 end - private def system_read(slice : Bytes) : Int32 - evented_read("Error reading socket") do - LibC.recv(fd, slice, slice.size, 0).to_i32 - end - end - - private def system_write(slice : Bytes) : Int32 - evented_write("Error writing to socket") do - LibC.send(fd, slice, slice.size, 0) - end - end - private def system_close # Perform libevent cleanup before LibC.close. # Using a file descriptor after it has been closed is never defined and can # always lead to undefined results. This is not specific to libevent. - evented_close + event_loop.close(self) # Clear the @volatile_fd before actually closing it in order to # reduce the chance of reading an outdated fd value diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/system/win32/event_loop_iocp.cr index ae89e85bd3ed..3332bb11a55c 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/system/win32/event_loop_iocp.cr @@ -148,6 +148,117 @@ class Crystal::Iocp::EventLoop < Crystal::EventLoop def create_timeout_event(fiber) : Crystal::EventLoop::Event Crystal::Iocp::Event.new(fiber, timeout: true) end + + private def wsa_buffer(bytes) + wsabuf = LibC::WSABUF.new + wsabuf.len = bytes.size + wsabuf.buf = bytes.to_unsafe + wsabuf + end + + def read(socket : ::Socket, slice : Bytes) : Int32 + wsabuf = wsa_buffer(slice) + + bytes_read = socket.wsa_overlapped_operation(socket.fd, "WSARecv", socket.read_timeout, connreset_is_error: false) do |overlapped| + flags = 0_u32 + ret = LibC.WSARecv(socket.fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil) + {ret, bytes_received} + end + + bytes_read.to_i32 + end + + def write(socket : ::Socket, slice : Bytes) : Int32 + wsabuf = wsa_buffer(slice) + + bytes = socket.wsa_overlapped_operation(socket.fd, "WSASend", socket.write_timeout) do |overlapped| + ret = LibC.WSASend(socket.fd, pointerof(wsabuf), 1, out bytes_sent, 0, overlapped, nil) + {ret, bytes_sent} + end + + bytes.to_i32 + end + + def send_to(socket : ::Socket, bytes : Bytes, addr : ::Socket::Address) : Int32 + wsabuf = wsa_buffer(bytes) + bytes_written = socket.wsa_overlapped_operation(socket.fd, "WSASendTo", socket.write_timeout) do |overlapped| + ret = LibC.WSASendTo(socket.fd, pointerof(wsabuf), 1, out bytes_sent, 0, addr, addr.size, overlapped, nil) + {ret, bytes_sent} + end + raise ::Socket::Error.from_wsa_error("Error sending datagram to #{addr}") if bytes_written == -1 + + # to_i32 is fine because string/slice sizes are an Int32 + bytes_written.to_i32 + end + + def receive(socket : ::Socket, slice : Bytes) : Int32 + receive_from(socket, slice)[0] + end + + def receive_from(socket : ::Socket, slice : Bytes) : Tuple(Int32, ::Socket::Address) + sockaddr = Pointer(LibC::SOCKADDR_STORAGE).malloc.as(LibC::Sockaddr*) + # initialize sockaddr with the initialized family of the socket + copy = sockaddr.value + copy.sa_family = socket.family + sockaddr.value = copy + + addrlen = sizeof(LibC::SOCKADDR_STORAGE) + + wsabuf = wsa_buffer(slice) + + flags = 0_u32 + bytes_read = socket.wsa_overlapped_operation(socket.fd, "WSARecvFrom", socket.read_timeout) do |overlapped| + ret = LibC.WSARecvFrom(socket.fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), sockaddr, pointerof(addrlen), overlapped, nil) + {ret, bytes_received} + end + + {bytes_read.to_i32, ::Socket::Address.from(sockaddr, addrlen)} + end + + def connect(socket : ::Socket, address : ::Socket::Addrinfo | ::Socket::Address, timeout : ::Time::Span?) : IO::Error? + socket.overlapped_connect(socket.fd, "ConnectEx") do |overlapped| + # This is: LibC.ConnectEx(fd, address, address.size, nil, 0, nil, overlapped) + Crystal::System::Socket.connect_ex.call(socket.fd, address.to_unsafe, address.size, Pointer(Void).null, 0_u32, Pointer(UInt32).null, overlapped) + end + end + + def accept(socket : ::Socket) : ::Socket::Handle? + socket.system_accept do |client_handle| + address_size = sizeof(LibC::SOCKADDR_STORAGE) + 16 + + # buffer_size is set to zero to only accept the connection and don't receive any data. + # That will be a different operation. + # + # > If dwReceiveDataLength is zero, accepting the connection will not result in a receive operation. + # > Instead, AcceptEx completes as soon as a connection arrives, without waiting for any data. + # + # TODO: Investigate benefits from receiving data here directly. It's hard to integrate into the event loop and socket API. + buffer_size = 0 + output_buffer = Bytes.new(address_size * 2 + buffer_size) + + success = socket.overlapped_accept(socket.fd, "AcceptEx") do |overlapped| + # This is: LibC.AcceptEx(fd, client_handle, output_buffer, buffer_size, address_size, address_size, out received_bytes, overlapped) + received_bytes = uninitialized UInt32 + Crystal::System::Socket.accept_ex.call(socket.fd, client_handle, + output_buffer.to_unsafe.as(Void*), buffer_size.to_u32!, + address_size.to_u32!, address_size.to_u32!, pointerof(received_bytes), overlapped) + end + + if success + # AcceptEx does not automatically set the socket options on the accepted + # socket to match those of the listening socket, we need to ask for that + # explicitly with SO_UPDATE_ACCEPT_CONTEXT + socket.system_setsockopt client_handle, LibC::SO_UPDATE_ACCEPT_CONTEXT, socket.fd + + true + else + false + end + end + end + + def close(socket : ::Socket) : Nil + end end class Crystal::Iocp::Event diff --git a/src/crystal/system/win32/socket.cr b/src/crystal/system/win32/socket.cr index 58dbb3620239..974d11c93d31 100644 --- a/src/crystal/system/win32/socket.cr +++ b/src/crystal/system/win32/socket.cr @@ -110,10 +110,7 @@ module Crystal::System::Socket return ::Socket::BindError.from_wsa_error("Could not bind to '*'") end - error = overlapped_connect(fd, "ConnectEx") do |overlapped| - # This is: LibC.ConnectEx(fd, addr, addr.size, nil, 0, nil, overlapped) - Crystal::System::Socket.connect_ex.call(fd, addr.to_unsafe, addr.size, Pointer(Void).null, 0_u32, Pointer(UInt32).null, overlapped) - end + error = event_loop.connect(self, addr, timeout) if error return error @@ -132,7 +129,8 @@ module Crystal::System::Socket end end - private def overlapped_connect(socket, method, &) + # :nodoc: + def overlapped_connect(socket, method, &) OverlappedOperation.run(socket) do |operation| result = yield operation.start @@ -185,11 +183,11 @@ module Crystal::System::Socket end end - protected def system_accept : Handle? + def system_accept(& : Handle -> Bool) : Handle? client_socket = create_handle(family, type, protocol, blocking) initialize_handle(client_socket) - if system_accept(client_socket) + if yield client_socket client_socket else LibC.closesocket(client_socket) @@ -198,30 +196,7 @@ module Crystal::System::Socket end end - protected def system_accept(client_socket : Handle) : Bool - address_size = sizeof(LibC::SOCKADDR_STORAGE) + 16 - buffer_size = 0 - output_buffer = Bytes.new(address_size * 2 + buffer_size) - - success = overlapped_accept(fd, "AcceptEx") do |overlapped| - # This is: LibC.AcceptEx(fd, client_socket, output_buffer, buffer_size, address_size, address_size, out received_bytes, overlapped) - received_bytes = uninitialized UInt32 - Crystal::System::Socket.accept_ex.call(fd, client_socket, - output_buffer.to_unsafe.as(Void*), buffer_size.to_u32!, - address_size.to_u32!, address_size.to_u32!, pointerof(received_bytes), overlapped) - end - - return false unless success - - # AcceptEx does not automatically set the socket options on the accepted - # socket to match those of the listening socket, we need to ask for that - # explicitly with SO_UPDATE_ACCEPT_CONTEXT - system_setsockopt client_socket, LibC::SO_UPDATE_ACCEPT_CONTEXT, fd - - true - end - - private def overlapped_accept(socket, method, &) + def overlapped_accept(socket, method, &) OverlappedOperation.run(socket) do |operation| result = yield operation.start @@ -252,45 +227,6 @@ module Crystal::System::Socket end end - private def wsa_buffer(bytes) - wsabuf = LibC::WSABUF.new - wsabuf.len = bytes.size - wsabuf.buf = bytes.to_unsafe - wsabuf - end - - private def system_send_to(bytes : Bytes, addr : ::Socket::Address) - wsabuf = wsa_buffer(bytes) - bytes_written = overlapped_write(fd, "WSASendTo") do |overlapped| - ret = LibC.WSASendTo(fd, pointerof(wsabuf), 1, out bytes_sent, 0, addr, addr.size, overlapped, nil) - {ret, bytes_sent} - end - raise ::Socket::Error.from_wsa_error("Error sending datagram to #{addr}") if bytes_written == -1 - - # to_i32 is fine because string/slice sizes are an Int32 - bytes_written.to_i32 - end - - private def system_receive(bytes) - sockaddr = Pointer(LibC::SOCKADDR_STORAGE).malloc.as(LibC::Sockaddr*) - # initialize sockaddr with the initialized family of the socket - copy = sockaddr.value - copy.sa_family = family - sockaddr.value = copy - - addrlen = sizeof(LibC::SOCKADDR_STORAGE) - - wsabuf = wsa_buffer(bytes) - - flags = 0_u32 - bytes_read = overlapped_read(fd, "WSARecvFrom") do |overlapped| - ret = LibC.WSARecvFrom(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), sockaddr, pointerof(addrlen), overlapped, nil) - {ret, bytes_received} - end - - {bytes_read.to_i32, ::Socket::Address.from(sockaddr, addrlen)} - end - private def system_close_read if LibC.shutdown(fd, LibC::SH_RECEIVE) != 0 raise ::Socket::Error.from_wsa_error("shutdown read") @@ -392,7 +328,8 @@ module Crystal::System::Socket raise ::Socket::Error.from_wsa_error("getsockopt #{optname}") end - private def system_setsockopt(handle, optname, optval, level = LibC::SOL_SOCKET) + # :nodoc: + def system_setsockopt(handle, optname, optval, level = LibC::SOL_SOCKET) optsize = sizeof(typeof(optval)) ret = LibC.setsockopt(handle, level, optname, pointerof(optval).as(UInt8*), optsize) @@ -434,41 +371,6 @@ module Crystal::System::Socket LibC.GetConsoleMode(LibC::HANDLE.new(fd), out _) != 0 end - private def system_read(slice : Bytes) : Int32 - wsabuf = wsa_buffer(slice) - - bytes_read = overlapped_read(fd, "WSARecv", connreset_is_error: false) do |overlapped| - flags = 0_u32 - ret = LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil) - {ret, bytes_received} - end - - bytes_read.to_i32 - end - - private def system_write(slice : Bytes) : Int32 - wsabuf = wsa_buffer(slice) - - bytes = overlapped_write(fd, "WSASend") do |overlapped| - ret = LibC.WSASend(fd, pointerof(wsabuf), 1, out bytes_sent, 0, overlapped, nil) - {ret, bytes_sent} - end - - bytes.to_i32 - end - - private def overlapped_write(socket, method, &) - wsa_overlapped_operation(socket, method, write_timeout) do |operation| - yield operation - end - end - - private def overlapped_read(socket, method, *, connreset_is_error = true, &) - wsa_overlapped_operation(socket, method, read_timeout, connreset_is_error) do |operation| - yield operation - end - end - def system_close handle = @volatile_fd.swap(LibC::INVALID_SOCKET) diff --git a/src/socket.cr b/src/socket.cr index 0822b274e555..dfad08d762cf 100644 --- a/src/socket.cr +++ b/src/socket.cr @@ -264,7 +264,7 @@ class Socket < IO def receive(max_message_size = 512) : {String, Address} address = nil message = String.new(max_message_size) do |buffer| - bytes_read, address = system_receive(Slice.new(buffer, max_message_size)) + bytes_read, address = system_receive_from(Slice.new(buffer, max_message_size)) {bytes_read, 0} end {message, address.as(Address)} @@ -282,7 +282,7 @@ class Socket < IO # bytes_read, client_addr = server.receive(message) # ``` def receive(message : Bytes) : {Int32, Address} - system_receive(message) + system_receive_from(message) end # Calls `shutdown(2)` with `SHUT_RD` diff --git a/src/socket/udp_socket.cr b/src/socket/udp_socket.cr index 9175b787cfe9..bba9d1aea39a 100644 --- a/src/socket/udp_socket.cr +++ b/src/socket/udp_socket.cr @@ -70,7 +70,7 @@ class UDPSocket < IPSocket def receive(max_message_size = 512) : {String, IPAddress} address = nil message = String.new(max_message_size) do |buffer| - bytes_read, address = system_receive(Slice.new(buffer, max_message_size)) + bytes_read, address = system_receive_from(Slice.new(buffer, max_message_size)) {bytes_read, 0} end {message, address.as(IPAddress)} @@ -88,7 +88,7 @@ class UDPSocket < IPSocket # bytes_read, client_addr = server.receive(message) # ``` def receive(message : Bytes) : {Int32, IPAddress} - bytes_read, address = system_receive(message) + bytes_read, address = system_receive_from(message) {bytes_read, address.as(IPAddress)} end