-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
recreate http client on resume() #4185
recreate http client on resume() #4185
Conversation
Codecov Report
@@ Coverage Diff @@
## master #4185 +/- ##
==========================================
- Coverage 85.93% 81.08% -4.86%
==========================================
Files 152 152
Lines 7304 7312 +8
Branches 484 480 -4
==========================================
- Hits 6277 5929 -348
- Misses 1027 1383 +356
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general
Left 2 nits. We should explicitly document that this trait is not thread-safe.
@@ -1078,12 +1089,20 @@ class ContainerProxyTests | |||
|
|||
def runCount = atomicRunCount.get() | |||
override def suspend()(implicit transid: TransactionId): Future[Unit] = { | |||
println("suspending!!!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove that extranous logline.
} | ||
def resume()(implicit transid: TransactionId): Future[Unit] = { | ||
override def resume()(implicit transid: TransactionId): Future[Unit] = { | ||
println("resuming!!!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same.
@@ -73,6 +73,10 @@ trait Container { | |||
/** HTTP connection to the container, will be lazily established by callContainer */ | |||
protected var httpConnection: Option[ContainerClient] = None | |||
|
|||
/** maxConcurrent+timeout are cached during first init, so that resuming connections can reference */ | |||
protected var containerHttpMaxConcurrent: Int = 1 | |||
protected var containerHttpTimeout: FiniteDuration = 60.seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this was the case before, but we should add a comment to the trait stating that it's not thread-safe and the caller MUST ensure synchronization (which we do by using an actor)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@markusthoemmes Hello. I'm a newbie at scala and having a hard time reading the source code. Can you please explain what thread-safe means above, and why resume() (which I'm guessing that it somehow calls "docker container resume") is not thread safe??
@markusthoemmes any other comments? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM modulo some nits.
@@ -207,6 +207,7 @@ class MesosTask(override protected val id: ContainerId, | |||
override def resume()(implicit transid: TransactionId): Future[Unit] = { | |||
// resume not supported | |||
Future.successful(Unit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is dead code, you can just omit the Future.successful
call.
@@ -170,6 +173,7 @@ class ContainerProxyTests | |||
activation.annotations.get("limits") shouldBe Some(a.limits.toJson) | |||
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) | |||
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson) | |||
system.log.info(s"acking ${activation.activationId}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these loglines needed? (Same above).
f9521ba
to
1552099
Compare
@markusthoemmes I think this is ready now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last comment on the test, then it's LGTM.
Future.successful(()) | ||
val r = super.resume() | ||
//verify that httpconn is recreated | ||
httpConnection should be('defined) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please await the futures of either resume and suspend before checking that the client is there. Right now this is not an issue but once the implementation of the trait changes we're prone to heisenbugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry I only caught this now... If I get time I'll throw a commit on there myself changing these bits to flatMaps.
@@ -104,7 +104,8 @@ class KubernetesContainer(protected[core] val id: ContainerId, | |||
super.suspend().flatMap(_ => kubernetes.suspend(this)) | |||
} | |||
|
|||
def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this) | |||
override def resume()(implicit transid: TransactionId): Future[Unit] = | |||
kubernetes.resume(this).map(_ => super.resume()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a flatMap
to also "wait" for the future returned by super.resume.
def resume()(implicit transid: TransactionId): Future[Unit] = | ||
if (useRunc) { runc.resume(id) } else { docker.unpause(id) } | ||
override def resume()(implicit transid: TransactionId): Future[Unit] = { | ||
if (useRunc) { runc.resume(id) } else { docker.unpause(id) }.map(_ => super.resume()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flatMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the map()
really run on the full if expression or just the else
block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when in doubt, refactor...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good question @sven-lange-last. Better safe than sorry and make an interim value or put parentheses around the if expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's needed, but agree it will at least be more clear. Also changed to flatMap
Thanks for the feedback, it is always welcome. |
@@ -204,6 +209,17 @@ trait Container { | |||
RunResult(Interval(started, finished), response) | |||
} | |||
} | |||
private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = { | |||
if (Container.config.akkaClient) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot why are we still maintaining the Akka Client and the Apache Client, since the latter was found in #3812 to perform poorly ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we ready to move to the akka client exclusively? there were some reservations about making sure we didn't get bit by some akka issues as in the past.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ready 😄
FWIW, we have been using akka client exclusively in prod, with concurrency enabled, since December.
Tests are already using it as well.
I'm not sure if there are others that are interested in testing it more, but it would be great to retire the apache client IMHO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's confirm that via dev-list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I'm approving based on previous reviews and successful tests.
reopen connections only once, during Container.resume()
In testing concurrent actions (multiple activations in same container) we noticed that when a container is used after
Container.suspend()
, the http connection may be re-created multiple times (and may have side affects of losing connections each time a new one is created.Description
This PR addresses this by:
Container.resume()
(which means that subclasses must invokesuper.resume()
)This way the subsequent + multiple calls to /run will arrive after connection has already been recreated.
I extended the ContainerProxyTests to assert connection state (exists or not) after
suspend()
and afterresume()
.Related issue and scope
My changes affect the following components
Types of changes
Checklist: