Skip to content

Commit

Permalink
Run the Kitura router on a DispatchQueue instead of an ELT
Browse files Browse the repository at this point in the history
  • Loading branch information
Pushkar N Kulkarni committed Aug 27, 2018
1 parent 1066775 commit 4569fcb
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 52 deletions.
19 changes: 9 additions & 10 deletions Sources/KituraNet/HTTP/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import NIOHTTP1
import NIOWebSocket
import LoggerAPI
import Foundation
import Dispatch

public class HTTPHandler: ChannelInboundHandler {

Expand Down Expand Up @@ -72,12 +73,13 @@ public class HTTPHandler: ChannelInboundHandler {
serverRequest.buffer!.byteBuffer.write(buffer: &buffer)
}
case .end(_):
serverResponse = HTTPServerResponse(ctx: ctx, handler: self)
serverResponse = HTTPServerResponse(channel: ctx.channel, handler: self)
//Make sure we use the latest delegate registered with the server
if let delegate = server.delegate {
Monitor.delegate?.started(request: serverRequest, response: serverResponse)
delegate.handle(request: serverRequest, response: serverResponse)
} //TODO: failure path
DispatchQueue.global().async {
let delegate = self.server.delegate ?? HTTPDummyServerDelegate()
Monitor.delegate?.started(request: self.serverRequest, response: self.serverResponse)
delegate.handle(request: self.serverRequest, response: self.serverResponse)
}
}
}

Expand Down Expand Up @@ -110,16 +112,13 @@ public class HTTPHandler: ChannelInboundHandler {
let target = server.latestWebSocketURI ?? "/<unknown>"
message = "No service has been registered for the path \(target)"
default:
if error is HTTPParserError {
break
}
// Don't handle any other errors for now!
// Don't handle any other errors, including `HTTPParserError`s
return
}

