Skip to content

Commit

Permalink
Adding support for chunked transfers
Browse files Browse the repository at this point in the history
Chunked transfers are not handled properly and time out while the
transfer is still ongoing.
With this patchset, each messageChunk is considered a heart beat and
resetting the timeout scheduler.
  • Loading branch information
derjust committed Jan 8, 2016
1 parent 6ffe437 commit 6d9f36a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 6 deletions.
23 changes: 22 additions & 1 deletion src/main/scala/org/caoilte/spray/routing/LogAccessRouting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger,
import ctx._

import context.dispatcher
val cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)
var cancellable:Cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)
var chunkedResponse:HttpResponse = null

def receive = {
case RequestLoggingTimeout => {
Expand All @@ -55,7 +56,27 @@ class RequestAccessLogger(ctx: RequestContext, accessLogger: AccessLogger,
accessLogger.logAccess(request, response, timeStampCalculator(Unit))
forwardMsgAndStop(response)
}
case Confirmed(ChunkedResponseStart(response), sentAck) => {
// In case a chunk transfer is started, store the response for later logging
chunkedResponse = response
responder forward Confirmed(ChunkedResponseStart(response), sentAck)
}
case Confirmed(MessageChunk(data, extension), sentAck) => {
// Each junk is considered a heart-beat: We are still producing output and the client is still consuming
// - therefore reset the requestTimeout schedule
cancellable.cancel()
cancellable = context.system.scheduler.scheduleOnce(requestTimeout, self, RequestLoggingTimeout)

responder forward Confirmed(MessageChunk(data, extension), sentAck)
}
case response:ChunkedMessageEnd => {
// Handled like HttpResponse: provide the (previously saved) ChunkedResponseStart for logging and quit
cancellable.cancel()
accessLogger.logAccess(request, chunkedResponse, timeStampCalculator(Unit))
forwardMsgAndStop(response)
}
case other => {
println(s"other $other")
responder forward other
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.io.{Tcp, IO}
import spray.can.Http
import scala.concurrent.{Await, Future}
import akka.pattern.ask
import spray.http.{HttpResponse, HttpRequest}
import spray.http.{ChunkedResponseStart, ChunkedMessageEnd, HttpResponse, HttpRequest}
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ object FailureRoutes extends Directives {

class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThen {

behavior of "An HTTP Server that handles a request with a 200 chunked response within the request timeout"

it should "'Log Access' with a 200 chunked response and an Access Time less than the request timeout" in {
aTestLogAccessRoutingActor(
requestTimeoutMillis = 4000,
httpServiceActorFactory = ChunkedResponseServiceActor.factory(ChunkedResponse(12, 500), PATH)) { testKit =>
import testKit._

whenReady(makeHttpCall(), timeout(Span(8, Seconds))) { s =>
assert(s.entity.asString(HttpCharsets.`UTF-8`) === "response 12\nresponse 11\nresponse 10\nresponse 9\n"
+ "response 8\nresponse 7\nresponse 6\nresponse 5\n"
+ "response 4\nresponse 3\nresponse 2\nresponse 1\n")}

expectMsgPF(8 seconds, "Expected normal log access event with chunked response delayed properties") {
case LogEvent(
HttpRequest(HttpMethods.GET,URI, _, _, _),
HttpResponse(StatusCodes.OK, _, _, _), time, LogAccess
) if time >= 500 && time <= 7000 => true
}
}
}

behavior of "An HTTP Server that handles a request with a 200 response within the request timeout"

it should "'Log Access' with a 200 response and an Access Time less than the request timeout" in {
Expand Down Expand Up @@ -194,7 +216,7 @@ class LogAccessRoutingTests extends FlatSpec with ScalaFutures with GivenWhenThe
}


implicit val TIMEOUT: Timeout = 3.second
implicit val TIMEOUT: Timeout = 10.second
val PORT = 8084
val HOST = s"http://localhost:$PORT"
val PATH = "test"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.caoilte.spray.routing

import akka.testkit.TestKit
import org.caoilte.spray.routing.TestAccessLogger.TestAccessLogger
import spray.http.{HttpResponse, HttpRequest, HttpEntity, ContentTypes}
import spray.http._
import spray.routing._
import akka.actor.{Props, ActorRef, Actor}
import scala.concurrent._
Expand All @@ -15,7 +14,7 @@ object TestAccessLogger {
case object LogAccess extends LogType
case object AccessAlreadyLogged extends LogType

case class LogEvent(request: HttpRequest, response: HttpResponse, time: Long, logAccessType: LogType)
case class LogEvent(request: HttpRequest, response: HttpMessagePart, time: Long, logAccessType: LogType)

class TestAccessLogger(listener:ActorRef) extends AccessLogger {
override def logAccess(request: HttpRequest, response: HttpResponse, time: Long) = {
Expand All @@ -28,6 +27,69 @@ object TestAccessLogger {
}
}

case object ChunkedResponse {
val DEFAULT_RESPONSE = "response"
}

case class ChunkedResponse(amountOfChunks: Int, thinkingMillis: Long, responseMessage:String = ChunkedResponse.DEFAULT_RESPONSE)

object ChunkedResponseServiceActor {
def factory(chunkedResponse: ChunkedResponse, path:String): (ActorRef => Props) = actorRef => {
apply(new TestAccessLogger(actorRef), chunkedResponse, path)
}
def apply(accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String):Props = {
Props(new ChunkedResponseServiceActor(accessLogger, chunkedResponse, path))
}
}

class ChunkedResponseServiceActor(val accessLogger: AccessLogger, chunkedResponse: ChunkedResponse, path:String)
extends HttpServiceActor with LogAccessRoutingActor {

case class RequestForChunkedResponse(ctx: RequestContext)
case class Ok(ctx: RequestContext, remainingChunks: Int)

class ChunkedResponseActor(response:ChunkedResponse) extends Actor {
import response._
def receive: Receive = {
case RequestForChunkedResponse(ctx) => {
blocking {
Thread.sleep(thinkingMillis)
}
ctx.responder ! ChunkedResponseStart(HttpResponse()).withAck(Ok(ctx, amountOfChunks))
}
case Ok(ctx, 0) => {
ctx.responder ! ChunkedMessageEnd
}
case Ok(ctx, remainingChunks) => {
blocking {
Thread.sleep(thinkingMillis)
}

ctx.responder ! MessageChunk(s"$responseMessage $remainingChunks\n").withAck(Ok(ctx, remainingChunks - 1))
}
}
}

var testAc:ActorRef = _

implicit def executionContext = actorRefFactory.dispatcher

override def preStart {
testAc = context.actorOf(Props(new ChunkedResponseActor(chunkedResponse)), "chunked-response-test-actor")
super.preStart
}

val routes:Route = {
path(path) {
get { ctx: RequestContext =>
implicit val TIMEOUT: Timeout = Timeout(chunkedResponse.thinkingMillis * 2, TimeUnit.MILLISECONDS)
testAc ! RequestForChunkedResponse(ctx)
}
}
}

override def receive = runRoute(routes)
}

case object DelayedResponse {
val DEFAULT_RESPONSE = "response"
Expand Down

0 comments on commit 6d9f36a

Please sign in to comment.