Skip to content

Commit

Permalink
WIP: mt-safe runnables queue scheduler + fiber.resumeable (x86_64 only)
Browse files Browse the repository at this point in the history
  • Loading branch information
ysbaddaden committed Oct 3, 2018
1 parent b3cfc35 commit b067a0f
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 30 deletions.
46 changes: 37 additions & 9 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "./event_loop"
require "fiber"
require "thread"
require "./event_loop"
require "./scheduler/*"

# :nodoc:
#
Expand Down Expand Up @@ -45,27 +46,43 @@ class Crystal::Scheduler
# :nodoc:
def initialize(@main : Fiber)
@current = @main
@runnables = Deque(Fiber).new
@runnables = Queue(Fiber).new
end

protected def enqueue(fiber : Fiber) : Nil
@runnables << fiber
@runnables.push_bottom(fiber)
end

protected def enqueue(fibers : Enumerable(Fiber)) : Nil
@runnables.concat fibers
fibers.each { |fiber| enqueue(fiber) }
end

protected def resume(fiber : Fiber) : Nil
spin_lock { fiber.resumeable? }
current, @current = @current, fiber
GC.stack_bottom = fiber.@stack_bottom
Fiber.swapcontext(pointerof(current.@stack_top), fiber.@stack_top)
fiber.register_gc_stack
Fiber.swapcontext(pointerof(current.@context), pointerof(fiber.@context))
end

private def spin_lock : Nil
# 1. first check always succeeds with a single thread:
until yield
# 2. constant-time retries to avoid a thread context switch:
100.times { return if yield }

# 3. give up, yield CPU time to another thread:
LibC.sched_yield
end
end

protected def reschedule : Nil
if runnable = @runnables.shift?
runnable.resume
if fiber = @runnables.pop_bottom
resume(fiber)
else
# TODO: resume @main, to save the context of the current fiber (and not
# block another thread from resuming it) and let @main steal fibers
# from other schedulers and eventually park the thread.
# TODO: let a dedicated thread handle the event loop
Crystal::EventLoop.resume
end
end
Expand All @@ -76,7 +93,18 @@ class Crystal::Scheduler
end

protected def yield : Nil
sleep(0.seconds)
fiber = @runnables.pop_bottom
enqueue(@current)

if fiber
resume(fiber)
else
# TODO: resume @main, to save the context of the current fiber (and not
# block another thread from resuming it) and let @main steal fibers
# from other schedulers and eventually park the thread.
# TODO: let a dedicated thread handle the event loop
Crystal::EventLoop.resume
end
end

protected def yield(fiber : Fiber) : Nil
Expand Down
116 changes: 116 additions & 0 deletions src/crystal/scheduler/queue.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# :nodoc:
class Crystal::Scheduler
# :nodoc:
#
# Crystal only accepts simple primitives such as Atomic(T). This struct wraps
# a 64-bit struct as an Atomic(UInt64) and transparently casts from and to
# UInt64 and T.
struct AtomicRef8(T)
def initialize(value : T)
# TODO: raise a compile-time error unless sizeof(T) == 8
@atomic = Atomic(UInt64).new(value.unsafe_as(UInt64))
end

def get : T
@atomic.get.unsafe_as(T)
end

def set(value : T)
@atomic.set(value.unsafe_as(UInt64))
end

def compare_and_set(cmp : T, new : T) : Bool
_, success = @atomic.compare_and_set(cmp.unsafe_as(UInt64), new.as(UInt64))
success
end
end

# :nodoc:
#
# Thread-safe non-blocking queue for work-stealing schedulers.
#
# Based on:
#
# - "Scheduling Multithreaded Computations by Work Stealing" (2001) by
# Nimar S. Arora, Robert D. Blumofe and C. Greg Plaxton.
#
# - "Verification of a Concurrent Deque Implementation" (1999) by
# Robert D. Blumofe, C. Greg Plaxton and Sandip Ray.
struct Queue(T)
# :nodoc:
record Age, tag : Int32, top : Int32

# :nodoc:
SIZE = 4 * 1024 * 1024 * 1024

def initialize
@bot = Atomic(Int32).new(0)
@age = AtomicRef8(Age).new(Age.new(0, 0))

prot = LibC::PROT_READ | LibC::PROT_WRITE
flags = LibC::MAP_ANONYMOUS | LibC::MAP_PRIVATE
buf = LibC.mmap(nil, SIZE, prot, flags, -1, 0)
raise Errno.new("mmap") if buf == LibC::MAP_FAILED
@deq = buf
end

def free
LibC.munmap(@deq, SIZE)
end

# Pushes an item to the tail of the queue. Not thread-safe and must be
# called from the thread that owns the queue.
def push_bottom(item : T) : Nil
bot = @bot.get
@deq[bot] = item
@bot.set(bot + 1)
end

# Pops an item from the tail of the queue. Not thread-safe and must be
# called from the thread that owns the queue.
def pop_bottom : T?
bot = @bot.get
return if bot == 0

bot -= 1
@bot.set(bot)

item = @deq[bot]
old_age = @age.get
return item if bot > old_age.top

@bot.set(0)
new_age = Age.new(old_age.tag + 1, 0)

if bot == old_age.top
_, success = @age.compare_and_set(old_age, new_age)
return item if success
end

@age.set(new_age)
nil
end

# Pops an item from the head of the queue. Thread-safe and should be called
# from threads that don't own the queue (i.e. stealing threads).
def pop_top : T?
old_age = @age.get
bot = @bot.get
return if bot <= old_age.top