do {
serverResponse = HTTPServerResponse(channel: ctx.channel, handler: self)
errorResponseSent = true
serverResponse = HTTPServerResponse(ctx: ctx, handler: self)
try serverResponse.end(with: .badRequest, message: message)
} catch {
Log.error("Failed to send error response")
Expand Down
4 changes: 2 additions & 2 deletions Sources/KituraNet/HTTP/HTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class HTTPServer : Server {
_ = channel.pipeline.remove(handler: httpHandler)
})
return channel.pipeline.add(handler: IdleStateHandler(allTimeout: TimeAmount.seconds(Int(HTTPHandler.keepAliveTimeout)))).then {
return channel.pipeline.configureHTTPServerPipeline(withServerUpgrade: config).then { () -> EventLoopFuture<Void> in
return channel.pipeline.configureHTTPServerPipeline(withServerUpgrade: config, withErrorHandling: true).then { () -> EventLoopFuture<Void> in
if let sslContext = self.sslContext {
_ = channel.pipeline.add(handler: try! OpenSSLServerHandler(context: sslContext), first: true)
}
Expand Down Expand Up @@ -275,7 +275,7 @@ public class HTTPServer : Server {
}
}

private class HTTPDummyServerDelegate: ServerDelegate {
class HTTPDummyServerDelegate: ServerDelegate {
/// Handle new incoming requests to the server
///
/// - Parameter request: The ServerRequest class instance for working with this request.
Expand Down
69 changes: 34 additions & 35 deletions Sources/KituraNet/HTTP/HTTPServerResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import Foundation
/// responses via the HTTP protocol.
public class HTTPServerResponse: ServerResponse {

/// The channel handler context on which an HTTP response should be written
private weak var ctx: ChannelHandlerContext?
/// The channel to which the HTTP response should be written
private weak var channel: Channel?

/// The handler that processed the HTTP request
private weak var handler: HTTPHandler?
Expand Down Expand Up @@ -54,8 +54,8 @@ public class HTTPServerResponse: ServerResponse {
/// The data to be written as a part of the response.
private var buffer: ByteBuffer?

init(ctx: ChannelHandlerContext, handler: HTTPHandler) {
self.ctx = ctx
init(channel: Channel, handler: HTTPHandler) {
self.channel = channel
self.handler = handler
let httpVersionMajor = handler.serverRequest?.httpVersionMajor ?? 0
let httpVersionMinor = handler.serverRequest?.httpVersionMinor ?? 0
Expand All @@ -67,16 +67,16 @@ public class HTTPServerResponse: ServerResponse {
///
/// - Parameter from: String data to be written.
public func write(from string: String) throws {
guard let ctx = ctx else {
fatalError("No channel handler context available.")
guard let channel = channel else {
fatalError("No channel available to write.")
}
if buffer == nil {
if ctx.eventLoop.inEventLoop {
self.buffer = ctx.channel.allocator.buffer(capacity: string.utf8.count)
if channel.eventLoop.inEventLoop {
self.buffer = channel.allocator.buffer(capacity: string.utf8.count)
self.buffer!.write(string: string)
} else {
ctx.eventLoop.execute {
self.buffer = ctx.channel.allocator.buffer(capacity: string.utf8.count)
channel.eventLoop.execute {
self.buffer = channel.allocator.buffer(capacity: string.utf8.count)
self.buffer!.write(string: string)
}
}
Expand All @@ -87,16 +87,16 @@ public class HTTPServerResponse: ServerResponse {
///
/// - Parameter from: Data object that contains the data to be written.
public func write(from data: Data) throws {
guard let ctx = ctx else {
fatalError("No channel handler context available.")
guard let channel = channel else {
fatalError("No channel available to write.")
}
if buffer == nil {
if ctx.eventLoop.inEventLoop {
self.buffer = ctx.channel.allocator.buffer(capacity: data.count)
if channel.eventLoop.inEventLoop {
self.buffer = channel.allocator.buffer(capacity: data.count)
self.buffer!.write(bytes: data)
} else {
ctx.eventLoop.execute {
self.buffer = ctx.channel.allocator.buffer(capacity: data.count)
channel.eventLoop.execute {
self.buffer = channel.allocator.buffer(capacity: data.count)
self.buffer!.write(bytes: data)
}
}
Expand All @@ -114,19 +114,19 @@ public class HTTPServerResponse: ServerResponse {
/// End sending the response.
///
public func end() throws {
guard let ctx = self.ctx else {
guard let channel = self.channel else {
fatalError("No channel handler context available.")
}
if ctx.eventLoop.inEventLoop {
try end0(ctx: ctx)
if channel.eventLoop.inEventLoop {
try end0(channel: channel)
} else {
ctx.eventLoop.execute {
try! self.end0(ctx: ctx)
channel.eventLoop.execute {
try! self.end0(channel: channel)
}
}
}

func end0(ctx: ChannelHandlerContext) throws {
func end0(channel: Channel) throws {
guard let handler = self.handler else {
fatalError("No HTTP handler available")
}
Expand All @@ -140,13 +140,12 @@ public class HTTPServerResponse: ServerResponse {
headers["Keep-Alive"] = ["timeout=\(HTTPHandler.keepAliveTimeout)"]
}
}

let response = HTTPResponseHead(version: httpVersion, status: status, headers: headers.httpHeaders())
ctx.write(handler.wrapOutboundOut(.head(response)), promise: nil)
channel.write(handler.wrapOutboundOut(.head(response)), promise: nil)
if let buffer = buffer {
ctx.write(handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
channel.write(handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
}
ctx.writeAndFlush(handler.wrapOutboundOut(.end(nil)), promise: nil)
channel.writeAndFlush(handler.wrapOutboundOut(.end(nil)), promise: nil)
handler.updateKeepAliveState()

if let request = handler.serverRequest {
Expand All @@ -156,19 +155,19 @@ public class HTTPServerResponse: ServerResponse {

/// End sending the response on an HTTP error
private func end(with errorCode: HTTPStatusCode, withBody: Bool = false) throws {
guard let ctx = self.ctx else {
guard let channel = self.channel else {
fatalError("No channel handler context available.")
}
if ctx.eventLoop.inEventLoop {
try end0(with: errorCode, ctx: ctx, withBody: withBody)
if channel.eventLoop.inEventLoop {
try end0(with: errorCode, channel: channel, withBody: withBody)
} else {
ctx.eventLoop.execute {
try! self.end0(with: errorCode, ctx: ctx, withBody: withBody)
channel.eventLoop.execute {
try! self.end0(with: errorCode, channel: channel, withBody: withBody)
}
}
}

private func end0(with errorCode: HTTPStatusCode, ctx: ChannelHandlerContext, withBody: Bool = false) throws {
private func end0(with errorCode: HTTPStatusCode, channel: Channel, withBody: Bool = false) throws {
guard let handler = self.handler else {
fatalError("No HTTP handler available")
}
Expand All @@ -179,11 +178,11 @@ public class HTTPServerResponse: ServerResponse {
//We don't keep the connection alive on an HTTP error
headers["Connection"] = ["Close"]
let response = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: status, headers: headers.httpHeaders())
ctx.write(handler.wrapOutboundOut(.head(response)), promise: nil)
channel.write(handler.wrapOutboundOut(.head(response)), promise: nil)
if withBody && buffer != nil {
ctx.write(handler.wrapOutboundOut(.body(.byteBuffer(buffer!))), promise: nil)
channel.write(handler.wrapOutboundOut(.body(.byteBuffer(buffer!))), promise: nil)
}
ctx.writeAndFlush(handler.wrapOutboundOut(.end(nil)), promise: nil)
channel.writeAndFlush(handler.wrapOutboundOut(.end(nil)), promise: nil)
handler.updateKeepAliveState()

if let request = handler.serverRequest {
Expand Down
7 changes: 2 additions & 5 deletions Tests/KituraNetTests/RegressionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class RegressionTests: KituraNetTest {
/// This is to verify the fix introduced in Kitura-net PR #229, where a malformed
/// request sent during a Keep-Alive session could cause the server to crash.
func testBadRequestFollowingGoodRequest() {
do {
do {
let server: HTTPServer = try startServer(nil, port: 0, useSSL: false, allowPortReuse: false)
defer {
server.stop()
Expand All @@ -178,15 +178,12 @@ class RegressionTests: KituraNetTest {
return
}
XCTAssertTrue(serverPort != 0, "Ephemeral server port not set")

var goodClient = GoodClient(with: HTTPClient(with: self.expectation(description: "Bad request error")))
try! goodClient.makeGoodRequestFollowedByBadRequest(serverPort)
waitForExpectations(timeout: 10)

} catch {
XCTFail("Couldn't start server")
}

}


Expand Down Expand Up @@ -270,10 +267,10 @@ class RegressionTests: KituraNetTest {
httpHeaders.add(name: "Connection", value: "Keep-Alive")
_ = channel.write(NIOAny(HTTPClientRequestPart.head(request)))
_ = channel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil)))
sleep(1)
let request0 = HTTPRequestHead(version: HTTPVersion(major: 1, minor:1), method: .GET, uri: "#/")
_ = channel.write(NIOAny(HTTPClientRequestPart.head(request0)))
_ = channel.writeAndFlush(NIOAny(HTTPClientRequestPart.end(nil)))

}

}
Expand Down

0 comments on commit 4569fcb

Please sign in to comment.