Skip to content

Commit

Permalink
[FLINK-28372][rpc] Migrate to Akka Artery
Browse files Browse the repository at this point in the history
  • Loading branch information
ferenc-csaky committed Mar 27, 2023
1 parent f6b55ca commit 204bd70
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
"Timeout for all outbound connections. If you should experience problems with connecting to a"
+ " TaskManager due to a slow network, you should increase this value.");

/** Timeout for the startup of the actor system. */
public static final ConfigOption<String> STARTUP_TIMEOUT =
ConfigOptions.key("akka.startup-timeout")
.stringType()
.noDefaultValue()
.withDescription(
"Timeout after which the startup of a remote component is considered being failed.");

/** Override SSL support for the Akka transport. */
public static final ConfigOption<Boolean> SSL_ENABLED =
ConfigOptions.key("akka.ssl.enabled")
Expand All @@ -120,7 +112,7 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.withDescription(
"Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink"
+ " fails because messages exceed this limit, then you should increase it. The message size requires a"
+ " size-unit specifier.");
+ " size-unit specifier. Has to be at least 32 KiB");

/** Maximum number of messages until another actor is executed by the same thread. */
public static final ConfigOption<Integer> DISPATCHER_THROUGHPUT =
Expand Down Expand Up @@ -178,13 +170,12 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.defaultValue(true)
.withDescription("Exit JVM on fatal Akka errors.");

/** Milliseconds a gate should be closed for after a remote connection was disconnected. */
public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
ConfigOptions.key("akka.retry-gate-closed-for")
.longType()
.defaultValue(50L)
.withDescription(
"Milliseconds a gate should be closed for after a remote connection was disconnected.");
/** Retry outbound connection only after this backoff. */
public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =
ConfigOptions.key("akka.outbound-restart-backoff")
.stringType()
.defaultValue("50 ms")
.withDescription("Retry outbound connection only after this backoff.");

// ==================================================
// Configurations for fork-join-executor.
Expand Down Expand Up @@ -223,9 +214,37 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.build());

// ==================================================
// Configurations for client-socket-work-pool.
// Deprecated options
// ==================================================

/**
* Timeout for the startup of the actor system.
*
* @deprecated Don't use this option anymore. It has no effect on Flink.
*/
@Deprecated
public static final ConfigOption<String> STARTUP_TIMEOUT =
ConfigOptions.key("akka.startup-timeout")
.stringType()
.noDefaultValue()
.withDescription(
"Timeout after which the startup of a remote component is considered being failed.");

/**
* Milliseconds a gate should be closed for after a remote connection was disconnected.
*
* @deprecated Don't use this option anymore. It has no effect on Flink.
*/
@Deprecated
public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
ConfigOptions.key("akka.retry-gate-closed-for")
.longType()
.defaultValue(50L)
.withDescription(
"Milliseconds a gate should be closed for after a remote connection was disconnected.");

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MIN =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-min")
.intType()
Expand All @@ -235,6 +254,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Min number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> CLIENT_SOCKET_WORKER_POOL_SIZE_MAX =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-max")
.intType()
Expand All @@ -244,6 +265,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Max number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Double> CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR =
ConfigOptions.key("akka.client-socket-worker-pool.pool-size-factor")
.doubleType()
Expand All @@ -257,10 +280,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
+ " pool-size-max values.")
.build());

// ==================================================
// Configurations for server-socket-work-pool.
// ==================================================

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MIN =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-min")
.intType()
Expand All @@ -270,6 +291,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Min number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Integer> SERVER_SOCKET_WORKER_POOL_SIZE_MAX =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-max")
.intType()
Expand All @@ -279,6 +302,8 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
.text("Max number of threads to cap factor-based number to.")
.build());

/** @deprecated Don't use this option anymore. It has no effect on Flink. */
@Deprecated
public static final ConfigOption<Double> SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR =
ConfigOptions.key("akka.server-socket-worker-pool.pool-size-factor")
.doubleType()
Expand All @@ -292,10 +317,6 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
+ " pool-size-max values.")
.build());

// ==================================================
// Deprecated options
// ==================================================

