Skip to content

Commit

Permalink
Fix: prohibit parallel access to arena objects
Browse files Browse the repository at this point in the history
Fixes race conditions in the evloop when another thread closes the fd,
and thus frees the poll descriptor (waitlists):

- After waiting for events and processing the events: processing events
  and timers would fail to get the index and raise an exception.

- After trying to read or write but before we could actually get the
  poll descriptor: we'd fail to get the index and raise an exception.

- After trying to read/write and after we allocated the poll descriptor,
  but before we could wait: we'd be adding to a freed poll descriptor.

- Resetting the arena index on IO (__evloop_data) allowed a parallel
  wait to re-allocate the poll descriptor (despite the IO being closed).

- ...

Preventing parallel accesses to an arena object may be a bit drastic,
but at least we know for sure that we don't have any parallelism issue.

It also allows to drop the lock on poll descriptor as well as on each
waiting list: we don't need them anymore. A disadvantage is that
parallel read/write/close and evloop process event/timeout may
conflict... though they already competed for the 3 aforementioned locks.
  • Loading branch information
ysbaddaden committed Oct 17, 2024
1 parent 375253c commit 6985080
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 214 deletions.
168 changes: 111 additions & 57 deletions spec/std/crystal/evented/arena_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,159 +4,213 @@ require "spec"
require "../../../../src/crystal/system/unix/evented/arena"

describe Crystal::Evented::Arena do
describe "#lazy_allocate" do
it "yields block once" do
describe "#allocate_at?" do
it "yields block when not allocated" do
arena = Crystal::Evented::Arena(Int32).new(32)
pointer = nil
index = nil
called = 0

ptr1, idx1 = arena.lazy_allocate(0) do |ptr, idx|
ret = arena.allocate_at?(0) do |ptr, idx|
pointer = ptr
index = idx
called += 1
end
ret.should eq(index)
called.should eq(1)

ptr2, idx2 = arena.lazy_allocate(0) do |ptr, idx|
called += 1
end
ret = arena.allocate_at?(0) { called += 1 }
ret.should be_nil
called.should eq(1)

pointer.should_not be_nil
index.should_not be_nil

ptr1.should eq(pointer)
idx1.should eq(index)

ptr2.should eq(pointer)
idx2.should eq(index)
arena.get(index.not_nil!) do |ptr|
ptr.should eq(pointer)
end
end

it "allocates up to capacity" do
arena = Crystal::Evented::Arena(Int32).new(32)
indexes = [] of Crystal::Evented::Arena::Index

objects = 32.times.map do |i|
arena.lazy_allocate(i) { |pointer| pointer.value = i }
end
objects.each do |(pointer, index)|
arena.get(index).should eq(pointer)
pointer.value.should eq(index.index)
indexes = 32.times.map do |i|
arena.allocate_at?(i) { |ptr, _| ptr.value = i }
end.to_a

indexes.size.should eq(32)

indexes.each do |index|
arena.get(index.not_nil!) do |pointer|
pointer.should eq(pointer)
pointer.value.should eq(index.not_nil!.index)
end
end
end

it "checks bounds" do
arena = Crystal::Evented::Arena(Int32).new(32)
expect_raises(IndexError) { arena.lazy_allocate(-1) { } }
expect_raises(IndexError) { arena.lazy_allocate(33) { } }
expect_raises(IndexError) { arena.allocate_at?(-1) { } }
expect_raises(IndexError) { arena.allocate_at?(33) { } }
end
end

describe "#get" do
it "returns previously allocated object" do
arena = Crystal::Evented::Arena(Int32).new(32)
pointer, index = arena.lazy_allocate(30) { |ptr| ptr.value = 654321 }
pointer = nil

index = arena.allocate_at(30) do |ptr|
pointer = ptr
ptr.value = 654321
end
called = 0

2.times do
ptr = arena.get(index)
ptr.should eq(pointer)
ptr.value.should eq(654321)
arena.get(index.not_nil!) do |ptr|
ptr.should eq(pointer)
ptr.value.should eq(654321)
called += 1
end
end
called.should eq(2)
end

it "can't access unallocated object" do
arena = Crystal::Evented::Arena(Int32).new(32)

# not allocated:
expect_raises(RuntimeError) do
arena.get(Crystal::Evented::Arena::Index.new(10, 0))
arena.get(Crystal::Evented::Arena::Index.new(10, 0)) { }
end
end

it "checks generation" do
arena = Crystal::Evented::Arena(Int32).new(32)
called = 0

_, index1 = arena.lazy_allocate(2) { called += 1 }
index1 = arena.allocate_at(2) { called += 1 }
called.should eq(1)

