[FLINK-28372][rpc] Migrate to Akka Artery #22271
base: master
Conversation
And there is no way to disable this / make Artery think they are on different nodes?
How much testing have you done with actual workloads in different environments?
That was the big question mark on this ticket; how to ensure things actually still work as expected.
Should we model this as an opt-in/out that we can try out in a release?
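If we go the opt-in route, a minimal sketch of such a flag in AkkaOptions could look like this (the key name, default, and wording are assumptions for illustration, not part of this PR):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical opt-in switch between the classic transport and Artery. */
public static final ConfigOption<Boolean> USE_ARTERY =
    ConfigOptions.key("akka.remote.use-artery")
        .booleanType()
        .defaultValue(false)
        .withDescription(
            "Whether to use Akka's Artery transport instead of the classic Netty-based remoting.");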
...flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java
.add(" }") | ||
.add(" }") | ||
.add(" }") | ||
.add(" log-remote-lifecycle-events = " + logLifecycleEvents) |
what happened to this option?
Does not exist in Artery. What we can control in Artery are these options:
# If this is "on", all inbound remote messages will be logged at DEBUG level,
# if off then they are not logged
log-received-messages = off
# If this is "on", all outbound remote messages will be logged at DEBUG level,
# if off then they are not logged
log-sent-messages = off
# Logging of message types with payload size in bytes larger than
# this value. Maximum detected size per message type is logged once,
# with an increase threshold of 10%.
# By default this feature is turned off. Activate it by setting the property to
# a value in bytes, such as 1000b. Note that for all messages larger than this
# limit there will be extra performance and scalability cost.
log-frame-size-exceeding = off
I have not added this yet, because there are multiple options here IMO:
- Keep the old config option in Flink and apply it to both sent and received messages.
- Deprecate the currently existing log option and add two new options, one for received and one for sent messages.
We can also consider what to do with the frame size exceeding events.
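For illustration, the first option could be wired up roughly like this (a sketch only; the helper name is hypothetical and the indentation mirrors the generated config above):

import java.util.StringJoiner;

/** Drives both Artery message-logging keys from the single pre-existing Flink option. */
static String arteryMessageLoggingConfig(boolean logRemoteMessages) {
    // Artery splits logging into sent/received; apply the one Flink flag to both.
    String value = logRemoteMessages ? "on" : "off";
    return new StringJoiner("\n")
        .add("      log-received-messages = " + value)
        .add("      log-sent-messages = " + value)
        .toString();
}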
These options seem to be doing something different than the lifecycle event one.
We may have to manually subscribe to and log these events as shown in akka/akka#28003 (comment) to replicate this option.
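A rough sketch of that approach (assuming akka.remote.RemotingLifecycleEvent is still published when Artery is enabled, which would need verification):

import akka.actor.AbstractActor;
import akka.remote.RemotingLifecycleEvent;

/** Logs remoting lifecycle events, approximating the removed log-remote-lifecycle-events option. */
public class LifecycleEventLogger extends AbstractActor {
    @Override
    public void preStart() {
        // Receive every remoting lifecycle event published on the system event stream.
        getContext().getSystem().eventStream()
            .subscribe(getSelf(), RemotingLifecycleEvent.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(
                RemotingLifecycleEvent.class,
                event -> getContext().getSystem().log().debug("Remoting lifecycle event: {}", event))
            .build();
    }
}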
.add(" connection-timeout = " + akkaTCPTimeout) | ||
.add(" maximum-frame-size = " + akkaFramesize) | ||
.add(" tcp-nodelay = on") | ||
.add(" client-socket-worker-pool {") |
With Artery there's no additional thread pool? Do we potentially need to adjust the Akka thread pools to accommodate?
The client and server socket worker pools were Netty-specific options, and there is no alternative in the Artery config.
That doesn't quite answer my question 😅
Are there no options for artery because it has no additional thread pool?
I do not have a definitive answer at this point, but I guess so. The docs are not too specific, so I quite quickly ended up checking all the configuration options, which do not provide additional thread pool config for Artery.
Ok. Then I assume they use one of Akka's thread pools; it would be good to know which one that is in case we need to make it configurable.
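If it turns out to be one of the shared dispatchers, exposing its sizing would just mean emitting standard Akka dispatcher settings, e.g. for the default dispatcher (illustrative values; whether Artery actually runs on this dispatcher is exactly the open question here):

akka.actor.default-dispatcher {
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 2.0
    parallelism-max = 64
  }
}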
...nk-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
<dependency> | ||
<groupId>io.netty</groupId> | ||
<artifactId>netty</artifactId> | ||
<version>3.10.6.Final</version> |
needs a NOTICE update
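Flink NOTICE files list bundled dependencies by their Maven coordinates, so the entry to drop or adjust should look roughly like this:

- io.netty:netty:3.10.6.Final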
@@ -235,6 +254,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
        .text("Min number of threads to cap factor-based number to.")
        .build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
needs regeneration of the docs
.withDescription(
    "Milliseconds a gate should be closed for after a remote connection was disconnected.");

/** Retry outbound connection only after this backoff. */
public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =
Is this a de-facto replacement for RETRY_GATE_CLOSED_FOR?
I made this change based on the configuration comments.
classic doc:
# After failed to establish an outbound connection, the remoting will mark the
# address as failed. This configuration option controls how much time should
# be elapsed before reattempting a new connection. While the address is
# gated, all messages sent to the address are delivered to dead-letters.
# Since this setting limits the rate of reconnects setting it to a
# very short interval (i.e. less than a second) may result in a storm of
# reconnect attempts.
retry-gate-closed-for = 5 s
artery doc:
# Retry outbound connection after this backoff.
# Only used when transport is tcp or tls-tcp.
outbound-restart-backoff = 1 second
The outbound-restart-backoff comments are a lot less specific, but they point in the same direction.
they point in the same direction
That was also my conclusion from the docs. Shall we add retry-gate-closed-for as a deprecated key to the new option?
Yeah, that makes sense. I also thought about using the value of retry-gate-closed-for if that is set explicitly and outbound-restart-backoff is missing.
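Flink's ConfigOptions builder supports exactly that precedence via deprecated keys; a sketch (the concrete key strings are assumptions based on this thread):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Retry outbound connection only after this backoff. */
public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =
    ConfigOptions.key("akka.remote.outbound-restart-backoff")
        .stringType()
        .defaultValue("1 s")
        // An explicitly set retry-gate-closed-for keeps taking effect when the new key is absent.
        .withDeprecatedKeys("akka.retry-gate-closed-for")
        .withDescription("Retry outbound connection only after this backoff.");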
Thanks for the review @zentol! First things first, I moved the PR back to draft; I am sure there will be some tuning and probably some modifications as well, since at first I just wanted to show what is possible without changing too many things.
At this point I have not gone too far in that direction, but there is no obvious way to change that with config options (I have not gone through all parts thoroughly yet).
For now, I tested manually with a standalone and a YARN setup. On YARN, I tested with internal security enabled as well to trigger the TLS parts. I plan to do the same on a K8s setup as well.
After making these changes I am kind of leaning towards a pluggable solution myself as well, so we can make sure it does not break any existing functionality and can correct any remaining problems on the go. Until now I have not touched or checked the e2e tests, so I expect some failures on that front, but I am working on that too.
d8170dd to 204bd70 (force-push)
hint: Plenty of these failures might be due to expected and perfectly fine error messages that are being picked up by the pesky "no exception in the log" rules.
What is the purpose of the change
Changes the Akka remoting mechanism from the classic Netty-based one to Artery.
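At the configuration level, the switch essentially replaces the generated classic Netty transport settings with an Artery block along these lines (illustrative values):

akka.remote.artery {
  enabled = on
  transport = tcp
  canonical.hostname = "127.0.0.1"
  canonical.port = 6123
}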
Brief change log
Verifying this change
After deploying a job, check the job and task managers on the Flink dashboard.
This change is already covered by existing tests under the flink-rpc module. There are some parts that may require some discussion. I disabled the RemoteAkkaRpcActorTest#failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable test case, because with Artery, lifecycle monitoring is only triggered if the 2 RPC services are on different nodes. Also, in the current iteration I did not expose the watch-failure-detector related fields in AkkaOptions, which probably should be done, but first I just wanted to get some opinions about the way it is in general currently.

Does this pull request potentially affect one of the following parts:
- Dependencies: yes (flink-rpc)
- @Public(Evolving): no

Documentation