diff --git a/spec/std/crystal/evented/arena_spec.cr b/spec/std/crystal/evented/arena_spec.cr index 47b8f9ecdb3e..1b8f4bd50534 100644 --- a/spec/std/crystal/evented/arena_spec.cr +++ b/spec/std/crystal/evented/arena_spec.cr @@ -4,68 +4,84 @@ require "spec" require "../../../../src/crystal/system/unix/evented/arena" describe Crystal::Evented::Arena do - describe "#lazy_allocate" do - it "yields block once" do + describe "#allocate_at?" do + it "yields block when not allocated" do arena = Crystal::Evented::Arena(Int32).new(32) pointer = nil index = nil called = 0 - ptr1, idx1 = arena.lazy_allocate(0) do |ptr, idx| + ret = arena.allocate_at?(0) do |ptr, idx| pointer = ptr index = idx called += 1 end + ret.should eq(index) called.should eq(1) - ptr2, idx2 = arena.lazy_allocate(0) do |ptr, idx| - called += 1 - end + ret = arena.allocate_at?(0) { called += 1 } + ret.should be_nil called.should eq(1) pointer.should_not be_nil index.should_not be_nil - ptr1.should eq(pointer) - idx1.should eq(index) - - ptr2.should eq(pointer) - idx2.should eq(index) + arena.get(index.not_nil!) do |ptr| + ptr.should eq(pointer) + end end it "allocates up to capacity" do arena = Crystal::Evented::Arena(Int32).new(32) + indexes = [] of Crystal::Evented::Arena::Index - objects = 32.times.map do |i| - arena.lazy_allocate(i) { |pointer| pointer.value = i } - end - objects.each do |(pointer, index)| - arena.get(index).should eq(pointer) - pointer.value.should eq(index.index) + indexes = 32.times.map do |i| + arena.allocate_at?(i) { |ptr, _| ptr.value = i } + end.to_a + + indexes.size.should eq(32) + + indexes.each do |index| + arena.get(index.not_nil!) 
do |pointer| + pointer.should eq(pointer) + pointer.value.should eq(index.not_nil!.index) + end end end it "checks bounds" do arena = Crystal::Evented::Arena(Int32).new(32) - expect_raises(IndexError) { arena.lazy_allocate(-1) { } } - expect_raises(IndexError) { arena.lazy_allocate(33) { } } + expect_raises(IndexError) { arena.allocate_at?(-1) { } } + expect_raises(IndexError) { arena.allocate_at?(33) { } } end end describe "#get" do it "returns previously allocated object" do arena = Crystal::Evented::Arena(Int32).new(32) - pointer, index = arena.lazy_allocate(30) { |ptr| ptr.value = 654321 } + pointer = nil + + index = arena.allocate_at(30) do |ptr| + pointer = ptr + ptr.value = 654321 + end + called = 0 2.times do - ptr = arena.get(index) - ptr.should eq(pointer) - ptr.value.should eq(654321) + arena.get(index.not_nil!) do |ptr| + ptr.should eq(pointer) + ptr.value.should eq(654321) + called += 1 + end end + called.should eq(2) + end + + it "can't access unallocated object" do + arena = Crystal::Evented::Arena(Int32).new(32) - # not allocated: expect_raises(RuntimeError) do - arena.get(Crystal::Evented::Arena::Index.new(10, 0)) + arena.get(Crystal::Evented::Arena::Index.new(10, 0)) { } end end @@ -73,90 +89,128 @@ describe Crystal::Evented::Arena do arena = Crystal::Evented::Arena(Int32).new(32) called = 0 - _, index1 = arena.lazy_allocate(2) { called += 1 } + index1 = arena.allocate_at(2) { called += 1 } called.should eq(1) arena.free(index1) { } - expect_raises(RuntimeError) { arena.get(index1) } + expect_raises(RuntimeError) { arena.get(index1) { } } - _, index2 = arena.lazy_allocate(2) { called += 1 } + index2 = arena.allocate_at(2) { called += 1 } called.should eq(2) - expect_raises(RuntimeError) { arena.get(index1) } + expect_raises(RuntimeError) { arena.get(index1) { } } - # doesn't raise: - arena.get(index2) + arena.get(index2) { } end it "checks out of bounds" do arena = Crystal::Evented::Arena(Int32).new(32) - expect_raises(IndexError) { 
arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) } - expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) } + expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) { } } + expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) { } } end end describe "#get?" do it "returns previously allocated object" do arena = Crystal::Evented::Arena(Int32).new(32) - pointer, index = arena.lazy_allocate(30) { |ptr| ptr.value = 654321 } + pointer = nil + index = arena.allocate_at(30) do |ptr| + pointer = ptr + ptr.value = 654321 + end + + called = 0 2.times do - ptr = arena.get?(index) - ptr.should eq(pointer) - ptr.not_nil!.value.should eq(654321) + ret = arena.get?(index) do |ptr| + ptr.should eq(pointer) + ptr.not_nil!.value.should eq(654321) + called += 1 + end + ret.should be_true end + called.should eq(2) + end - arena.get?(Crystal::Evented::Arena::Index.new(10, 0)).should be_nil + it "can't access unallocated index" do + arena = Crystal::Evented::Arena(Int32).new(32) + + called = 0 + ret = arena.get?(Crystal::Evented::Arena::Index.new(10, 0)) { called += 1 } + ret.should be_false + called.should eq(0) end it "checks generation" do arena = Crystal::Evented::Arena(Int32).new(32) called = 0 - pointer1, index1 = arena.lazy_allocate(2) { called += 1 } - called.should eq(1) + old_index = arena.allocate_at(2) { } + arena.free(old_index) { } - arena.free(index1) { } - arena.get?(index1).should be_nil + # not accessible after free: + ret = arena.get?(old_index) { called += 1 } + ret.should be_false + called.should eq(0) - pointer2, index2 = arena.lazy_allocate(2) { called += 1 } - called.should eq(2) - arena.get?(index1).should be_nil - arena.get?(index2).should eq(pointer2) + # can be reallocated: + new_index = arena.allocate_at(2) { } + + # still not accessible after reallocate: + ret = arena.get?(old_index) { called += 1 } + ret.should be_false + called.should eq(0) + + # accessible after 
reallocate (new index): + ret = arena.get?(new_index) { called += 1 } + ret.should be_true + called.should eq(1) end it "checks out of bounds" do arena = Crystal::Evented::Arena(Int32).new(32) - arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)).should be_nil - arena.get?(Crystal::Evented::Arena::Index.new(33, 0)).should be_nil + called = 0 + + arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 }.should be_false + arena.get?(Crystal::Evented::Arena::Index.new(33, 0)) { called += 1 }.should be_false + + called.should eq(0) end end describe "#free" do it "deallocates the object" do arena = Crystal::Evented::Arena(Int32).new(32) - pointer, index1 = arena.lazy_allocate(3) { } - pointer.value = 123 + index1 = arena.allocate_at(3) { |ptr| ptr.value = 123 } arena.free(index1) { } - pointer, index2 = arena.lazy_allocate(3) { } + index2 = arena.allocate_at(3) { } index2.should_not eq(index1) - pointer.value.should eq(0) + + value = nil + arena.get(index2) { |ptr| value = ptr.value } + value.should eq(0) end it "checks generation" do arena = Crystal::Evented::Arena(Int32).new(32) + called = 0 + old_index = arena.allocate_at(1) { } - _, index1 = arena.lazy_allocate(1) { } - arena.free(index1) { called += 1 } + # can free: + arena.free(old_index) { called += 1 } called.should eq(1) - _, index2 = arena.lazy_allocate(1) { } - arena.free(index1) { called += 1 } + # can reallocate: + new_index = arena.allocate_at(1) { } + + # can't free with invalid index: + arena.free(old_index) { called += 1 } called.should eq(1) - arena.free(index2) { called += 1 } + # but new index can: + arena.free(new_index) { called += 1 } called.should eq(2) end diff --git a/src/crystal/system/unix/epoll/event_loop.cr b/src/crystal/system/unix/epoll/event_loop.cr index b209d2706aff..dc2f2052dfa2 100644 --- a/src/crystal/system/unix/epoll/event_loop.cr +++ b/src/crystal/system/unix/epoll/event_loop.cr @@ -92,22 +92,22 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop 
Crystal.trace :evloop, "event", fd: index.index, index: index.to_i64, events: events - pd = Evented.arena.get(index) - - if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0 - pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } - pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } - return - end + Evented.arena.get?(index) do |pd| + if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0 + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } + return + end - if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP - pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } - elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN - pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } - end + if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } + elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN + pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } + end - if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT - pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } + if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT + pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } + end end end diff --git a/src/crystal/system/unix/evented/arena.cr b/src/crystal/system/unix/evented/arena.cr index ac254e23de9e..5dcb5c5c2c8c 100644 --- a/src/crystal/system/unix/evented/arena.cr +++ b/src/crystal/system/unix/evented/arena.cr @@ -23,7 +23,10 @@ # # Thread safety: the memory region is pre-allocated (up to capacity) using mmap # (virtual allocation) and pointers are never invalidated. Individual -# (de)allocations of objects are protected with a fine grained lock. 
+# allocation, deallocation and regular accesses are protected by a fine grained +# lock over each object: parallel accesses to the memory region are prohibited, +# and pointers are expected to not outlive the block that yielded them (don't +# capture them). # # Guarantees: `mmap` initializes the memory to zero, which means `T` objects are # initialized to zero by default, then `#free` will also clear the memory, so @@ -127,65 +130,70 @@ class Crystal::Evented::Arena(T) LibC.munmap(@buffer.to_unsafe, @buffer.bytesize) end - # Yields and allocates the object at *index* unless already allocated. - # Returns a pointer to the object at *index* and the generation index. + # Allocates the object at *index* unless already allocated, then yields a + # pointer to the object at *index* and the current generation index to later + # retrieve and free the allocated object. Eventually returns the generation + # index. # - # Permits two threads to allocate the same object in parallel yet only allow - # one to initialize it; the other one will silently receive the pointer and - # the generation index. + # Does nothing if the object has already been allocated and returns `nil`. # # There are no generational checks. # Raises if *index* is out of bounds. - def lazy_allocate(index : Int32, &) : {Pointer(T), Index} + def allocate_at?(index : Int32, & : (Pointer(T), Index) ->) : Index? entry = at(index) entry.value.@lock.sync do - pointer = entry.value.pointer - gen_index = Index.new(index, entry.value.generation) + return if entry.value.allocated? - unless entry.value.allocated? 
- {% unless flag?(:preview_mt) %} - @maximum = index if index > @maximum - {% end %} + {% unless flag?(:preview_mt) %} + @maximum = index if index > @maximum + {% end %} + entry.value.allocated = true - entry.value.allocated = true - yield pointer, gen_index - end + gen_index = Index.new(index, entry.value.generation) + yield entry.value.pointer, gen_index - {pointer, gen_index} + gen_index end end - # Returns a pointer to the object previously allocated at *index*. + # Same as `#allocate_at?` but raises when already allocated. + def allocate_at(index : Int32, & : (Pointer(T), Index) ->) : Index? + allocate_at?(index) { |ptr, idx| yield ptr, idx } || + raise RuntimeError.new("#{self.class.name}: already allocated index=#{index}") + end + + # Yields a pointer to the object previously allocated at *index*. # # Raises if the object isn't allocated. # Raises if the generation has changed (i.e. the object has been freed then reallocated). # Raises if *index* is negative. - def get(index : Index) : Pointer(T) - entry = at(index) - entry.value.pointer + def get(index : Index, &) : Nil + at(index) do |entry| + yield entry.value.pointer + end end - # Returns a pointer to the object previously allocated at *index*. - # Returns `nil` if the object isn't allocated or the generation has changed. + # Yields a pointer to the object previously allocated at *index* and returns + # true. + # Does nothing if the object isn't allocated or the generation has changed, + # and returns false. # # Raises if *index* is negative. - def get?(index : Index) : Pointer(T)? - if entry = at?(index) - entry.value.pointer + def get?(index : Index) : Bool + at?(index) do |entry| + yield entry.value.pointer + return true end + false end - # Yields the object previously allocated at *index* then releases it. Does - # nothing if the object isn't allocated or the generation has changed. + # Yields the object previously allocated at *index* then releases it. 
+ # Does nothing if the object isn't allocated or the generation has changed. # # Raises if *index* is negative. def free(index : Index, &) : Nil - return unless entry = at?(index.index) - - entry.value.@lock.sync do - return unless entry.value.allocated? - return unless entry.value.generation == index.generation + at?(index) do |entry| begin yield entry.value.pointer ensure @@ -194,22 +202,31 @@ class Crystal::Evented::Arena(T) end end - private def at(index : Index) : Pointer(Entry(T)) + private def at(index : Index, &) : Nil entry = at(index.index) - unless entry.value.allocated? - raise RuntimeError.new("#{self.class.name}: object not allocated at index #{index.index}") + entry.value.@lock.lock + + unless entry.value.allocated? && entry.value.generation == index.generation + entry.value.@lock.unlock + raise RuntimeError.new("#{self.class.name}: invalid reference index=#{index.index}:#{index.generation} current=#{index.index}:#{entry.value.generation}") end - unless entry.value.generation == index.generation - raise RuntimeError.new("#{self.class.name}: object generation changed at index #{index.index} (#{index.generation} => #{entry.value.generation})") + + begin + yield entry + ensure + entry.value.@lock.unlock end - entry end - private def at?(index : Index) : Pointer(Entry(T))? + private def at?(index : Index, &) : Nil return unless entry = at?(index.index) - return unless entry.value.allocated? - return unless entry.value.generation == index.generation - entry + + entry.value.@lock.sync do + return unless entry.value.allocated? 
+ return unless entry.value.generation == index.generation + + yield entry + end end private def at(index : Int32) : Pointer(Entry(T)) diff --git a/src/crystal/system/unix/evented/event_loop.cr b/src/crystal/system/unix/evented/event_loop.cr index 6f37c305e1f6..fc7e78d23da5 100644 --- a/src/crystal/system/unix/evented/event_loop.cr +++ b/src/crystal/system/unix/evented/event_loop.cr @@ -291,7 +291,6 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop protected def evented_close(io) return unless (index = io.__evloop_data).valid? - io.__evloop_data = Arena::INVALID_INDEX Evented.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| @@ -308,7 +307,6 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop private def internal_remove(io) return unless (index = io.__evloop_data).valid? - io.__evloop_data = Arena::INVALID_INDEX Evented.arena.free(index) do |pd| pd.value.remove(io.fd) { } # ignore system error @@ -342,25 +340,36 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop end private def wait(type : Evented::Event::Type, io, timeout, &) - # get or allocate the poll descriptor + # prepare event (on the stack); we can't initialize it properly until we get + # the arena index below; we also can't use a nilable since `pointerof` would + # point to the union, not the event + event = uninitialized Evented::Event + + # add the event to the waiting list; in case we can't access or allocate the + # poll descriptor into the arena, we merely return to let the caller handle + # the situation (maybe the IO got closed?) if (index = io.__evloop_data).valid? 
- pd = Evented.arena.get(index) + event = Evented::Event.new(type, Fiber.current, index, timeout) + + return false unless Evented.arena.get?(index) do |pd| + yield pd, pointerof(event) + end else - pd, index = Evented.arena.lazy_allocate(io.fd) do |pd, index| + # OPTIMIZE: failing to allocate may be a simple conflict with 2 fibers + # starting to read or write on the same fd, we may want to detect any + # error situation instead of returning and retrying a syscall + return false unless Evented.arena.allocate_at?(io.fd) do |pd, index| # register the fd with the event loop (once), it should usually merely add # the fd to the current evloop but may "transfer" the ownership from # another event loop: io.__evloop_data = index pd.value.take_ownership(self, io.fd, index) + + event = Evented::Event.new(type, Fiber.current, index, timeout) + yield pd, pointerof(event) end end - # create an event (on the stack) - event = Evented::Event.new(type, Fiber.current, index, timeout) - - # add the event to the waiting list - yield pd, pointerof(event) - if event.wake_at? add_timer(pointerof(event)) @@ -457,14 +466,12 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop when .io_read? # reached read timeout: cancel io event; by rule the timer always wins, # even in case of conflict with #unsafe_resume_io we must resume the fiber - pd = Evented.arena.get(event.value.index) - pd.value.@readers.delete(event) + Evented.arena.get?(event.value.index, &.value.@readers.delete(event)) event.value.timed_out! when .io_write? # reached write timeout: cancel io event; by rule the timer always wins, # even in case of conflict with #unsafe_resume_io we must resume the fiber - pd = Evented.arena.get(event.value.index) - pd.value.@writers.delete(event) + Evented.arena.get?(event.value.index, &.value.@writers.delete(event)) event.value.timed_out! when .select_timeout? 
# always dequeue the event but only enqueue the fiber if we win the diff --git a/src/crystal/system/unix/evented/poll_descriptor.cr b/src/crystal/system/unix/evented/poll_descriptor.cr index e9779a71d6ab..1ef318e454bb 100644 --- a/src/crystal/system/unix/evented/poll_descriptor.cr +++ b/src/crystal/system/unix/evented/poll_descriptor.cr @@ -3,50 +3,43 @@ require "./event_loop" # Information related to the evloop for a fd, such as the read and write queues # (waiting `Event`), as well as which evloop instance currently owns the fd. # -# Thread-safe: mutations are protected with a lock. +# Thread-unsafe: parallel mutations must be protected with a lock. struct Crystal::Evented::PollDescriptor @event_loop : Evented::EventLoop? - @lock = SpinLock.new @readers = Waiters.new @writers = Waiters.new # Makes *event_loop* the new owner of *fd*. # Removes *fd* from the current event loop (if any). def take_ownership(event_loop : EventLoop, fd : Int32, index : Arena::Index) : Nil - @lock.sync do - current = @event_loop - - if event_loop == current - raise "BUG: evloop already owns the poll-descriptor for fd=#{fd}" - end - - # ensure we can't have cross enqueues after we transfer the fd, so we - # can optimize (all enqueues are local) and we don't end up with a timer - # from evloop A to cancel an event from evloop B (currently unsafe) - if current && !empty? - raise RuntimeError.new("BUG: transfering fd=#{fd} to another evloop with pending reader/writer fibers") - end - - @event_loop = event_loop - event_loop.system_add(fd, index) - current.try(&.system_del(fd, closing: false)) + current = @event_loop + + if event_loop == current + raise "BUG: evloop already owns the poll-descriptor for fd=#{fd}" + end + + # ensure we can't have cross enqueues after we transfer the fd, so we + # can optimize (all enqueues are local) and we don't end up with a timer + # from evloop A to cancel an event from evloop B (currently unsafe) + if current && !empty? 
+ raise RuntimeError.new("BUG: transfering fd=#{fd} to another evloop with pending reader/writer fibers") end + + @event_loop = event_loop + event_loop.system_add(fd, index) + current.try(&.system_del(fd, closing: false)) end # Removes *fd* from its owner event loop. Raises on errors. def remove(fd : Int32) : Nil - @lock.sync do - current, @event_loop = @event_loop, nil - current.try(&.system_del(fd)) - end + current, @event_loop = @event_loop, nil + current.try(&.system_del(fd)) end # Same as `#remove` but yields on errors. def remove(fd : Int32, &) : Nil - @lock.sync do - current, @event_loop = @event_loop, nil - current.try(&.system_del(fd) { yield }) - end + current, @event_loop = @event_loop, nil + current.try(&.system_del(fd) { yield }) end # Returns true when there is at least one reader or writer. Returns false diff --git a/src/crystal/system/unix/evented/waiters.cr b/src/crystal/system/unix/evented/waiters.cr index bc79c5dc32c1..2d052718bae9 100644 --- a/src/crystal/system/unix/evented/waiters.cr +++ b/src/crystal/system/unix/evented/waiters.cr @@ -3,57 +3,52 @@ require "./event" # A FIFO queue of `Event` waiting on the same operation (either read or write) # for a fd. See `PollDescriptor`. # -# Thread safe: mutations are protected with a lock, and race conditions are -# handled through the ready atomic. +# Race conditions on the state of the waiting list are handled through the ready +# always ready variables. +# +# Thread unsafe: parallel mutations must be protected with a lock. struct Crystal::Evented::Waiters @list = PointerLinkedList(Event).new - @lock = SpinLock.new @ready = false @always_ready = false # Adds an event to the waiting list. May return false immediately if another # thread marked the list as ready in parallel, returns true otherwise. 
def add(event : Pointer(Event)) : Bool
-    @lock.sync do
-      if @always_ready
-        # another thread closed the fd or we received a fd error or hup event:
-        # the fd will never block again
-        return false
-      end
-
-      if @ready
-        # another thread readied the fd before the current thread got to add
-        # the event: don't block and resets @ready for the next loop
-        @ready = false
-        return false
-      end
+    if @always_ready
+      # another thread closed the fd or we received a fd error or hup event:
+      # the fd will never block again
+      return false
+    end
 