arena.free(index1) { }
expect_raises(RuntimeError) { arena.get(index1) }
expect_raises(RuntimeError) { arena.get(index1) { } }

_, index2 = arena.lazy_allocate(2) { called += 1 }
index2 = arena.allocate_at(2) { called += 1 }
called.should eq(2)
expect_raises(RuntimeError) { arena.get(index1) }
expect_raises(RuntimeError) { arena.get(index1) { } }

# doesn't raise:
arena.get(index2)
arena.get(index2) { }
end

it "checks out of bounds" do
arena = Crystal::Evented::Arena(Int32).new(32)
expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) }
expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) }
expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) { } }
expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) { } }
end
end

describe "#get?" do
it "returns previously allocated object" do
arena = Crystal::Evented::Arena(Int32).new(32)
pointer, index = arena.lazy_allocate(30) { |ptr| ptr.value = 654321 }
pointer = nil

index = arena.allocate_at(30) do |ptr|
pointer = ptr
ptr.value = 654321
end

called = 0
2.times do
ptr = arena.get?(index)
ptr.should eq(pointer)
ptr.not_nil!.value.should eq(654321)
ret = arena.get?(index) do |ptr|
ptr.should eq(pointer)
ptr.not_nil!.value.should eq(654321)
called += 1
end
ret.should be_true
end
called.should eq(2)
end

arena.get?(Crystal::Evented::Arena::Index.new(10, 0)).should be_nil
it "can't access unallocated index" do
arena = Crystal::Evented::Arena(Int32).new(32)

called = 0
ret = arena.get?(Crystal::Evented::Arena::Index.new(10, 0)) { called += 1 }
ret.should be_false
called.should eq(0)
end

it "checks generation" do
arena = Crystal::Evented::Arena(Int32).new(32)
called = 0

pointer1, index1 = arena.lazy_allocate(2) { called += 1 }
called.should eq(1)
old_index = arena.allocate_at(2) { }
arena.free(old_index) { }

arena.free(index1) { }
arena.get?(index1).should be_nil
# not accessible after free:
ret = arena.get?(old_index) { called += 1 }
ret.should be_false
called.should eq(0)

pointer2, index2 = arena.lazy_allocate(2) { called += 1 }
called.should eq(2)
arena.get?(index1).should be_nil
arena.get?(index2).should eq(pointer2)
# can be reallocated:
new_index = arena.allocate_at(2) { }

# still not accessible after reallocate:
ret = arena.get?(old_index) { called += 1 }
ret.should be_false
called.should eq(0)

# accessible after reallocate (new index):
ret = arena.get?(new_index) { called += 1 }
ret.should be_true
called.should eq(1)
end

it "checks out of bounds" do
arena = Crystal::Evented::Arena(Int32).new(32)
arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)).should be_nil
arena.get?(Crystal::Evented::Arena::Index.new(33, 0)).should be_nil
called = 0

arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 }.should be_false
arena.get?(Crystal::Evented::Arena::Index.new(33, 0)) { called += 1 }.should be_false

called.should eq(0)
end
end

describe "#free" do
it "deallocates the object" do
arena = Crystal::Evented::Arena(Int32).new(32)
pointer, index1 = arena.lazy_allocate(3) { }
pointer.value = 123

index1 = arena.allocate_at(3) { |ptr| ptr.value = 123 }
arena.free(index1) { }

pointer, index2 = arena.lazy_allocate(3) { }
index2 = arena.allocate_at(3) { }
index2.should_not eq(index1)
pointer.value.should eq(0)

value = nil
arena.get(index2) { |ptr| value = ptr.value }
value.should eq(0)
end

it "checks generation" do
arena = Crystal::Evented::Arena(Int32).new(32)

called = 0
old_index = arena.allocate_at(1) { }

_, index1 = arena.lazy_allocate(1) { }
arena.free(index1) { called += 1 }
# can free:
arena.free(old_index) { called += 1 }
called.should eq(1)

_, index2 = arena.lazy_allocate(1) { }
arena.free(index1) { called += 1 }
# can reallocate:
new_index = arena.allocate_at(1) { }

# can't free with invalid index:
arena.free(old_index) { called += 1 }
called.should eq(1)

arena.free(index2) { called += 1 }
# but new index can:
arena.free(new_index) { called += 1 }
called.should eq(2)
end

Expand Down
28 changes: 14 additions & 14 deletions src/crystal/system/unix/epoll/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,22 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop

Crystal.trace :evloop, "event", fd: index.index, index: index.to_i64, events: events

pd = Evented.arena.get(index)

if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
return
end
Evented.arena.get?(index) do |pd|
if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
return
end

if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
end
if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
end

if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
end
end
end

Expand Down
Loading

0 comments on commit 6985080

Please sign in to comment.