Skip to content

Commit

Permalink
Merge pull request #2193 from Synesso/jem.mawson/sse-configuration
Browse files Browse the repository at this point in the history
Provide configuration settings for client SSE limits
  • Loading branch information
jrudolph authored Jan 29, 2019
2 parents 92f6799 + 4d11520 commit c733ab4
Show file tree
Hide file tree
Showing 20 changed files with 293 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ private[http] final case class LfuCacheSettingsImpl(
}

object CachingSettings extends SettingsCompanion[CachingSettings]("akka.http.caching") {
def fromSubConfig(root: Config, c: Config) = {
def fromSubConfig(root: Config, c: Config): CachingSettingsImpl = {
val lfuConfig = c.getConfig("lfu-cache")
CachingSettingsImpl(
LfuCacheSettingsImpl(
lfuConfig getInt "max-capacity",
lfuConfig getInt "initial-capacity",
lfuConfig getPotentiallyInfiniteDuration "time-to-live",
lfuConfig getPotentiallyInfiniteDuration "time-to-idle"
lfuConfig.getInt("max-capacity"),
lfuConfig.getInt("initial-capacity"),
lfuConfig.getPotentiallyInfiniteDuration("time-to-live"),
lfuConfig.getPotentiallyInfiniteDuration("time-to-idle")
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ private[akka] object ClientConnectionSettingsImpl extends SettingsCompanion[Clie
val c = inner.withFallback(root.getConfig(prefix))
new ClientConnectionSettingsImpl(
userAgentHeader = c.getString("user-agent-header").toOption.map(`User-Agent`(_)),
connectingTimeout = c getFiniteDuration "connecting-timeout",
idleTimeout = c getPotentiallyInfiniteDuration "idle-timeout",
requestHeaderSizeHint = c getIntBytes "request-header-size-hint",
connectingTimeout = c.getFiniteDuration("connecting-timeout"),
idleTimeout = c.getPotentiallyInfiniteDuration("idle-timeout"),
requestHeaderSizeHint = c.getIntBytes("request-header-size-hint"),
logUnencryptedNetworkBytes = LogUnencryptedNetworkBytes(c getString "log-unencrypted-network-bytes"),
websocketSettings = WebSocketSettingsImpl.client(c.getConfig("websocket")),
socketOptions = SocketOptionSettings.fromSubConfig(root, c.getConfig("socket-options")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ private[akka] final case class ConnectionPoolSettingsImpl(
/** INTERNAL API */
@InternalApi
private[akka] object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettingsImpl]("akka.http.host-connection-pool") {
def fromSubConfig(root: Config, c: Config) = {
def fromSubConfig(root: Config, c: Config): ConnectionPoolSettingsImpl = {
new ConnectionPoolSettingsImpl(
c getInt "max-connections",
c getInt "min-connections",
c getInt "max-retries",
c getInt "max-open-requests",
c getInt "pipelining-limit",
c getFiniteDuration "base-connection-backoff",
c getFiniteDuration "max-connection-backoff",
c getPotentiallyInfiniteDuration "idle-timeout",
c.getInt("max-connections"),
c.getInt("min-connections"),
c.getInt("max-retries"),
c.getInt("max-open-requests"),
c.getInt("pipelining-limit"),
c.getFiniteDuration("base-connection-backoff"),
c.getFiniteDuration("max-connection-backoff"),
c.getPotentiallyInfiniteDuration("idle-timeout"),
ClientConnectionSettingsImpl.fromSubConfig(root, c.getConfig("client")),
c.getString("pool-implementation").toLowerCase match {
case "legacy" PoolImplementation.Legacy
case "new" PoolImplementation.New
},
c getPotentiallyInfiniteDuration "response-entity-subscription-timeout"
c.getPotentiallyInfiniteDuration("response-entity-subscription-timeout")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ private[http] final case class HttpsProxySettingsImpl(
}

object HttpsProxySettingsImpl extends SettingsCompanion[HttpsProxySettingsImpl]("akka.http.client.proxy.https") {
override def fromSubConfig(root: Config, c: Config) = {
override def fromSubConfig(root: Config, c: Config): HttpsProxySettingsImpl = {
new HttpsProxySettingsImpl(
c getString "host",
c getInt "port"
c.getString("host"),
c.getInt("port")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,33 +64,34 @@ object ParserSettingsImpl extends SettingsCompanion[ParserSettingsImpl]("akka.ht
private[this] val noCustomStatusCodes: Int Option[StatusCode] = ConstantFun.scalaAnyToNone
private[ParserSettingsImpl] val noCustomMediaTypes: (String, String) Option[MediaType] = ConstantFun.scalaAnyTwoToNone

def fromSubConfig(root: Config, inner: Config) = {
def fromSubConfig(root: Config, inner: Config): ParserSettingsImpl = {
val c = inner.withFallback(root.getConfig(prefix))
val cacheConfig = c getConfig "header-cache"

new ParserSettingsImpl(
c getIntBytes "max-uri-length",
c getIntBytes "max-method-length",
c getIntBytes "max-response-reason-length",
c getIntBytes "max-header-name-length",
c getIntBytes "max-header-value-length",
c getIntBytes "max-header-count",
c getPossiblyInfiniteBytes "max-content-length",
c getPossiblyInfiniteBytes "max-to-strict-bytes",
c getIntBytes "max-chunk-ext-length",
c getIntBytes "max-chunk-size",
Uri.ParsingMode(c getString "uri-parsing-mode"),
CookieParsingMode(c getString "cookie-parsing-mode"),
c getBoolean "illegal-header-warnings",
(c getStringList "ignore-illegal-header-for").asScala.map(_.toLowerCase).toSet,
ErrorLoggingVerbosity(c getString "error-logging-verbosity"),
IllegalResponseHeaderValueProcessingMode(c getString "illegal-response-header-value-processing-mode"),
c.getIntBytes("max-uri-length"),
c.getIntBytes("max-method-length"),
c.getIntBytes("max-response-reason-length"),
c.getIntBytes("max-header-name-length"),
c.getIntBytes("max-header-value-length"),
c.getIntBytes("max-header-count"),
c.getPossiblyInfiniteBytes("max-content-length"),
c.getPossiblyInfiniteBytes("max-to-strict-bytes"),
c.getIntBytes("max-chunk-ext-length"),
c.getIntBytes("max-chunk-size"),
Uri.ParsingMode(c.getString("uri-parsing-mode")),
CookieParsingMode(c.getString("cookie-parsing-mode")),
c.getBoolean("illegal-header-warnings"),
c.getStringList("ignore-illegal-header-for").asScala.map(_.toLowerCase).toSet,
ErrorLoggingVerbosity(c.getString("error-logging-verbosity")),
IllegalResponseHeaderValueProcessingMode(c.getString("illegal-response-header-value-processing-mode")),
cacheConfig.entrySet.asScala.map(kvp kvp.getKey cacheConfig.getInt(kvp.getKey))(collection.breakOut),
c getBoolean "tls-session-info-header",
c getBoolean "modeled-header-parsing",
c.getBoolean("tls-session-info-header"),
c.getBoolean("modeled-header-parsing"),
noCustomMethods,
noCustomStatusCodes,
noCustomMediaTypes)
noCustomMediaTypes
)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ private[http] final case class PreviewServerSettingsImpl(

object PreviewServerSettingsImpl extends SettingsCompanion[PreviewServerSettingsImpl]("akka.http.server.preview") {
def fromSubConfig(root: Config, c: Config) = PreviewServerSettingsImpl(
c getBoolean "enable-http2"
c.getBoolean("enable-http2")
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ private[http] object ServerSettingsImpl extends SettingsCompanion[ServerSettings
c.getString("server-header").toOption.map(Server(_)),
PreviewServerSettingsImpl.fromSubConfig(root, c.getConfig("preview")),
Timeouts(
c getPotentiallyInfiniteDuration "idle-timeout",
c getPotentiallyInfiniteDuration "request-timeout",
c getFiniteDuration "bind-timeout",
c getPotentiallyInfiniteDuration "linger-timeout"),
c getInt "max-connections",
c getInt "pipelining-limit",
c getBoolean "remote-address-header",
c getBoolean "raw-request-uri-header",
c getBoolean "transparent-head-requests",
c getBoolean "verbose-error-messages",
c getIntBytes "response-header-size-hint",
c getInt "backlog",
LogUnencryptedNetworkBytes(c getString "log-unencrypted-network-bytes"),
c.getPotentiallyInfiniteDuration("idle-timeout"),
c.getPotentiallyInfiniteDuration("request-timeout"),
c.getFiniteDuration("bind-timeout"),
c.getPotentiallyInfiniteDuration("linger-timeout")),
c.getInt("max-connections"),
c.getInt("pipelining-limit"),
c.getBoolean("remote-address-header"),
c.getBoolean("raw-request-uri-header"),
c.getBoolean("transparent-head-requests"),
c.getBoolean("verbose-error-messages"),
c.getIntBytes("response-header-size-hint"),
c.getInt("backlog"),
LogUnencryptedNetworkBytes(c.getString("log-unencrypted-network-bytes")),
SocketOptionSettings.fromSubConfig(root, c.getConfig("socket-options")),
defaultHostHeader =
HttpHeader.parse("Host", c getString "default-host-header", ParserSettings(root)) match {
HttpHeader.parse("Host", c.getString("default-host-header"), ParserSettings(root)) match {
case HttpHeader.ParsingResult.Ok(x: Host, Nil) x
case result
val info = result.errors.head.withSummary("Configured `default-host-header` is illegal")
Expand All @@ -100,8 +100,8 @@ private[http] object ServerSettingsImpl extends SettingsCompanion[ServerSettings
WebSocketSettingsImpl.server(c.getConfig("websocket")),
ParserSettingsImpl.fromSubConfig(root, c.getConfig("parsing")),
Http2ServerSettings.Http2ServerSettingsImpl.fromSubConfig(root, c.getConfig("http2")),
c getInt "default-http-port",
c getInt "default-https-port",
c.getInt("default-http-port"),
c.getInt("default-https-port"),
terminationDeadlineExceededResponseFrom(c)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ object Http2ServerSettings extends SettingsCompanion[Http2ServerSettings] {

private[http] object Http2ServerSettingsImpl extends akka.http.impl.util.SettingsCompanion[Http2ServerSettingsImpl]("akka.http.server.http2") {
def fromSubConfig(root: Config, c: Config): Http2ServerSettingsImpl = Http2ServerSettingsImpl(
maxConcurrentStreams = c getInt "max-concurrent-streams",
requestEntityChunkSize = c getIntBytes "request-entity-chunk-size",
incomingConnectionLevelBufferSize = c getIntBytes "incoming-connection-level-buffer-size",
incomingStreamLevelBufferSize = c getIntBytes "incoming-stream-level-buffer-size",
maxConcurrentStreams = c.getInt("max-concurrent-streams"),
requestEntityChunkSize = c.getIntBytes("request-entity-chunk-size"),
incomingConnectionLevelBufferSize = c.getIntBytes("incoming-connection-level-buffer-size"),
incomingStreamLevelBufferSize = c.getIntBytes("incoming-stream-level-buffer-size"),
None // no possibility to configure internal settings with config
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class EventStreamUnmarshallingTest extends JUnitSuite {

@Test
public void testFromEventStream() throws Exception {
public void testFromEventsStream() throws Exception {
ActorSystem system = ActorSystem.create();
try {
Materializer mat = ActorMaterializer.create(system);
Expand All @@ -43,7 +43,7 @@ public void testFromEventStream() throws Exception {

//#event-stream-unmarshalling-example
List<ServerSentEvent> unmarshalledEvents =
EventStreamUnmarshalling.fromEventStream()
EventStreamUnmarshalling.fromEventsStream(system)
.unmarshal(entity, system.dispatcher(), mat)
.thenCompose(source -> source.runWith(Sink.seq(), mat))
.toCompletableFuture()
Expand Down
4 changes: 3 additions & 1 deletion akka-http/src/main/mima-filters/10.1.4.backwards.excludes
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Changes against 10.1.4

ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.toStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.toStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.server.directives.BasicDirectives.extractStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.extractStrictEntity")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.server.directives.BasicDirectives.extractStrictEntity")
3 changes: 3 additions & 0 deletions akka-http/src/main/mima-filters/10.1.7.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Changes against 10.1.7

ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling.fromEventsStream")
93 changes: 52 additions & 41 deletions akka-http/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,55 @@
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

akka.http.routing {
# Enables/disables the returning of more detailed error messages to the
# client in the error response
# Should be disabled for browser-facing APIs due to the risk of XSS attacks
# and (probably) enabled for internal or non-browser APIs
# (Note that akka-http will always produce log messages containing the full error details)
verbose-error-messages = off

# Enables/disables ETag and `If-Modified-Since` support for FileAndResourceDirectives
file-get-conditional = on

# Enables/disables the rendering of the "rendered by" footer in directory listings
render-vanity-footer = yes

# The maximum size between two requested ranges. Ranges with less space in between will be coalesced.
#
# When multiple ranges are requested, a server may coalesce any of the ranges that overlap or that are separated
# by a gap that is smaller than the overhead of sending multiple parts, regardless of the order in which the
# corresponding byte-range-spec appeared in the received Range header field. Since the typical overhead between
# parts of a multipart/byteranges payload is around 80 bytes, depending on the selected representation's
# media type and the chosen boundary parameter length, it can be less efficient to transfer many small
# disjoint parts than it is to transfer the entire selected representation.
range-coalescing-threshold = 80

# The maximum number of allowed ranges per request.
# Requests with more ranges will be rejected due to DOS suspicion.
range-count-limit = 16

# The maximum number of bytes per ByteString a decoding directive will produce
# for an entity data stream.
decode-max-bytes-per-chunk = 1m

# Maximum content length after applying a decoding directive. When the directive
# decompresses, for example, an entity compressed with gzip, the resulting stream can be much
# larger than the max-content-length. Like with max-content-length, this is not necessarilly a
# problem when consuming the entity in a streaming fashion, but does risk high memory use
# when the entity is made strict or marshalled into an in-memory object.
# This limit (like max-content-length) can be overridden on a case-by-case basis using the
# withSizeLimit directive.
decode-max-size = 8m
}
akka.http {
routing {
# Enables/disables the returning of more detailed error messages to the
# client in the error response
# Should be disabled for browser-facing APIs due to the risk of XSS attacks
# and (probably) enabled for internal or non-browser APIs
# (Note that akka-http will always produce log messages containing the full error details)
verbose-error-messages = off

# Enables/disables ETag and `If-Modified-Since` support for FileAndResourceDirectives
file-get-conditional = on

# Enables/disables the rendering of the "rendered by" footer in directory listings
render-vanity-footer = yes

# The maximum size between two requested ranges. Ranges with less space in between will be coalesced.
#
# When multiple ranges are requested, a server may coalesce any of the ranges that overlap or that are separated
# by a gap that is smaller than the overhead of sending multiple parts, regardless of the order in which the
# corresponding byte-range-spec appeared in the received Range header field. Since the typical overhead between
# parts of a multipart/byteranges payload is around 80 bytes, depending on the selected representation's
# media type and the chosen boundary parameter length, it can be less efficient to transfer many small
# disjoint parts than it is to transfer the entire selected representation.
range-coalescing-threshold = 80

# The maximum number of allowed ranges per request.
# Requests with more ranges will be rejected due to DOS suspicion.
range-count-limit = 16

# The maximum number of bytes per ByteString a decoding directive will produce
# for an entity data stream.
decode-max-bytes-per-chunk = 1m

# Maximum content length after applying a decoding directive. When the directive
# decompresses, for example, an entity compressed with gzip, the resulting stream can be much
# larger than the max-content-length. Like with max-content-length, this is not necessarilly a
# problem when consuming the entity in a streaming fashion, but does risk high memory use
# when the entity is made strict or marshalled into an in-memory object.
# This limit (like max-content-length) can be overridden on a case-by-case basis using the
# withSizeLimit directive.
decode-max-size = 8m
}

# server-sent events
sse {
# The maximum size for parsing server-sent events.
max-event-size = 8192

# The maximum size for parsing lines of a server-sent event.
max-line-size = 4096
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ private[http] final case class RoutingSettingsImpl(

object RoutingSettingsImpl extends SettingsCompanion[RoutingSettingsImpl]("akka.http.routing") {
def fromSubConfig(root: Config, c: Config) = new RoutingSettingsImpl(
c getBoolean "verbose-error-messages",
c getBoolean "file-get-conditional",
c getBoolean "render-vanity-footer",
c getInt "range-count-limit",
c getBytes "range-coalescing-threshold",
c getIntBytes "decode-max-bytes-per-chunk",
c getPossiblyInfiniteBytes "decode-max-size")
c.getBoolean("verbose-error-messages"),
c.getBoolean("file-get-conditional"),
c.getBoolean("render-vanity-footer"),
c.getInt("range-count-limit"),
c.getBytes("range-coalescing-threshold"),
c.getIntBytes("decode-max-bytes-per-chunk"),
c.getPossiblyInfiniteBytes("decode-max-size")
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.http.impl.settings

import akka.annotation.InternalApi
import akka.http.impl.util.SettingsCompanion
import com.typesafe.config.Config

@InternalApi
private[http] final case class ServerSentEventSettingsImpl(
maxEventSize: Int,
maxLineSize: Int
) extends akka.http.scaladsl.settings.ServerSentEventSettings {
require(maxLineSize > 0, "max-line-size must be greater than 0")
require(maxEventSize > maxLineSize, "max-event-size must be greater than max-line-size")

override def productPrefix: String = "ServerSentEventSettings"

}

object ServerSentEventSettingsImpl extends SettingsCompanion[ServerSentEventSettingsImpl]("akka.http.sse") {
def fromSubConfig(root: Config, c: Config) = ServerSentEventSettingsImpl(
c.getInt("max-event-size"),
c.getInt("max-line-size")
)
}
Loading

0 comments on commit c733ab4

Please sign in to comment.