-      @list.push(event)
+    if @ready
+      # another thread readied the fd before the current thread got to add
+      # the event: don't block and reset @ready for the next loop
+      @ready = false
+      return false
     end
+
+    @list.push(event)
     true
   end
 
   def delete(event : Pointer(Event)) : Nil
-    @lock.sync do
-      @list.delete(event) if event.value.next
-    end
+    @list.delete(event) if event.value.next
   end
 
   # Removes one pending event or marks the list as ready when there are no
   # pending events (we got notified of readiness before a thread enqueued).
   def ready_one(& : Pointer(Event) -> Bool) : Nil
-    @lock.sync do
-      # loop until the block succesfully processes an event (it may have to
-      # dequeue the timeout from timers)
-      loop do
-        if event = @list.shift?
-          break if yield event
-        else
-          # no event queued but another thread may be waiting for the lock to
-          # add an event: set as ready to resolve the race condition
-          @ready = true
-          return
-        end
+    # loop until the block successfully processes an event (it may have to
+    # dequeue the timeout from timers)
+    loop do
+      if event = @list.shift?
+        break if yield event
+      else
+        # no event queued but another thread may be waiting for the lock to
+        # add an event: set as ready to resolve the race condition
+        @ready = true
+        return
       end
     end
   end
@@ -61,9 +56,7 @@ struct Crystal::Evented::Waiters
   # Dequeues all pending events and marks the list as always ready. 
