Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#779] feat: Grpc server support random port #820

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,32 @@

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class GrpcServer implements ServerInterface {

private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class);

private final Server server;
private Server server;
private final int port;
private int listenPort;
private final ExecutorService pool;
private List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors;
private GRPCMetrics grpcMetrics;
private RssBaseConf rssConf;

protected GrpcServer(
RssBaseConf conf,
List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors,
GRPCMetrics grpcMetrics) {
this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
long maxInboundMessageSize = conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
this.rssConf = conf;
this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If using the random port, why reserve this var of port ? the listen port and port are unclear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with u.in same test case have reuse RssBaseConf(i think it is a mistake), retain
this the port var just reserve the configuration value to avoid test failed.
ShuffleWithRssClientTest

this.servicesWithInterceptors = servicesWithInterceptors;
this.grpcMetrics = grpcMetrics;

int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
pool = new GrpcThreadPoolExecutor(
rpcExecutorSize,
Expand All @@ -68,12 +76,15 @@ protected GrpcServer(
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics
);
}

boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
private Server buildGrpcServer(int serverPort) {
boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
ServerBuilder<?> builder = ServerBuilder
.forPort(port)
.forPort(serverPort)
.executor(pool)
.maxInboundMessageSize((int)maxInboundMessageSize);
.maxInboundMessageSize((int) maxInboundMessageSize);
if (isMetricsEnabled) {
builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics));
}
Expand All @@ -88,7 +99,7 @@ protected GrpcServer(
}
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(), interceptors));
});
this.server = builder.build();
return builder.build();
}

public static class Builder {
Expand Down Expand Up @@ -155,21 +166,27 @@ protected void afterExecute(Runnable r, Throwable t) {
}
}

@Override
public int start() throws IOException {
try {
server.start();
listenPort = server.getPort();
} catch (IOException e) {
ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
this.listenPort = RssUtils.startServiceOnPort(this,
Constants.GRPC_SERVICE_NAME, port, rssConf);
} catch (Exception e) {
ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port, e, LOG);
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
return port;
return listenPort;
}

@Override
public void startOnPort(int port) {
ExitUtils.terminate(1, "Fail to start grpc server",
new RuntimeException("GRpcServer not implement now"), LOG);
public void startOnPort(int startPort) throws Exception {
this.server = buildGrpcServer(startPort);
try {
server.start();
listenPort = server.getPort();
} catch (Exception e) {
throw e;
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
}

public void stop() throws InterruptedException {
Expand All @@ -189,7 +206,7 @@ public void blockUntilShutdown() throws InterruptedException {
}

public int getPort() {
return port <= 0 ? listenPort : port;
return listenPort;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ private Constants() {
public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on device";
public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
public static final String GRPC_SERVICE_NAME = "grpc.server";
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ public static boolean isServerPortBindCollision(Throwable e) {
} else if (e instanceof Errors.NativeIoException) {
return (e.getMessage() != null && e.getMessage().startsWith("bind() failed: "))
|| isServerPortBindCollision(e.getCause());
} else if (e instanceof IOException) {
return (e.getMessage() != null && e.getMessage().startsWith("Failed to bind to address"))
|| isServerPortBindCollision(e.getCause());
} else {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ This document will introduce how to deploy Uniffle shuffle servers.
| Property Name | Default | Description |
|-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| rss.coordinator.quorum | - | Coordinator quorum |
| rss.rpc.server.port | - | RPC port for Shuffle server |
| rss.rpc.server.port | - | RPC port for Shuffle server, if set zero, grpc server start on random port. |
| rss.jetty.http.port | - | Http port for Shuffle server |
| rss.server.netty.port | -1 | Netty port for Shuffle server, if set zero, netty server start on random port. |
| rss.server.buffer.capacity | -1 | Max memory of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio is used |
Expand Down Expand Up @@ -156,4 +156,4 @@ rss.server.single.buffer.flush.threshold 129m
rss.server.max.concurrency.of.single.partition.writer 20
rss.server.huge-partition.size.threshold 20g
rss.server.huge-partition.memory.limit.ratio 0.2
```
```
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase {
@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
Expand All @@ -38,10 +38,10 @@ public static void setupServers() throws Exception {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setInteger("rss.server.netty.port", 0);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setInteger("rss.random.port.min", 30000);
shuffleServerConf.setInteger("rss.random.port.max", 40000);
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
startServers();
Expand All @@ -58,14 +58,35 @@ public void startStreamServerOnRandomPort() throws Exception {

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start netty server with already bind port
shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getNettyPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

@Test
public void startGrpcServerOnRandomPort() throws Exception {
CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
Thread.sleep(5000);
int actualPort = shuffleServers.get(0).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort < 40000);
actualPort = shuffleServers.get(1).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort <= 40000);

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start grpc server with already bind port
shuffleServerConf.setInteger("rss.rpc.server.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18083);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void main(String[] args) throws Exception {

public void start() throws Exception {
jettyServer.start();
server.start();
grpcPort = server.start();
if (nettyServerEnabled) {
nettyPort = streamServer.start();
}
Expand Down