/**
* The Akka death watch heartbeat interval.
*
Expand Down
5 changes: 0 additions & 5 deletions flink-rpc/flink-rpc-akka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ under the License.
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.6.Final</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public static ActorSystem startRemoteActorSystem(
} catch (Exception e) {
// we can continue to try if this contains a netty channel exception
Throwable cause = e.getCause();
if (!(cause instanceof org.jboss.netty.channel.ChannelException
|| cause instanceof java.net.BindException)) {
if (!(cause instanceof java.net.BindException)) {
throw e;
} // else fall through the loop and try the next port
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private AkkaRpcServiceConfiguration(
boolean captureAskCallStack,
boolean forceRpcInvocationSerialization) {

checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
checkArgument(maximumFramesize >= 32768L, "Maximum framesize must be at least 32 KiB.");
this.configuration = configuration;
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
Expand Down Expand Up @@ -54,9 +53,6 @@ public class AkkaRpcServiceUtils {

private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);

private static final String AKKA_TCP = "akka.tcp";
private static final String AKKA_SSL_TCP = "akka.ssl.tcp";

static final String SUPERVISOR_NAME = "rpc";

private static final String SIMPLE_AKKA_CONFIG_TEMPLATE =
Expand Down Expand Up @@ -129,16 +125,7 @@ public static String getRpcUrl(

checkNotNull(config, "config is null");

final boolean sslEnabled =
config.getBoolean(AkkaOptions.SSL_ENABLED)
&& SecurityOptions.isInternalSSLEnabled(config);

return getRpcUrl(
hostname,
port,
endpointName,
addressResolution,
sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
return getRpcUrl(hostname, port, endpointName, addressResolution);
}

/**
Expand All @@ -147,15 +134,10 @@ public static String getRpcUrl(
* @param endpointName The name of the RPC endpoint.
* @param addressResolution Whether to try address resolution of the given hostname or not. This
* allows to fail fast in case that the hostname cannot be resolved.
* @param akkaProtocol True, if security/encryption is enabled, false otherwise.
* @return The RPC URL of the specified RPC endpoint.
*/
public static String getRpcUrl(
String hostname,
int port,
String endpointName,
AddressResolution addressResolution,
AkkaProtocol akkaProtocol)
String hostname, int port, String endpointName, AddressResolution addressResolution)
throws UnknownHostException {

checkNotNull(hostname, "hostname is null");
Expand All @@ -170,8 +152,7 @@ public static String getRpcUrl(

final String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);

return internalRpcUrl(
endpointName, Optional.of(new RemoteAddressInformation(hostPort, akkaProtocol)));
return internalRpcUrl(endpointName, Optional.of(hostPort));
}

public static String getLocalRpcUrl(String endpointName) {
Expand All @@ -182,52 +163,16 @@ public static boolean isRecipientTerminatedException(Throwable exception) {
return exception.getMessage().contains("had already been terminated.");
}

private static final class RemoteAddressInformation {
private final String hostnameAndPort;
private final AkkaProtocol akkaProtocol;

private RemoteAddressInformation(String hostnameAndPort, AkkaProtocol akkaProtocol) {
this.hostnameAndPort = hostnameAndPort;
this.akkaProtocol = akkaProtocol;
}

private String getHostnameAndPort() {
return hostnameAndPort;
}

private AkkaProtocol getAkkaProtocol() {
return akkaProtocol;
}
}

private static String internalRpcUrl(
String endpointName, Optional<RemoteAddressInformation> remoteAddressInformation) {
final String protocolPrefix =
remoteAddressInformation
.map(rai -> akkaProtocolToString(rai.getAkkaProtocol()))
.orElse("akka");
final Optional<String> optionalHostnameAndPort =
remoteAddressInformation.map(RemoteAddressInformation::getHostnameAndPort);

final StringBuilder url = new StringBuilder(String.format("%s://flink", protocolPrefix));
optionalHostnameAndPort.ifPresent(hostPort -> url.append("@").append(hostPort));
private static String internalRpcUrl(String endpointName, Optional<String> optHostPort) {
final StringBuilder url = new StringBuilder("akka://flink");
optHostPort.ifPresent(hostPort -> url.append("@").append(hostPort));

url.append("/user/").append(SUPERVISOR_NAME).append("/").append(endpointName);

// protocolPrefix://flink[@hostname:port]/user/rpc/endpointName
return url.toString();
}

private static String akkaProtocolToString(AkkaProtocol akkaProtocol) {
return akkaProtocol == AkkaProtocol.SSL_TCP ? AKKA_SSL_TCP : AKKA_TCP;
}

/** Whether to use TCP or encrypted TCP for Akka. */
public enum AkkaProtocol {
TCP,
SSL_TCP
}

// ------------------------------------------------------------------------
// RPC service configuration
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 204bd70

Please sign in to comment.