This must be # called when a fd is closed or an error or hup event occurred. def ready_all(& : Pointer(Event) ->) : Nil - @lock.sync do - @list.consume_each { |event| yield event } - @always_ready = true - end + @list.consume_each { |event| yield event } + @always_ready = true end end diff --git a/src/crystal/system/unix/kqueue/event_loop.cr b/src/crystal/system/unix/kqueue/event_loop.cr index ef5d738b99c5..bf864b3f1542 100644 --- a/src/crystal/system/unix/kqueue/event_loop.cr +++ b/src/crystal/system/unix/kqueue/event_loop.cr @@ -126,30 +126,30 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop Crystal.trace :evloop, "event", fd: kevent.value.ident, index: index.to_i64, filter: kevent.value.filter, flags: kevent.value.flags, fflags: kevent.value.fflags - pd = Evented.arena.get(index) - - if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF - # apparently some systems may report EOF on write with EVFILT_READ instead - # of EVFILT_WRITE, so let's wake all waiters: - pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } - pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } - return - end - - case kevent.value.filter - when LibC::EVFILT_READ - if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR - # OPTIMIZE: pass errno (kevent.data) through PollDescriptor + Evented.arena.get?(index) do |pd| + if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF + # apparently some systems may report EOF on write with EVFILT_READ instead + # of EVFILT_WRITE, so let's wake all waiters: pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } - else - pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } - end - when LibC::EVFILT_WRITE - if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR - # OPTIMIZE: pass errno (kevent.data) through PollDescriptor pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } - else - pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } + return + end + + case 
kevent.value.filter + when LibC::EVFILT_READ + if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR + # OPTIMIZE: pass errno (kevent.data) through PollDescriptor + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } + else + pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } + end + when LibC::EVFILT_WRITE + if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR + # OPTIMIZE: pass errno (kevent.data) through PollDescriptor + pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } + else + pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } + end end end end diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 4aa0fc2f0af9..535f37f386c0 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -25,6 +25,9 @@ module Crystal::System::Socket end private def initialize_handle(fd) + {% if Crystal.has_constant?(:Evented) %} + @__evloop_data = Crystal::Evented::Arena::INVALID_INDEX + {% end %} end # Tries to bind the socket to a local address.