From b0ae6db371edfce13268d8fc2eeddb6fb3950130 Mon Sep 17 00:00:00 2001 From: xumanbu Date: Thu, 13 Apr 2023 10:47:09 +0800 Subject: [PATCH] [#779] feat: Grpc server support random port (#820) ### What changes were proposed in this pull request? 1. grpc server port supports random port 2. grpc server will retry binding with an increased port number if the configured port is already occupied ### Why are the changes needed? 1. improve robustness for server startup 2. sync with netty server functionality Fixes #779 ### Does this PR introduce any user-facing change? Yes. `rss.rpc.server.port` can be set to zero. ### How was this patch tested? UT Co-authored-by: jam.xu --- .../apache/uniffle/common/rpc/GrpcServer.java | 51 ++++++++++++------- .../apache/uniffle/common/util/Constants.java | 1 + .../apache/uniffle/common/util/RssUtils.java | 3 ++ docs/server_guide.md | 4 +- ...ava => ShuffleServerOnRandomPortTest.java} | 27 ++++++++-- .../apache/uniffle/server/ShuffleServer.java | 2 +- 6 files changed, 65 insertions(+), 23 deletions(-) rename integration-test/common/src/test/java/org/apache/uniffle/test/{ShuffleServerEnableStreamServerTest.java => ShuffleServerOnRandomPortTest.java} (72%) diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java index d96ee40ba8..132ec04d06 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java @@ -40,24 +40,32 @@ import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.metrics.GRPCMetrics; +import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.ExitUtils; +import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; public class GrpcServer implements ServerInterface { private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class); - private final Server 
server; + private Server server; private final int port; private int listenPort; private final ExecutorService pool; + private List>> servicesWithInterceptors; + private GRPCMetrics grpcMetrics; + private RssBaseConf rssConf; protected GrpcServer( RssBaseConf conf, List>> servicesWithInterceptors, GRPCMetrics grpcMetrics) { - this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT); - long maxInboundMessageSize = conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE); + this.rssConf = conf; + this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT); + this.servicesWithInterceptors = servicesWithInterceptors; + this.grpcMetrics = grpcMetrics; + int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE); pool = new GrpcThreadPoolExecutor( rpcExecutorSize, @@ -68,12 +76,15 @@ protected GrpcServer( ThreadUtils.getThreadFactory("Grpc"), grpcMetrics ); + } - boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED); + private Server buildGrpcServer(int serverPort) { + boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED); + long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE); ServerBuilder builder = ServerBuilder - .forPort(port) + .forPort(serverPort) .executor(pool) - .maxInboundMessageSize((int)maxInboundMessageSize); + .maxInboundMessageSize((int) maxInboundMessageSize); if (isMetricsEnabled) { builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics)); } @@ -88,7 +99,7 @@ protected GrpcServer( } builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(), interceptors)); }); - this.server = builder.build(); + return builder.build(); } public static class Builder { @@ -155,21 +166,27 @@ protected void afterExecute(Runnable r, Throwable t) { } } + @Override public int start() throws IOException { try { - server.start(); - listenPort = server.getPort(); - } catch (IOException e) { - ExitUtils.terminate(1, "Fail to start grpc server", e, LOG); + 
this.listenPort = RssUtils.startServiceOnPort(this, + Constants.GRPC_SERVICE_NAME, port, rssConf); + } catch (Exception e) { + ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port, e, LOG); } - LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort); - return port; + return listenPort; } @Override - public void startOnPort(int port) { - ExitUtils.terminate(1, "Fail to start grpc server", - new RuntimeException("GRpcServer not implement now"), LOG); + public void startOnPort(int startPort) throws Exception { + this.server = buildGrpcServer(startPort); + try { + server.start(); + listenPort = server.getPort(); + } catch (Exception e) { + throw e; + } + LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort); } public void stop() throws InterruptedException { @@ -189,7 +206,7 @@ public void blockUntilShutdown() throws InterruptedException { } public int getPort() { - return port <= 0 ? listenPort : port; + return listenPort; } } diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java index 5e946866d2..514cf0e7fa 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java +++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java @@ -72,4 +72,5 @@ private Constants() { public static final double MILLION_SECONDS_PER_SECOND = 1E3D; public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on device"; public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server"; + public static final String GRPC_SERVICE_NAME = "grpc.server"; } diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java index 888e96fa78..92688083fa 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java +++ 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java @@ -204,6 +204,9 @@ public static boolean isServerPortBindCollision(Throwable e) { } else if (e instanceof Errors.NativeIoException) { return (e.getMessage() != null && e.getMessage().startsWith("bind() failed: ")) || isServerPortBindCollision(e.getCause()); + } else if (e instanceof IOException) { + return (e.getMessage() != null && e.getMessage().startsWith("Failed to bind to address")) + || isServerPortBindCollision(e.getCause()); } else { return false; } diff --git a/docs/server_guide.md b/docs/server_guide.md index 0fb7af4e7b..48143a228d 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -67,7 +67,7 @@ This document will introduce how to deploy Uniffle shuffle servers. | Property Name | Default | Description | |-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | rss.coordinator.quorum | - | Coordinator quorum | -| rss.rpc.server.port | - | RPC port for Shuffle server | +| rss.rpc.server.port | - | RPC port for Shuffle server, if set zero, grpc server start on random port. | | rss.jetty.http.port | - | Http port for Shuffle server | | rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, netty server start on random port. | | rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. 
If negative, JVM heap size * buffer.ratio is used | @@ -156,4 +156,4 @@ rss.server.single.buffer.flush.threshold 129m rss.server.max.concurrency.of.single.partition.writer 20 rss.server.huge-partition.size.threshold 20g rss.server.huge-partition.memory.limit.ratio 0.2 -``` \ No newline at end of file +``` diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java similarity index 72% rename from integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java rename to integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java index f98af1a4dd..67664a6852 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; -public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase { +public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase { @BeforeAll public static void setupServers() throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); @@ -38,10 +38,10 @@ public static void setupServers() throws Exception { createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); shuffleServerConf.setInteger("rss.server.netty.port", 0); + shuffleServerConf.setInteger("rss.rpc.server.port", 0); shuffleServerConf.setInteger("rss.random.port.min", 30000); shuffleServerConf.setInteger("rss.random.port.max", 40000); createShuffleServer(shuffleServerConf); - shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1); shuffleServerConf.setInteger("rss.jetty.http.port", 18081); createShuffleServer(shuffleServerConf); 
startServers(); @@ -58,9 +58,9 @@ public void startStreamServerOnRandomPort() throws Exception { int maxRetries = 100; ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + // start netty server with already bind port shuffleServerConf.setInteger("rss.server.netty.port", actualPort); shuffleServerConf.setInteger("rss.jetty.http.port", 18082); - shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2); shuffleServerConf.setInteger("rss.port.max.retry", maxRetries); ShuffleServer ss = new ShuffleServer(shuffleServerConf); ss.start(); @@ -68,4 +68,25 @@ public void startStreamServerOnRandomPort() throws Exception { ss.stopServer(); } + @Test + public void startGrpcServerOnRandomPort() throws Exception { + CoordinatorTestUtils.waitForRegister(coordinatorClient, 2); + Thread.sleep(5000); + int actualPort = shuffleServers.get(0).getGrpcPort(); + assertTrue(actualPort >= 30000 && actualPort < 40000); + actualPort = shuffleServers.get(1).getGrpcPort(); + assertTrue(actualPort >= 30000 && actualPort <= 40000); + + int maxRetries = 100; + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + // start grpc server with already bind port + shuffleServerConf.setInteger("rss.rpc.server.port", actualPort); + shuffleServerConf.setInteger("rss.jetty.http.port", 18083); + shuffleServerConf.setInteger("rss.port.max.retry", maxRetries); + ShuffleServer ss = new ShuffleServer(shuffleServerConf); + ss.start(); + assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort + maxRetries); + ss.stopServer(); + } + } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index e713649aa4..c23dbb0a26 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -126,7 +126,7 @@ public static void main(String[] args) throws Exception { public void start() throws 
Exception { jettyServer.start(); - server.start(); + grpcPort = server.start(); if (nettyServerEnabled) { nettyPort = streamServer.start(); }