Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix list blocking operations #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/scala/Commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ trait CommandProcessing extends Actor {
* the command's destination (usually a ClientNode sending a
* Command).
*/
def respond(response: Any): Unit = {
def respond(response: Any, currentCommand: Command = command): Unit = {
if (debug) {
def path(x: ActorRef): String = x.path.toString.replace("akka://curiodb/user/", "")
println(Seq(
Expand All @@ -302,7 +302,7 @@ trait CommandProcessing extends Actor {
).mkString("\n"))
}
if (response != ()) {
command.client.foreach {client => client ! Response(response, command.id)}
currentCommand.client.foreach {client => client ! Response(response, currentCommand.id)}
}
}

Expand Down
22 changes: 14 additions & 8 deletions src/main/scala/Data.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package curiodb

import akka.actor.{Cancellable, ActorRef}
import com.dictiography.collections.{IndexedTreeMap, IndexedTreeSet}
import net.agkn.hll.HLL
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -183,7 +184,7 @@ class ListNode extends Node[mutable.ListBuffer[String]] {
/**
* Set of blocked Command instances awaiting a response.
*/
var blocked = mutable.LinkedHashSet[Command]()
val blocked = mutable.LinkedHashMap[Command, Option[Cancellable]]()

/**
* Called on each of the blocking commands, storing the received
Expand All @@ -193,11 +194,15 @@ class ListNode extends Node[mutable.ListBuffer[String]] {
*/
def block(): Any = {
if (value.isEmpty) {
blocked += command
context.system.scheduler.scheduleOnce(args.last.toInt seconds) {
blocked -= command
respond(null)
}
val timeout = args.last.toInt
val cancellable =
if (timeout > 0)
Some(context.system.scheduler.scheduleOnce(timeout seconds) {
blocked -= command
respond(null)
})
else None
blocked += (command -> cancellable)
()
} else run(command.name.tail) // Run the non-blocking version.
}
Expand All @@ -211,9 +216,10 @@ class ListNode extends Node[mutable.ListBuffer[String]] {
while (value.nonEmpty && blocked.nonEmpty) {
// Set the node's current Command to the blocked Command, so
// that the run method has access to the correct Command.
command = blocked.head
val (command, cancellable) = blocked.head
cancellable.map(_.cancel())
blocked -= command
respond(run(command.name.tail))
respond(run(command.name.tail), command)
}
result
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/System.scala
Original file line number Diff line number Diff line change
Expand Up @@ -901,9 +901,12 @@ abstract class ClientNode extends Node[Null] with PubSubClient with AggregateCom
*/
def routeWithTimeout(command: Command) = {
streaming = command.streaming
if (!streaming && commandTimeout > 0) {
val timeout = if(command.name == "BLPOP" || command.name == "BRPOP"
|| command.name == "BRPOPLPUSH") 0
else commandTimeout
if (!streaming && timeout > 0) {
val id = command.id
context.system.scheduler.scheduleOnce(commandTimeout milliseconds) {
context.system.scheduler.scheduleOnce(timeout milliseconds) {
commands.remove(id) match {
case Some(c) => respondFinal(ErrorReply(s"Timeout on $c"))
case None =>
Expand Down