Skip to content

Commit

Permalink
[GR-19220] Add timeout to Queue#pop
Browse files Browse the repository at this point in the history
PullRequest: truffleruby/3819
  • Loading branch information
andrykonchin committed Jun 12, 2023
2 parents 6f5a242 + d0c8d48 commit c7760e6
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 146 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Compatibility:
* Add `String#bytesplice` (#3039, @itarato).
* Add `String#byteindex` and `String#byterindex` (#3039, @itarato).
* Add implementations of `rb_proc_call_with_block`, `rb_proc_call_kw`, `rb_proc_call_with_block_kw` and `rb_funcall_with_block_kw` (#3068, @andrykonchin).
* Add optional `timeout` argument to `Thread::Queue#pop` (#3039, @itarato).

Performance:

Expand Down
11 changes: 11 additions & 0 deletions lib/truffle/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ class ThreadError < StandardError
class Thread
Queue = ::Queue
SizedQueue = ::SizedQueue

class Queue
def pop(non_block = false, timeout: nil)
Primitive.queue_pop(
self,
Primitive.as_boolean(non_block),
Truffle::QueueOperations.validate_and_prepare_timeout(non_block, timeout))
end
alias_method :shift, :pop
alias_method :deq, :pop
end
end
6 changes: 3 additions & 3 deletions lib/truffle/timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ def abort
end
end

@chan = Truffle::Channel.new
@chan = Queue.new

def self.watch_channel
reqs = []

loop do
begin
while reqs.empty?
req = @chan.receive
req = @chan.pop
reqs << req if req
end

Expand All @@ -101,7 +101,7 @@ def self.watch_channel
if min.left > 0
before = Time.now

new_req = @chan.receive_timeout(min.left)
new_req = @chan.pop(timeout: min.left)

slept_for = Time.now - before
else
Expand Down
7 changes: 7 additions & 0 deletions spec/ruby/core/queue/deq_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
require_relative '../../spec_helper'
require_relative '../../shared/queue/deque'
require_relative '../../shared/types/rb_num2dbl_fails'

describe "Queue#deq" do
it_behaves_like :queue_deq, :deq, -> { Queue.new }
end

describe "Queue operations with timeout" do
ruby_version_is "3.2" do
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.deq(timeout: v) }
end
end
7 changes: 7 additions & 0 deletions spec/ruby/core/queue/pop_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
require_relative '../../spec_helper'
require_relative '../../shared/queue/deque'
require_relative '../../shared/types/rb_num2dbl_fails'

describe "Queue#pop" do
it_behaves_like :queue_deq, :pop, -> { Queue.new }
end

describe "Queue operations with timeout" do
ruby_version_is "3.2" do
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.pop(timeout: v) }
end
end
7 changes: 7 additions & 0 deletions spec/ruby/core/queue/shift_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
require_relative '../../spec_helper'
require_relative '../../shared/queue/deque'
require_relative '../../shared/types/rb_num2dbl_fails'

describe "Queue#shift" do
it_behaves_like :queue_deq, :shift, -> { Queue.new }
end

describe "Queue operations with timeout" do
ruby_version_is "3.2" do
it_behaves_like :rb_num2dbl_fails, nil, -> v { q = Queue.new; q.push(1); q.shift(timeout: v) }
end
end
17 changes: 17 additions & 0 deletions spec/ruby/shared/queue/deque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@
q.send(@method).should == 1
end

it "converts false-ish for non_blocking to boolean" do
q = @object.call
q << 1
q << 2

q.send(@method, false).should == 1
q.send(@method, nil).should == 2
end

it "returns nil for a closed empty queue" do
q = @object.call
q.close
Expand Down Expand Up @@ -143,5 +152,13 @@
q.close
-> { q.send(@method, true) }.should raise_error(ThreadError)
end

it "converts true-ish non_blocking argument to true" do
q = @object.call

-> { q.send(@method, true) }.should raise_error(ThreadError)
-> { q.send(@method, 1) }.should raise_error(ThreadError)
-> { q.send(@method, "") }.should raise_error(ThreadError)
end
end
end
17 changes: 17 additions & 0 deletions spec/ruby/shared/types/rb_num2dbl_fails.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Shared tests for rb_num2dbl related conversion failures.
#
# Usage example:
# it_behaves_like :rb_num2dbl_fails, nil, -> v { o = A.new; o.foo(v) }
#

describe :rb_num2dbl_fails, shared: true do
it "fails if string is provided" do
-> { @object.call("123") }.should raise_error(TypeError, "no implicit conversion to float from string")
end

it "fails if boolean is provided" do
-> { @object.call(true) }.should raise_error(TypeError, "no implicit conversion to float from true")
-> { @object.call(false) }.should raise_error(TypeError, "no implicit conversion to float from false")
end
end
6 changes: 3 additions & 3 deletions spec/truffle/objspace/define_finalizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
# The assumption is that this works as expected but is just hard to test.

it "will call the finalizer" do
channel = Truffle::Channel.new
queue = Queue.new
finalizer = proc {
channel.send :finalized
queue << :finalized
}
Object.new.tap do |object|
ObjectSpace.define_finalizer object, finalizer
ObjectSpace.reachable_objects_from(object).should include(finalizer)
end
Primitive.gc_force
Truffle::Debug.drain_finalization_queue # Not needed for correctness
channel.receive_timeout(TIME_TOLERANCE).should == :finalized
queue.pop(timeout: TIME_TOLERANCE).should == :finalized
end

end
6 changes: 3 additions & 3 deletions spec/truffle/objspace/undefine_finalizer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
# See comment in define_finalizer_spec.rb

it "successfully unregisters a finalizer" do
channel = Truffle::Channel.new
queue = Queue.new
Object.new.tap do |object|
finalizer = proc {
channel.send :finalized
queue << :finalized
}
ObjectSpace.define_finalizer object, finalizer
ObjectSpace.reachable_objects_from(object).should include(finalizer)
Expand All @@ -29,7 +29,7 @@
end
Primitive.gc_force
Truffle::Debug.drain_finalization_queue # Not needed for correctness
channel.try_receive.should be_nil
queue.should.empty?
end

end
4 changes: 4 additions & 0 deletions spec/truffleruby.next-specs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ spec/ruby/core/string/bytesplice_spec.rb

spec/ruby/core/string/byteindex_spec.rb
spec/ruby/core/string/byterindex_spec.rb

spec/ruby/core/queue/deq_spec.rb
spec/ruby/core/queue/pop_spec.rb
spec/ruby/core/queue/shift_spec.rb
2 changes: 1 addition & 1 deletion src/main/java/org/truffleruby/core/CoreLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,6 @@ public boolean isTruffleBootMainMethod(SharedMethodInfo info) {
"/core/pre.rb",
"/core/basic_object.rb",
"/core/array.rb",
"/core/channel.rb",
"/core/configuration.rb",
"/core/false.rb",
"/core/fiber.rb",
Expand Down Expand Up @@ -1053,6 +1052,7 @@ public boolean isTruffleBootMainMethod(SharedMethodInfo info) {
"/core/posix.rb",
"/core/main.rb",
"/core/post.rb",
"/core/truffle/queue_operations.rb",
POST_BOOT_FILE
};

Expand Down
91 changes: 30 additions & 61 deletions src/main/java/org/truffleruby/core/queue/QueueNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.profiles.InlinedBranchProfile;
import org.truffleruby.annotations.CoreMethod;
import org.truffleruby.annotations.Primitive;
import org.truffleruby.builtins.CoreMethodArrayArgumentsNode;
import org.truffleruby.builtins.CoreMethodNode;
import org.truffleruby.annotations.CoreModule;
import org.truffleruby.builtins.NonStandard;
import org.truffleruby.builtins.PrimitiveArrayArgumentsNode;
import org.truffleruby.core.array.ArrayGuards;
import org.truffleruby.core.array.RubyArray;
import org.truffleruby.core.array.library.ArrayStoreLibrary;
import org.truffleruby.core.cast.BooleanCastWithDefaultNode;
import org.truffleruby.core.cast.ToANode;
import org.truffleruby.core.klass.RubyClass;
import org.truffleruby.language.Nil;
import org.truffleruby.language.NotProvided;
import org.truffleruby.language.RubyBaseNodeWithExecute;
import org.truffleruby.language.RubyNode;
import org.truffleruby.annotations.Visibility;
import org.truffleruby.language.control.RaiseException;
import org.truffleruby.language.objects.AllocationTracing;
Expand All @@ -34,8 +32,6 @@
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.Cached.Exclusive;
import com.oracle.truffle.api.dsl.CreateCast;
import com.oracle.truffle.api.dsl.NodeChild;
import com.oracle.truffle.api.dsl.Specialization;

@CoreModule(value = "Queue", isClass = true)
Expand Down Expand Up @@ -73,18 +69,11 @@ protected RubyQueue push(RubyQueue self, final Object value) {

}

@CoreMethod(names = { "pop", "shift", "deq" }, optional = 1)
@NodeChild(value = "queue", type = RubyNode.class)
@NodeChild(value = "nonBlocking", type = RubyBaseNodeWithExecute.class)
public abstract static class PopNode extends CoreMethodNode {

@CreateCast("nonBlocking")
protected RubyBaseNodeWithExecute coerceToBoolean(RubyBaseNodeWithExecute nonBlocking) {
return BooleanCastWithDefaultNode.create(false, nonBlocking);
}
@Primitive(name = "queue_pop")
public abstract static class PopNode extends PrimitiveArrayArgumentsNode {

@Specialization(guards = "!nonBlocking")
protected Object popBlocking(RubyQueue self, boolean nonBlocking,
protected Object popBlocking(RubyQueue self, boolean nonBlocking, Nil timeoutMilliseconds,
@Exclusive @Cached InlinedBranchProfile closedProfile) {
final UnsizedQueue queue = self.queue;

Expand All @@ -103,8 +92,31 @@ private Object doPop(UnsizedQueue queue) {
return getContext().getThreadManager().runUntilResult(this, queue::take);
}

@Specialization(guards = "!nonBlocking")
protected Object popBlocking(RubyQueue self, boolean nonBlocking, long timeoutMilliseconds) {
final UnsizedQueue queue = self.queue;
final long deadline = System.currentTimeMillis() + timeoutMilliseconds;

return getContext().getThreadManager().runUntilResult(this, () -> {
final long currentTimeout = deadline - System.currentTimeMillis();
final Object value;

if (currentTimeout > 0) {
value = queue.poll(currentTimeout);
} else {
value = queue.poll();
}

if (value == UnsizedQueue.CLOSED || value == null) {
return nil;
} else {
return value;
}
});
}

@Specialization(guards = "nonBlocking")
protected Object popNonBlock(RubyQueue self, boolean nonBlocking,
protected Object popNonBlock(RubyQueue self, boolean nonBlocking, Nil timeoutMilliseconds,
@Exclusive @Cached InlinedBranchProfile errorProfile) {
final UnsizedQueue queue = self.queue;

Expand All @@ -120,49 +132,6 @@ protected Object popNonBlock(RubyQueue self, boolean nonBlocking,

}

@NonStandard
@CoreMethod(names = "receive_timeout", required = 1, visibility = Visibility.PRIVATE, lowerFixnum = 1)
public abstract static class ReceiveTimeoutNode extends CoreMethodArrayArgumentsNode {

@Specialization
protected Object receiveTimeout(RubyQueue self, int duration) {
return receiveTimeout(self, (double) duration);
}

@TruffleBoundary
@Specialization
protected Object receiveTimeout(RubyQueue self, double duration) {
final UnsizedQueue queue = self.queue;

final long durationInMillis = (long) (duration * 1000.0);
final long start = System.currentTimeMillis();

return getContext().getThreadManager().runUntilResult(this, () -> {
long now = System.currentTimeMillis();
long waited = now - start;
if (waited >= durationInMillis) {
// Try again to make sure we at least tried once
final Object result = queue.poll();
return translateResult(result);
}

final Object result = queue.poll(durationInMillis);
return translateResult(result);
});
}

private Object translateResult(Object result) {
if (result == null) {
return false;
} else if (result == UnsizedQueue.CLOSED) {
return nil;
} else {
return result;
}
}

}

@CoreMethod(names = "empty?")
public abstract static class EmptyNode extends CoreMethodArrayArgumentsNode {

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/truffleruby/core/thread/ThreadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,14 @@ public Object apply(State state) {
}
}

/** Runs {@code action} until it returns a non-null value. The given action should throw an
* {@link InterruptedException} when {@link Thread#interrupt()} is called. Otherwise, the {@link SafepointManager}
* will not be able to interrupt this action. See {@link ThreadNodes.ThreadRunBlockingSystemCallNode} for blocking
* native calls. If the action throws an {@link InterruptedException}, it will be retried until it returns a
* non-null value.
/** Runs {@code action} until it returns a value (IOW, until it does not throw {@link InterruptedException}). The
* given action should throw an {@link InterruptedException} when {@link Thread#interrupt()} is called. Otherwise,
* the {@link SafepointManager} will not be able to interrupt this action. See
* {@link ThreadNodes.ThreadRunBlockingSystemCallNode} for blocking native calls. If the action throws an
* {@link InterruptedException}, it will be retried until it returns a non-null value.
*
* @param action must not touch any Ruby state
* @return the first non-null return value from {@code action} */
* @return the return value from {@code action} */
@TruffleBoundary
public <T> T runUntilResult(Node currentNode, BlockingAction<T> action) {
return runUntilResult(currentNode, action, null, null);
Expand Down
Loading

0 comments on commit c7760e6

Please sign in to comment.