item = @seq[old_age.top]
new_age = Age.new(old_age.tag, old_age.top + 1)

_, success = @age.compare_and_set(old_age, new_age)
return item if success
end

# Lazily returns the queue size. It may be equal or slightly more or less
# than the actual size.
def lazy_size : Int32
bot = @bot.get
age = @age.get
bot - age.top
end
end
end
57 changes: 44 additions & 13 deletions src/fiber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require "thread/linked_list"
# context to another one. There are two methods: `Fiber#makecontext` and
# `Fiber.swapcontext`.
#
# - `Fiber.swapcontext(current_stack_ptr : Void**, dest_stack_ptr : Void*)
# - `Fiber.swapcontext(current_context : Fiber::Context*, new_context : Fiber::Context*)
#
# A fiber context switch in Crystal is achieved by calling a symbol (which
# must never be inlined) that will push the callee-saved registers (sometimes
Expand All @@ -29,9 +29,9 @@ require "thread/linked_list"
# - `Fiber#makecontext(stack_ptr : Void*, fiber_main : Fiber ->))`
#
# `makecontext` is responsible to reserve and initialize space on the stack
# for the initial context and save the initial `@stack_top` pointer. The first
# time a fiber is resumed, the `fiber_main` proc must be called, passing
# `self` as its first argument.
# for the initial context and to save the initial `@context.stack_top`
# pointer. The first time a fiber is resumed, the `fiber_main` proc must be
# called, passing `self` as its first argument.
require "./fiber/*"

# :nodoc:
Expand All @@ -42,16 +42,31 @@ fun _fiber_get_stack_top : Void*
end

class Fiber
# :nodoc:
#
# The arch-specific make/swapcontext assembly relies on the Fiber::Context
# struct and expects the following layout. Avoid moving the struct properties
# as it would require to update all the make/swapcontext implementations.
@[Extern]
struct Context
property resumeable : LibC::Long
property stack_top : Void*

def initialize
@resumeable = 0
@stack_top = Pointer(Void).null
end
end

STACK_SIZE = 8 * 1024 * 1024

@@fibers = Thread::LinkedList(Fiber).new
@@stack_pool = [] of Void*

@stack : Void*
@resume_event : Crystal::Event?
@stack_top = Pointer(Void).null
protected property stack_top : Void*
@context = Context.new
protected property stack_bottom : Void*
@resume_event : Crystal::Event?
property name : String?

# :nodoc:
Expand All @@ -60,6 +75,14 @@ class Fiber
# :nodoc:
property previous : Fiber?

def resumeable? : Bool
@context.resumeable == 1
end

def running? : Bool
@context.resumeable == 0
end

# :nodoc:
def self.inactive(fiber : Fiber)
@@fibers.delete(fiber)
Expand All @@ -86,7 +109,7 @@ class Fiber
def initialize
@proc = Proc(Void).new { }
@stack = Pointer(Void).null
@stack_top = _fiber_get_stack_top
@context.stack_top = _fiber_get_stack_top
@stack_bottom = GC.stack_bottom
@name = "main"

Expand Down Expand Up @@ -177,17 +200,25 @@ class Fiber
to_s(io)
end

# Push the used section of the stack
protected def push_gc_roots
# Push the used section of the stack
GC.push_stack @stack_top, @stack_bottom
GC.push_stack @context.stack_top, @stack_bottom
end

# :nodoc:
#
# Registers the current fiber stack to the GC as the stack that the current
# thread is running.
def register_gc_stack : Nil
GC.stack_bottom = fiber.@stack_bottom
# thread = Thread.current
# GC.register_altstack(thread.stack_bottom, thread.stack_size, @stack_bottom, STACK_SIZE)
end

# pushes the stack of pending fibers when the GC wants to collect memory:
GC.before_collect do
current = Fiber.current

@@fibers.unsafe_each do |fiber|
fiber.push_gc_roots unless fiber == current
fiber.push_gc_roots unless fiber.running?
end
end
end
18 changes: 10 additions & 8 deletions src/fiber/x86_64.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class Fiber
# :nodoc:
def makecontext(stack_ptr, fiber_main)
# in x86-64, the context switch push/pop 7 registers
@stack_top = (stack_ptr - 7).as(Void*)
@context.stack_top = (stack_ptr - 7).as(Void*)

stack_ptr[0] = fiber_main.pointer # initial `resume` will `ret` to this address
stack_ptr[-1] = self.as(Void*) # this will be `pop` into %rdi (first argument)
Expand All @@ -15,23 +15,25 @@ class Fiber
@[Naked]
def self.swapcontext(current, to) : Nil
asm("
pushq %rdi
pushq %rbx
pushq %rdi // push argument for initial resume
pushq %rbx // push callee-saved registers on the stack
pushq %rbp
pushq %r12
pushq %r13
pushq %r14
pushq %r15
movq %rsp, ($0)
movq %rsp, 8(%rdi) // @src_context.stack_top = SP
movl $1, 0(%rdi) // @src_context.resumeable = 1
movq $1, %rsp
popq %r15
movl $0, 0(%rsi) // @dst_context.resumeable = 0
movq 8(%rsi), %rsp // SP = @dst_context.stack_top
popq %r15 // pop callee-saved registers from the stack
popq %r14
popq %r13
popq %r12
popq %rbp
popq %rbx
popq %rdi"
:: "r"(current), "r"(to))
popq %rdi // pop argument for initial resume
"
end
end

0 comments on commit b067a0f

Please sign in to comment.