Skip to content

Commit

Permalink
Remove sendMessageReliablySync; callers can wait themselves.
Browse files Browse the repository at this point in the history
This makes the waiting more explicit.
  • Loading branch information
JoshRosen committed Aug 6, 2014
1 parent c01c450 commit a2f745c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import scala.collection.mutable.SynchronizedQueue
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try

import org.apache.spark._
import org.apache.spark.util.{SystemClock, Utils}
Expand Down Expand Up @@ -849,11 +848,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
promise.future
}

def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
message: Message): Try[Message] = {
Try(Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf))
}

def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) {
onReceiveCallback = callback
}
Expand Down Expand Up @@ -911,7 +905,7 @@ private[spark] object ConnectionManager {

(0 until count).map(i => {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(manager.id, bufferMessage)
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), Duration.Inf)
})
println("--------------------------")
println()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ package org.apache.spark.network
import java.nio.ByteBuffer
import org.apache.spark.{SecurityManager, SparkConf}

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Try

private[spark] object SenderTest {
def main(args: Array[String]) {

Expand Down Expand Up @@ -51,7 +55,8 @@ private[spark] object SenderTest {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/* println("Started timer at " + startTime) */
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
val promise = manager.sendMessageReliably(targetConnectionManagerId, dataMessage)
val responseStr: String = Try(Await.result(promise, Duration.Inf))
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
new String(buffer.array, "utf-8")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.spark.Logging
import org.apache.spark.network._
import org.apache.spark.util.Utils

import scala.util.{Failure, Success}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}

/**
* A network interface for BlockManager. Each slave should have one
Expand Down Expand Up @@ -117,8 +119,8 @@ private[spark] object BlockManagerWorker extends Logging {
val connectionManager = blockManager.connectionManager
val blockMessage = BlockMessage.fromPutBlock(msg)
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
val resultMessage = Try(Await.result(connectionManager.sendMessageReliably(
toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
resultMessage.isSuccess
}

Expand All @@ -127,8 +129,8 @@ private[spark] object BlockManagerWorker extends Logging {
val connectionManager = blockManager.connectionManager
val blockMessage = BlockMessage.fromGetBlock(msg)
val blockMessageArray = new BlockMessageArray(blockMessage)
val responseMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(
toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
responseMessage match {
case Success(message) => {
val bufferMessage = message.asInstanceOf[BufferMessage]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ConnectionManagerSuite extends FunSuite {
buffer.flip

val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(manager.id, bufferMessage)
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)

assert(receivedMessage == true)

Expand Down Expand Up @@ -80,7 +80,7 @@ class ConnectionManagerSuite extends FunSuite {

(0 until count).map(i => {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(managerServer.id, bufferMessage)
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
})

assert(numReceivedServerMessages == 10)
Expand Down Expand Up @@ -119,7 +119,7 @@ class ConnectionManagerSuite extends FunSuite {
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliablySync(managerServer.id, bufferMessage)
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)

assert(numReceivedServerMessages == 0)
assert(numReceivedMessages == 0)
Expand Down

0 comments on commit a2f745c

Please sign in to comment.