ContainerClient + akka http alternative to HttpUtils #3812
Codecov Report
@@            Coverage Diff             @@
##           master    #3812      +/-   ##
==========================================
- Coverage   75.69%   71.07%   -4.63%
==========================================
  Files         145      146       +1
  Lines        6930     6983      +53
  Branches      423      431       +8
==========================================
- Hits         5246     4963     -283
- Misses       1684     2020     +336
29960b2 to 52fd033
Great we're getting this back. It's a delicate change though, so let's be extra careful on reviews.
import whisk.http.PoolingRestClient

trait ContainerClient {
  def close(): Unit
Could we implement `java.lang.AutoCloseable` instead? That gives you the nicety of integrating into the try-with-resources world (although that's not needed here).
Should we remove the `close` method from the trait then, and instead use
trait ContainerClient extends AutoCloseable {
...
?
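For reference, a minimal standard-library sketch of what this suggestion could look like. The trimmed-down trait, `FakeContainerClient` and `withClient` are hypothetical names for illustration only; the real trait has other members.

```scala
object AutoCloseableSketch {
  // Hypothetical, trimmed-down trait: extending AutoCloseable supplies close(): Unit.
  trait ContainerClient extends AutoCloseable {
    def post(body: String): String
  }

  // Hypothetical stand-in implementation that records whether it was closed.
  class FakeContainerClient extends ContainerClient {
    @volatile var closed = false
    def post(body: String): String = s"echo:$body"
    override def close(): Unit = closed = true
  }

  // Loan pattern: accepts any AutoCloseable and closes it even if `f` throws.
  def withClient[C <: AutoCloseable, A](client: C)(f: C => A): A =
    try f(client)
    finally client.close()

  val client = new FakeContainerClient
  val reply: String = withClient(client)(_.post("hi"))
}
```

The trait no longer declares `close` itself, and any helper written against `AutoCloseable` works with it.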
// Timeout includes all retries.
as.scheduler.scheduleOnce(timeout) {
  promise.tryFailure(new TimeoutException(s"Request to ${endpoint} could not be completed in time."))
}
As Scala futures are not abortable, even though you're finishing the Promise here, the underlying HTTP request might still be in flight. Should we instead extend the `PoolingRestClient` to take timeout values as well? I believe you can configure the underlying connection pool so that connections time out.
That'd be in line with what we have today wrt. timeout handling.
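To illustrate the concern, here is a small standard-library sketch (no akka involved). The sleeping `Future` is a hypothetical stand-in for the in-flight HTTP request, and the 50 ms / 300 ms values are arbitrary.

```scala
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseTimeoutSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  /** Returns (callerSawTimeout, workStillFinished). */
  def demo(): (Boolean, Boolean) = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val workFinished = new AtomicBoolean(false)
    val promise = Promise[String]()

    // Stand-in for the in-flight request: takes roughly 300ms to complete.
    val work = Future {
      Thread.sleep(300)
      workFinished.set(true)
      "response"
    }
    work.foreach(promise.trySuccess)

    // Timeout enforced on the promise only, after 50ms.
    scheduler.schedule(new Runnable {
      def run(): Unit = promise.tryFailure(new TimeoutException("could not be completed in time"))
    }, 50, TimeUnit.MILLISECONDS)

    val callerResult = Try(Await.result(promise.future, 2.seconds))
    // The caller has already been told "timeout", yet the work keeps running to completion.
    Await.ready(work, 2.seconds)
    scheduler.shutdown()

    (callerResult.failed.toOption.exists(_.isInstanceOf[TimeoutException]), workFinished.get)
  }
}
```

Failing the promise only changes what the caller observes; nothing cancels the underlying work, which is why a timeout on the connection itself is the safer place to enforce it.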
I've tried adding a `.completionTimeout()` stage at the pool and the queue with no luck (was expecting a failure in that case, but don't get any success or failure...). I expected it to work at the pool. Any tips here?
EDIT: OK, I guess `completionTimeout` is stream level, and we need a timeout per event...
So I tried to do this using https://github.com/paypal/squbs/blob/master/squbs-ext/src/main/scala/org/squbs/streams/Timeout.scala#L268
I'm not wild about:
- dragging in squbs artifacts (seems heavy-handed for "just" adding a timeout)
- the `Try[HttpResponse]` being wrapped as a `Try[Try[HttpResponse]]`, which seems awkward.
ok idle-timeout is working, removed these changes and the promise.tryFailure
}
}

tryOnce()
If the above gets implemented (timeout on the connections themselves rather than enforced by the promise), you can drop the `Promise` here (hard to reason about) and implement the retry like this:
private def retryingRequest(req: Future[HttpRequest], retry: Boolean): Future[HttpResponse] = {
  request(req).recoverWith {
    case _: akka.stream.StreamTcpException if retry =>
      akka.pattern.after(retryInterval, as.scheduler)(retryingRequest(req, retry))
    case t => Future.failed(t)
  }
}
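The same shape can be tried without akka on the classpath. This is only a sketch under assumptions: `ConnectFailed` is a hypothetical stand-in for `akka.stream.StreamTcpException`, and `after` is a hand-rolled replacement for `akka.pattern.after` built on a `ScheduledExecutorService`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object RetrySketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for akka.stream.StreamTcpException.
  final class ConnectFailed(msg: String) extends RuntimeException(msg)

  // Stand-in for akka.pattern.after: evaluate `f` once `delay` has elapsed.
  def after[A](delay: FiniteDuration, scheduler: ScheduledExecutorService)(f: => Future[A]): Future[A] = {
    val p = Promise[A]()
    scheduler.schedule(new Runnable {
      def run(): Unit = p.completeWith(f)
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Retry only on connection failures; any other failure passes through untouched.
  def retrying[A](request: () => Future[A],
                  retry: Boolean,
                  interval: FiniteDuration,
                  scheduler: ScheduledExecutorService): Future[A] =
    request().recoverWith {
      case _: ConnectFailed if retry =>
        after(interval, scheduler)(retrying(request, retry, interval, scheduler))
    }

  /** Returns the final result and the total number of attempts. */
  def demo(): (String, Int) = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val attempts = new AtomicInteger(0)
    // Fails twice with a connection error, then succeeds.
    val flaky = () =>
      if (attempts.incrementAndGet() < 3) Future.failed[String](new ConnectFailed("connection refused"))
      else Future.successful("ok")
    try {
      val result = Await.result(retrying(flaky, retry = true, 10.millis, scheduler), 5.seconds)
      (result, attempts.get)
    } finally scheduler.shutdown()
  }
}
```

Note that `recoverWith` leaves unmatched failures as they are, so a trailing `case t => Future.failed(t)` is not needed in this form.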
much nicer! Thanks!
//map the HttpResponse to ContainerResponse
val r = promise.future
  .flatMap({ response =>
    val contentLength = response.entity.contentLengthOption.getOrElse(0l)
Is this a behavioral change? I think HttpUtils handles an unknown contentLength as `NoResponseReceived`.
Yes good catch; fixed
if (contentLength <= maxResponse.toBytes) {
  Unmarshal(response.entity.withSizeLimit(maxResponse.toBytes)).to[String].map { o =>
    //handle 204 as NoResponseReceived for parity with HttpUtils client
    if (response.status == StatusCodes.NoContent) {
Is this needed? HttpUtils doesn't have that extra clause. It does, however, implement the case of an absent Content-Length, as noted above.
This is to satisfy a test case that I transferred from `ContainerConnectionTests` to `PoolingContainerClientTests` - see `handle empty entity response`.
Now this test is arguably wrong; compare it to `not truncate responses within limit` (or one of them, at least), which returns a `null` and an empty string as test responses (with a 200, not 204).
`HttpUtils` (or `org.apache.http`) seems to vary from akka http in its handling of this case, where the `response.getEntity` is null on `HttpUtils`, but only when there is a 204 (not when there is a `null` sent as the response entity...)
I agree this is weird, but wanted to keep the tests at parity between `HttpUtils` and `PoolingContainerClient`, at least for now.
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
tail.runWith(Sink.ignore)
//captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length
Future.successful(truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
Both ignore cases need to wait on the stream to be consumed, like:
tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
af68ed1 to 5011dea
.recover {
  case t: StreamTcpException => Left(Timeout(t))
  case t: TimeoutException => Left(Timeout(t))
  case t: Throwable => Left(ConnectionError(t))
Maybe use `case NonFatal(t) => Left(ConnectionError(t))` to avoid handling fatal errors.
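A small sketch of the difference, using only the standard library. `ConnectionError` here is a simplified, hypothetical stand-in for the client's error type.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object NonFatalSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical, simplified error type standing in for the client's failure cases.
  final case class ConnectionError(t: Throwable)

  // Ordinary failures become Left(ConnectionError(...)); fatal ones are not matched here.
  def classify(f: Future[String]): Future[Either[ConnectionError, String]] =
    f.map[Either[ConnectionError, String]](s => Right(s)).recover {
      case NonFatal(t) => Left(ConnectionError(t))
    }

  // Helper to probe which throwables the NonFatal extractor accepts.
  def matchesNonFatal(t: Throwable): Boolean = t match {
    case NonFatal(_) => true
    case _           => false
  }

  def demo(): Either[ConnectionError, String] =
    Await.result(classify(Future.failed(new java.io.IOException("boom"))), 2.seconds)
}
```

With `case t: Throwable`, throwables such as `InterruptedException` or `OutOfMemoryError` would also be turned into a `ConnectionError`; `NonFatal` deliberately does not match those.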
sounds good!
Future { Left(NoResponseReceived()) }
})
.recover {
  case t: StreamTcpException => Left(Timeout(t))
BTW - this is also parity with HttpUtils, but seems wrong. If there are retries on `StreamTcpException`, and the request is still failing after retrying until the timeout period is reached, we should propagate the same exception, e.g. as `Left(ConnectionError(t))`, right?
AFAIK this is not checked anywhere, so I would prefer to change it.
logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.")
Future.failed(t)
}
case t => Future.failed(t)
Not required
logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.")
Future.failed(t)
}
case t => Future.failed(t)
Maybe we could also track `retryCount`, include it in the failure-case logging, and add success-case logging (if `retryCount > 0`) to get a sense of how many retries are being performed.
} else {
  //ignore the tail (MUST CONSUME ENTIRE ENTITY!)
  //captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length
  tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
Would it be safe to convert a byte stream truncated at an arbitrary boundary to a string?
`HttpUtils` used the same approach, so behavior-wise it's compatible.
It is as safe as truncation can be - the client may get an error, but the `ActivationResponse` will end up with some info regarding the truncation.
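To make the boundary question concrete, here is what the JVM's UTF-8 decoding does with a multi-byte character cut in half. This sketch uses plain `new String(bytes, UTF_8)`; that akka's `utf8String` behaves the same way is an assumption, not something verified here.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object TruncationSketch {
  // "\u00e9" (e with acute accent) takes two bytes in UTF-8, so this 5-character string is 6 bytes long.
  val bytes: Array[Byte] = "h\u00e9llo".getBytes(UTF_8)

  // Cutting after 2 bytes splits the two-byte character in the middle.
  val cutMidCharacter: String = new String(bytes.take(2), UTF_8)

  // Cutting after 3 bytes lands on a character boundary.
  val cutOnBoundary: String = new String(bytes.take(3), UTF_8)
}
```

Decoding does not throw in the mid-character case; the dangling byte is replaced with U+FFFD, so the truncated string may end in a replacement character.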
object PoolingContainerClient {

  /** A helper method to post one single request to a connection. Used for container tests. */
  def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: Duration = 30.seconds)(
Is this method currently being used? If not then we can probably drop it or move it to some utility in test
No, but I will update it so that it is used in place of `HttpUtils.post` during tests when pooling-client == true
tid: TransactionId): (Int, Option[JsObject]) = {
  val connection = new PoolingContainerClient(host, port, 90.seconds, 1.MB, 1)
  val response = executeRequest(connection, endPoint, content)
  connection.close()
Should connection be closed after await is done?
These `post` and `concurrentPost` functions maintain test compatibility with `HttpUtils`; they are used in a synchronous fashion, so the connection should be closed after the await.
Need to move below the await then?
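A sketch of the ordering being asked for, with a hypothetical `FakeClient` whose requests fail once the client is closed (the real client's behavior on an early close is not shown in this thread).

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CloseAfterAwaitSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical client: a request fails if the client was closed before it completed.
  class FakeClient extends AutoCloseable {
    private val closed = new AtomicBoolean(false)
    def post(body: String): Future[String] = Future {
      Thread.sleep(50) // simulate network latency
      if (closed.get) throw new IllegalStateException("client already closed")
      s"echo:$body"
    }
    override def close(): Unit = closed.set(true)
  }

  // Synchronous helper: block for the response first, close afterwards (even on failure).
  def post(body: String, timeout: FiniteDuration = 5.seconds): String = {
    val client = new FakeClient
    try Await.result(client.post(body), timeout)
    finally client.close()
  }
}
```

Calling `close()` before the `Await` would race with the request; putting it in `finally` after the await keeps the helper synchronous and still releases the client on errors.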
@@ -513,7 +512,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
   .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted.

   override def shutdown(): Unit = {
-    Await.ready(client.shutdown(), 1.minute)
+    client.shutdown()
Is this change required? `client.shutdown()` returns a `Future`, so it needs an `Await`.
Yes!
.withConnectionSettings(if (timeout.isDefined) {
  ClientConnectionSettings(system.settings.config)
    .withIdleTimeout(timeout.get)
} else { ClientConnectionSettings(system.settings.config) })
Alternative
private val timeoutSettings = {
  val ps = ConnectionPoolSettings(system.settings.config)
  timeout.map(t => ps.withUpdatedConnectionSettings(_.withIdleTimeout(t))).getOrElse(ps)
}
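The two forms are equivalent; a standalone sketch with a hypothetical `PoolSettings` case class standing in for akka's `ConnectionPoolSettings`:

```scala
import scala.concurrent.duration._

object OptionalTimeoutSketch {
  // Hypothetical settings type; stands in for akka's ConnectionPoolSettings.
  final case class PoolSettings(idleTimeout: Duration = Duration.Inf) {
    def withIdleTimeout(t: FiniteDuration): PoolSettings = copy(idleTimeout = t)
  }

  // Verbose form: test the Option, then call .get.
  def verbose(timeout: Option[FiniteDuration]): PoolSettings =
    if (timeout.isDefined) PoolSettings().withIdleTimeout(timeout.get) else PoolSettings()

  // Equivalent form without .get: transform if present, fall back to the defaults otherwise.
  def concise(timeout: Option[FiniteDuration]): PoolSettings = {
    val ps = PoolSettings()
    timeout.map(ps.withIdleTimeout).getOrElse(ps)
  }

  val thirtySeconds: FiniteDuration = 30.seconds
}
```

The `map`/`getOrElse` form builds the default settings once and never calls `.get` on the `Option`.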
nice
}

// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
Docs can be retained
}
}

private def truncated(responseBytes: Source[ByteString, _],
Maybe move it to `object PoolingContainerClient` and then have a test for this logic. Per the current test coverage, some flows are not covered.
There is at least one test already that covers truncation: https://github.com/apache/incubator-openwhisk/pull/3812/files#diff-70e8c471d9056bda26d602a05f7ad091R180
Is codecov.io updated on each build? Can you tell why it wouldn't show in coverage? These tests (`PoolingContainerClientTests`) use `ContainerClient` directly, with no mocks etc., so I'm not sure why the coverage would not reflect that.
I added tests, I think coverage is better, will look again after next run;
I also removed the `case (Nil, tail) =>` since it is not clear when this would ever come into play, or how to test for it working properly.
with ContainerClient
with AutoCloseable {

def close() = shutdown()
`PoolingRestClient.shutdown` returns a `Future`, while `ContainerClient.close` is defined to return a Unit. Should we wait for the result completion or change the contract for `ContainerClient`?
@markusthoemmes WDYT? This may affect use of `AutoCloseable` - for now I will return a Unit
case _ =>
  //handle missing Content-Length as NoResponseReceived
  //also handle 204 as NoResponseReceived, for parity with HttpUtils client
  Future { Left(NoResponseReceived()) }
Never use `Future.apply` if you already have the value of the Future at hand. It will schedule the value to the ExecutionContext unnecessarily.
Use `Future.successful(Left(NoResponseReceived()))` instead (note how it doesn't require an ExecutionContext).
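One way to observe the difference is with an `ExecutionContext` that counts submissions. `CountingContext` is a hypothetical helper written for this sketch only.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FutureSuccessfulSketch {
  // An ExecutionContext that runs tasks inline and counts how many were submitted to it.
  final class CountingContext extends ExecutionContext {
    @volatile var submitted = 0
    def execute(runnable: Runnable): Unit = { submitted += 1; runnable.run() }
    def reportFailure(cause: Throwable): Unit = throw cause
  }

  /** Returns the submission count after Future.successful and after Future.apply. */
  def demo(): (Int, Int) = {
    val ec = new CountingContext
    val ready = Future.successful(42) // already completed; no ExecutionContext parameter at all
    val afterSuccessful = ec.submitted
    val scheduled = Future(42)(ec)    // wraps the value in a task and hands it to `ec`
    val afterApply = ec.submitted
    (afterSuccessful, afterApply)
  }
}
```

`Future.successful` never touches an `ExecutionContext`, while `Future.apply` submits work to it even though the value is already known.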
}
})
.recover {
  case t: StreamTcpException => Left(Timeout(t))
Is a StreamTcpException always a timeout?
No, but this is parity with HttpUtils - on a timeout after retries, the `Timeout` response is used. I agree this is wrong, but wasn't sure how to otherwise make it compatible. If this compatibility is not a problem, I would change it?
So I changed this so that on retry timeout, we don't rethrow a StreamTcpException, but rather a TimeoutException (with the message from StreamTcpException); this way we can project the Timeout consistency, but not imply that a mid-flight StreamTcpException is any indication of a timeout - WDYT?
  val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
  entity.setContentType("application/json")

  val request = new HttpPost(baseUri.setPath(endpoint).build)
  request.addHeader(HttpHeaders.ACCEPT, "application/json")
  request.setEntity(entity)

- execute(request, timeout, maxConcurrent, retry)
+ Future { execute(request, timeout, maxConcurrent, retry) }
Should add `blocking` here as well. This is using sync IO.
Instead of `Future.successful`?
Yes, in this case you actually want another thread to take over, so you use `Future()`, but you also include `blocking` to denote that this is a blocking operation that might hold a thread indefinitely. It gives the ExecutionContext the chance to adapt accordingly (create more threads).
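A minimal sketch of the pattern, with `syncCall` as a hypothetical stand-in for the synchronous HTTP call:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for a synchronous (thread-parking) HTTP call.
  def syncCall(id: Int): Int = { Thread.sleep(100); id * 2 }

  // Future { ... } moves the call off the caller's thread; blocking { ... } marks the
  // section so that the global pool may compensate with additional threads.
  def asyncCall(id: Int): Future[Int] = Future { blocking { syncCall(id) } }

  def demo(): Seq[Int] =
    Await.result(Future.sequence((1 to 8).map(asyncCall)), 10.seconds)
}
```

Whether extra threads are actually created depends on the `ExecutionContext` in use; the global one honors `blocking`, a fixed-size pool would simply ignore it.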
A few nits on organization and documentation of the code mostly. Getting there, well done 🎉 . Will do a deeper pass shortly.
//create the request
val req = Marshal(body).to[MessageEntity].map { b =>
  //DO NOT reuse the connection (in case of paused containers)
In all cases actually, not just "in case of paused containers".
 * content type and the accept headers are both 'application/json.
 * The reason we still use this class for the action container is a mysterious hang
 * in the Akka http client where a future fails to properly timeout and we have not
 * determined why that is.
Please update the ScalaDoc.
if (r._2 > 0) {
  logging.info(this, s"completed after ${r._2} retries")
}
val response = r._1
You can unpack `response` and `retries` directly in the flatMap, like:
.flatMap { case (response, retries) =>
to avoid the tuple accessors.
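Side by side, as a standalone sketch. `request()` is a hypothetical stand-in that returns a response together with the number of retries it took.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TupleUnpackSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical result of a retrying request: the response plus the number of retries.
  def request(): Future[(String, Int)] = Future.successful(("response", 2))

  // With tuple accessors.
  def withAccessors(): Future[String] =
    request().flatMap { r =>
      val note = if (r._2 > 0) s" (after ${r._2} retries)" else ""
      Future.successful(r._1 + note)
    }

  // With a pattern-matching function literal: the tuple fields get names at the call site.
  def withPattern(): Future[String] =
    request().flatMap {
      case (response, retries) =>
        val note = if (retries > 0) s" (after $retries retries)" else ""
        Future.successful(response + note)
    }

  def demo(): (String, String) =
    (Await.result(withAccessors(), 2.seconds), Await.result(withPattern(), 2.seconds))
}
```

Both variants produce the same result; the second just avoids `._1` and `._2`.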
}
case _ =>
  //handle missing Content-Length as NoResponseReceived
  //also handle 204 as NoResponseReceived, for parity with HttpUtils client
I think we still need to drain the entity, like:
response.discardEntityBytes().future.map(_ => Left(NoResponseReceived()))
since this case is also reached by an unknown Content-Length.
@@ -166,16 +170,20 @@ trait Container {
   implicit transid: TransactionId): Future[RunResult] = {
   val started = Instant.now()
   val http = httpConnection.getOrElse {
-    val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+    val conn = if (config.poolingClient) {
`poolingClient` seems an odd name since `HttpUtils` does pool as well. Should we rename to `newContainerClient`? (akka themselves did the same when they implemented the new ConnectionPool)
 * @param queueSize once all connections are used, how big of queue to allow for additional requests
 * @param retryInterval duration between retries for TCP connection errors
 */
protected class PoolingContainerClient(
Should we name this `AkkaContainerClient` or `NewContainerClient` for more clarity? Can we also rename `HttpUtils` to something more meaningful now, like `ApacheBlockingContainerClient`?
import whisk.core.entity.size.SizeLong
import whisk.http.PoolingRestClient

trait ContainerClient {
The trait should be placed in its own file (or both ContainerClients should be placed in this one file, but I'd prefer one file for the trait, one for the akka based and one for the apache based)
private val timeoutSettings = {
  val ps = ConnectionPoolSettings(system.settings.config)
  timeout.map(t => ps.withUpdatedConnectionSettings(_.withIdleTimeout(t))).getOrElse(ps)
}
Neat! 😁
Thanks @chetanmeh !
@@ -29,6 +29,7 @@ whisk {
   container-pool {
     num-core: 4 # used for computing --cpushares, and max number of containers allowed
     core-share: 2 # used for computing --cpushares, and max number of containers allowed
+    pooling-client: true # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use HttpUtils)
Should we default to false for now? For safety?
@chetanmeh this is close - let me know if you think I missed any of your comments? I think I got them all. RE codecov - still some mysteries in the summary at top of PR conversation, but the report on codecov.io looks right to me. One anomaly in the PR conversation, I'm not sure where this is coming from?
Last round of comments from my side. I'm okay merging when these changes are made, since it's behind a toggle anyway.
Great job, thank you very much 🎉
.flatMap {
  case (response, retries) => {
    if (retries > 0) {
      logging.info(this, s"completed after ${retries} retries")
Does it make sense to write a metric here rather than printing this per request? Or maybe move the log line to debug and additionally write a metric?
private def truncated(responseBytes: Source[ByteString, _],
                      previouslyCaptured: ByteString = ByteString.empty): Future[String] = {
  responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap {
    case (Seq(prefix), tail) =>
Does this case match all possible outcomes? Wasn't there a `Nil` case here before? I guess we won't reach the `Nil` case at runtime because we check earlier if `contentLength < maxBytes`. Might still make sense to include it for good measure?
I removed it since I was not able to establish a test that actually exercised that code path; can add it back for defense 👍
val conn = new ApacheBlockingContainerClient(
  s"${addr.host}:${addr.port}",
  timeout,
  ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT)
Shouldn't this also check the feature toggle and use the correct client as configured?
Also noticed that `ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT` is not used in `Container.scala`... These are both 1mb, but the docs for `MAX_ACTIVATION_LIMIT` say "This refers to the invoke-time parameters" - whereas in this case we are limiting the response size (and I don't see any assertion of a limit on the request entity size in the former HttpUtils?).
WDYT?
@@ -296,7 +301,7 @@ class ContainerPoolTests
   val (containers, factory) = testContainers(2)
   val feed = TestProbe()

-  val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref))
+  val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2, false), feed.ref))
I think we should externalize building of the ContainerPoolConfig into a method so we don't need to continually adjust values that are not needed for these tests. Check https://github.com/apache/incubator-openwhisk/pull/3767/files#diff-d00e1ef9ea3255332a28c35676361e29 for an implementation I did in another PR for exactly the same issue. I think it keeps the test cases clearer and reduces the diff in future PRs.
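One possible shape for such a helper, sketched with default arguments. This is not the implementation from the linked PR; the case class below is a hypothetical mirror of the three-argument config, with field names guessed from the config keys.

```scala
object PoolConfigSketch {
  // Hypothetical mirror of the three-argument config used in the diff above;
  // field names are assumed from the num-core, core-share and pooling-client keys.
  final case class ContainerPoolConfig(numCore: Int, coreShare: Int, poolingClient: Boolean)

  // Test helper with defaults: each test names only the values it cares about,
  // so adding a config field later means touching one place instead of every test.
  def poolConfig(numCore: Int = 2, coreShare: Int = 2, poolingClient: Boolean = false): ContainerPoolConfig =
    ContainerPoolConfig(numCore, coreShare, poolingClient)
}
```

A test that only cares about the core count would then write `poolConfig(numCore = 4)` and stay unchanged when new fields are added.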
@@ -188,6 +189,7 @@ object KubernetesClientTests {
   implicit def strToInstant(str: String): Instant =
     strToDate(str).get

+  implicit val as = ActorSystem("kubernetes-client-tests-actor-system")
This actorSystem is leaked I think. Could you make the `TestKubernetesClient` take the actorSystem as an implicit parameter, so you can use the one imported (and closed) by the tests above?
Or maybe even move the class into the testclass. Makes it even easier?
I will give it a try, but it isn't clear why this test was set up this way? @dgrove-oss @jcrossley3 ?
The `TestKubernetesClient` is shared with `KubernetesContainerTests` - so for now, changed to implicit ActorSystem (and left object/classes in current places). Good?
I don't remember any particular reason; I think it may be as simple as that since we hadn't needed to have our hands on an ActorSystem before in the stubbed out test client it wasn't plumbed through.
.asInstanceOf[Timeout]
.t
.asInstanceOf[RetryableConnectionError]
.t shouldBe a[HttpHostConnectException]
This could be rewritten a little less verbosely, like:
result match {
  case Left(Timeout(RetryableConnectionError(_: HttpHostConnectException))) => // all good
  case _ =>
    fail(s"$result was not a Timeout(RetryableConnectionError(HttpHostConnectException))")
}
I was not able to reproduce this locally though, the test always failed with a ConnectError in both implementations. Is this intermittent?
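A standalone sketch of the nested match, outside ScalaTest. The case classes are simplified, hypothetical versions of the error types named in the test, and `java.net.ConnectException` stands in for Apache's `HttpHostConnectException`.

```scala
import java.util.concurrent.TimeoutException

object NestedMatchSketch {
  // Hypothetical, simplified versions of the error types named in the test.
  final case class RetryableConnectionError(t: Throwable) extends Exception(t)
  final case class Timeout(t: Throwable)

  // One pattern per expected nesting; the shape of the pattern mirrors the shape of the value.
  def describe(result: Either[Timeout, String]): String = result match {
    case Left(Timeout(RetryableConnectionError(_: java.net.ConnectException))) =>
      "timeout after connection retries"
    case Left(Timeout(_: TimeoutException)) => "plain timeout"
    case Left(Timeout(other))               => s"unexpected cause: ${other.getClass.getSimpleName}"
    case Right(body)                        => s"ok: $body"
  }
}
```

Compared with chained `asInstanceOf` calls, a mismatch falls through to another case rather than failing with a `ClassCastException`.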
nice - much better! (worked for me)
//seems like this varies, but often is ~64k or ~128k
val limit = 300.KB
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
Seq(true, false).foreach { code =>
Could you rename `code` to `success` or similar? Threw me off quite a bit when reading through (also in other occurrences please).
agreed
val waited = end.toEpochMilli - start.toEpochMilli
result should be('left)
result.left.get shouldBe a[Timeout]
result.left.get.asInstanceOf[Timeout].t shouldBe a[TimeoutException]
Same as below, could be rewritten to:
result match {
  case Left(Timeout(_: TimeoutException)) => // good
  case _ => fail(...)
}
IMO that pronounces the nesting of the exceptions more and is clearer in what you're actually testing. WDYT?
agreed
val limit = 300.KB
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
Seq(true, false).foreach { code =>
  Seq("0123456789" * 100000).foreach { r =>
Why even use `Seq.foreach` if you only pass in one value?
Please add a comment on what that value is supposed to be, i.e.
// Generate a response that's 10MB (10 characters * 1024 * 1024)
val response = "0123456789" * 1024 * 1024
To make the numbers less magic.
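One way to keep such sizes readable is to build the payload from a named byte count. The sketch below uses only the standard library; `ResponseSize`, `payload`, `KB` and `MB` are hypothetical names for this example. It relies on the payload being pure ASCII, so one character is one byte in UTF-8.

```scala
object ResponseSize {
  val KB: Int = 1024
  val MB: Int = 1024 * KB

  private val digits = "0123456789"

  // Builds an ASCII string of exactly `sizeBytes` characters by repeating the
  // ten-digit block and padding with a partial block when needed.
  def payload(sizeBytes: Int): String =
    digits * (sizeBytes / digits.length) + digits.take(sizeBytes % digits.length)
}
```

Each repetition of the ten-character block contributes 10 bytes, so the repeat count has to be the target size divided by 10. `ResponseSize.payload(ResponseSize.MB)` is comfortably larger than the `300.KB` limit used in the test.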
@tysonnorris Yeah, that is a confusing part. This happens because the CosmosDB tests run properly on master builds, but they do not run for PR builds. Hence you will see codecov showing a drop in coverage for each PR. I do not have a good solution for it. One option may be to skip coverage calculation for such code paths.
@chetanmeh @markusthoemmes I think I have addressed all comments |
@@ -513,7 +514,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St | |||
.getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted. | |||
|
|||
override def shutdown(): Unit = { | |||
Await.ready(client.shutdown(), 1.minute) | |||
Await.result(client.shutdown(), 30.seconds) |
Do we need to change this for this PR?
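For context on what the two calls in this hunk do differently: in the Scala standard library, `Await.ready` only waits for the future to complete and returns it, while `Await.result` also extracts the value and rethrows a failure. The sketch below shows this with a future that has already failed; `AwaitSemantics` and `demo` are names made up for this example, and it says nothing about how `client.shutdown()` behaves in practice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object AwaitSemantics {
  // Returns (did Await.ready throw?, did Await.result throw?) for a failed future.
  def demo(): (Boolean, Boolean) = {
    val failed: Future[Unit] = Future.failed(new IllegalStateException("shutdown failed"))

    // Await.ready waits for completion only; the failure stays inside the returned future.
    val readyThrew = Try(Await.ready(failed, 1.second)).isFailure

    // Await.result unwraps the future, so the IllegalStateException is rethrown here.
    val resultThrew = Try(Await.result(failed, 1.second)).isFailure

    (readyThrew, resultThrew)
  }
}
```

Both calls throw a `TimeoutException` if the future does not complete within the given duration, so the shorter timeout affects both variants in the same way.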
LGTM. This should now enable a single Invoker to handle a lot more concurrent connections to containers!
@markusthoemmes I added one more forgotten required change: enable ActionContainer based tests to explicitly use akka vs apache http client. While it is true that this could be accomplished by coercing test's akka config to include |
If the additional parameter doesn’t cause breaking downstream changes 👍 otherwise a separate method that tests can opt into. |
Yeah the client is changed by setting the additional param in downstream tests, and default value is false (use old client). It does mean that there needs to be 2 separate tests to run tests with both clients. |
@tysonnorris the runtime tests now inherit properly from a common parent (the basic action runner tests) and that might provide a way for you to hide testing both old and new clients without requiring changes upstream... i didn't look too closely but mentioning it as it may be relevant. |
I'm a bit unsure about a "per-test" flag here. I'd be okay to make the new akka-client the default for all those tests straight away (after a decent amount of local test runs to squash out obvious heisenbugs). No need to gradually enable them one after the other. WDYT? |
@rabbah we would need to run the same test cases twice for each test, with a different config each run. I'm not sure how to simply enable this structure without rewriting the tests? Separately, if that is possible, we still need a way to disable some clients from being used in some tests - the reason I arrived at creating the new client is the concurrency tests simply don't work with the old apache client (at least they don't work in travis). |
@markusthoemmes @rabbah I'm ok to switch all the tests to use akka http (and not apache http) after running them locally to verify, if that works? (we can either run all akka, or a mix of akka and apache, but NOT all apache) |
i'd say run them all with the new client -- if it can't handle the unit tests, we have a problem ;) |
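For reference, running one test body against several client implementations can be sketched without touching the individual tests. Everything below is hypothetical: the `Client` trait and the two fake implementations only stand in for the apache-based and akka-based clients and do not mirror the real `ContainerClient` API.

```scala
object BothClients {
  // Hypothetical minimal client interface, used only for this illustration.
  trait Client {
    def name: String
    def post(body: String): Either[String, String]
  }

  // Fake implementations; a real suite would construct the actual clients here.
  object ApacheLike extends Client {
    val name = "apache"
    def post(body: String): Either[String, String] = Right(body.reverse)
  }
  object AkkaLike extends Client {
    val name = "akka"
    def post(body: String): Either[String, String] = Right(body.reverse)
  }

  // Runs the same check once per client and collects the outcome under the client's name.
  def runWithEach[A](clients: Seq[Client])(check: Client => A): Map[String, A] =
    clients.map(c => c.name -> check(c)).toMap
}
```

Passing a shorter client list to `runWithEach` is one way to exclude a client from tests it cannot handle, such as the concurrency tests mentioned above.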
…ing (truncation etc)
1df04f3
to
cb3a7da
Compare
PG2 3412 🔵 |
…n. (apache#3812) HttpUtils (the http client for invoker -> action container) uses the org.apache.http client, which is synchronous and performs poorly for concurrent requests. I ran into problems using it with concurrent activation support. Instead of trying to force that client to work, this is work towards replacing it (or re-replacing it) with an akka http based client.
Description
HttpUtils (the http client for invoker -> action container) uses the org.apache.http client, which is synchronous and performs poorly for concurrent requests. I ran into problems using it with concurrent activation support. Instead of trying to force that client to work, this PR works towards replacing it (or re-replacing it) with an akka http based client.
This PR provides:
Related issue and scope
My changes affect the following components
Types of changes
Checklist:
Initial tests using this client in support of apache/openwhisk-runtime-nodejs#41 (which adds concurrency support to the nodejs container) were good: tests requiring coordinated completion of 128 concurrent requests succeeded, while the same tests with the existing HttpUtils/org.apache.http client failed with as few as 3 concurrent requests.
More tests will be done, but I wanted to get early feedback on this in general (hence the wip label).
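To illustrate what "coordinated completion" demands of a client, here is a small standard-library sketch. It is not the test from either PR and uses no HTTP client: each simulated request waits until all the others have arrived, so it can only succeed if every request is in flight at the same time. `CoordinatedCompletion`, `allComplete`, `maxInFlight` and `patience` are hypothetical names for this example.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CoordinatedCompletion {
  // Returns true if all `requests` simulated requests rendezvous successfully
  // when at most `maxInFlight` of them can run at once.
  def allComplete(requests: Int, maxInFlight: Int, patience: FiniteDuration = 500.millis): Boolean = {
    val pool = Executors.newFixedThreadPool(maxInFlight)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val arrived = new CountDownLatch(requests)
    try {
      val results = (1 to requests).map { _ =>
        Future {
          arrived.countDown()
          // Wait for every other request to arrive; false means we gave up waiting.
          arrived.await(patience.toMillis, TimeUnit.MILLISECONDS)
        }
      }
      // Upper bound on total time: each queued wave waits at most `patience`.
      Await.result(Future.sequence(results), patience * (requests.toLong + 1)).forall(identity)
    } finally pool.shutdownNow()
  }
}
```

With `maxInFlight` equal to `requests` every request sees the others arrive. With a smaller value, the first wave times out before the remaining requests are even started. This shows why such tests are sensitive to how many requests a client can truly keep open at once.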