Skip to content

Commit

Permalink
[#779] feat: Grpc server support random port (#820)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. grpc server port support random port 
2. grpc sever will retry binding with increased port num if configured port is already occupied

### Why are the changes needed?
1. improve robustness for server startup
2. sync with netty server functionality

Fixs #779

### Does this PR introduce any user-facing change?
Yes. `rss.rpc.server.port` could be set as zero.

### How was this patch tested?
UT

Co-authored-by: jam.xu <[email protected]>
  • Loading branch information
xumanbu and xumanbu authored Apr 13, 2023
1 parent c9abe9a commit b0ae6db
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 23 deletions.
51 changes: 34 additions & 17 deletions common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,32 @@

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class GrpcServer implements ServerInterface {

private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class);

private final Server server;
private Server server;
private final int port;
private int listenPort;
private final ExecutorService pool;
private List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors;
private GRPCMetrics grpcMetrics;
private RssBaseConf rssConf;

protected GrpcServer(
RssBaseConf conf,
List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors,
GRPCMetrics grpcMetrics) {
this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
long maxInboundMessageSize = conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
this.rssConf = conf;
this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT);
this.servicesWithInterceptors = servicesWithInterceptors;
this.grpcMetrics = grpcMetrics;

int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
pool = new GrpcThreadPoolExecutor(
rpcExecutorSize,
Expand All @@ -68,12 +76,15 @@ protected GrpcServer(
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics
);
}

boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
private Server buildGrpcServer(int serverPort) {
boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
ServerBuilder<?> builder = ServerBuilder
.forPort(port)
.forPort(serverPort)
.executor(pool)
.maxInboundMessageSize((int)maxInboundMessageSize);
.maxInboundMessageSize((int) maxInboundMessageSize);
if (isMetricsEnabled) {
builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics));
}
Expand All @@ -88,7 +99,7 @@ protected GrpcServer(
}
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(), interceptors));
});
this.server = builder.build();
return builder.build();
}

public static class Builder {
Expand Down Expand Up @@ -155,21 +166,27 @@ protected void afterExecute(Runnable r, Throwable t) {
}
}

@Override
public int start() throws IOException {
try {
server.start();
listenPort = server.getPort();
} catch (IOException e) {
ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
this.listenPort = RssUtils.startServiceOnPort(this,
Constants.GRPC_SERVICE_NAME, port, rssConf);
} catch (Exception e) {
ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port, e, LOG);
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
return port;
return listenPort;
}

@Override
public void startOnPort(int port) {
ExitUtils.terminate(1, "Fail to start grpc server",
new RuntimeException("GRpcServer not implement now"), LOG);
public void startOnPort(int startPort) throws Exception {
this.server = buildGrpcServer(startPort);
try {
server.start();
listenPort = server.getPort();
} catch (Exception e) {
throw e;
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
}

public void stop() throws InterruptedException {
Expand All @@ -189,7 +206,7 @@ public void blockUntilShutdown() throws InterruptedException {
}

public int getPort() {
return port <= 0 ? listenPort : port;
return listenPort;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ private Constants() {
public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on device";
public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
public static final String GRPC_SERVICE_NAME = "grpc.server";
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ public static boolean isServerPortBindCollision(Throwable e) {
} else if (e instanceof Errors.NativeIoException) {
return (e.getMessage() != null && e.getMessage().startsWith("bind() failed: "))
|| isServerPortBindCollision(e.getCause());
} else if (e instanceof IOException) {
return (e.getMessage() != null && e.getMessage().startsWith("Failed to bind to address"))
|| isServerPortBindCollision(e.getCause());
} else {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| Property Name | Default | Description |
|-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| rss.coordinator.quorum | - | Coordinator quorum |
| rss.rpc.server.port | - | RPC port for Shuffle server |
| rss.rpc.server.port | - | RPC port for Shuffle server, if set zero, grpc server start on random port. |
| rss.jetty.http.port | - | Http port for Shuffle server |
| rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, netty server start on random port. |
| rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used |
Expand Down Expand Up @@ -156,4 +156,4 @@ rss.server.single.buffer.flush.threshold 129m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.huge-partition.size.threshold 20g
rss.server.huge-partition.memory.limit.ratio 0.2
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase {
@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
Expand All @@ -38,10 +38,10 @@ public static void setupServers() throws Exception {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setInteger("rss.server.netty.port", 0);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setInteger("rss.random.port.min", 30000);
shuffleServerConf.setInteger("rss.random.port.max", 40000);
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
startServers();
Expand All @@ -58,14 +58,35 @@ public void startStreamServerOnRandomPort() throws Exception {

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start netty server with already bind port
shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getNettyPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

@Test
public void startGrpcServerOnRandomPort() throws Exception {
CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
Thread.sleep(5000);
int actualPort = shuffleServers.get(0).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort < 40000);
actualPort = shuffleServers.get(1).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort <= 40000);

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start grpc server with already bind port
shuffleServerConf.setInteger("rss.rpc.server.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18083);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void main(String[] args) throws Exception {

public void start() throws Exception {
jettyServer.start();
server.start();
grpcPort = server.start();
if (nettyServerEnabled) {
nettyPort = streamServer.start();
}
Expand Down

0 comments on commit b0ae6db

Please sign in to comment.