Skip to content

Commit

Permalink
[SPARK-19529][BRANCH-1.6] Backport PR apache#16866 to branch-1.6
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR backports PR apache#16866 to branch-1.6

## How was this patch tested?

Existing tests.

Author: Cheng Lian <[email protected]>

Closes apache#16917 from liancheng/spark-19529-1.6-backport.
  • Loading branch information
liancheng authored and JoshRosen committed Feb 14, 2017
1 parent e78138a commit a50ef3d
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public TransportClientFactory(
*
* Concurrency: This method is safe to call from multiple threads.
*/
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
public TransportClient createClient(String remoteHost, int remotePort)
throws IOException, InterruptedException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
Expand Down Expand Up @@ -176,13 +177,14 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
* As with {@link #createClient(String, int)}, this method is blocking.
*/
public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
throws IOException {
throws IOException, InterruptedException {
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
return createClient(address);
}

/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
private TransportClient createClient(InetSocketAddress address)
throws IOException, InterruptedException {
logger.debug("Creating new connection to " + address);

Bootstrap bootstrap = new Bootstrap();
Expand All @@ -209,7 +211,7 @@ public void initChannel(SocketChannel ch) {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
if (!cf.await(conf.connectionTimeoutMs())) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
} else if (cf.cause() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void run() {
clients.add(client);
} catch (IOException e) {
failed.incrementAndGet();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
Expand Down Expand Up @@ -140,7 +142,7 @@ public void reuseClientsUpToConfigVariableConcurrent() throws Exception {
}

@Test
public void returnDifferentClientsForDifferentServers() throws IOException {
public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
Expand Down Expand Up @@ -169,7 +171,7 @@ public void neverReturnInactiveClients() throws IOException, InterruptedExceptio
}

@Test
public void closeBlockClientsWithFactory() throws IOException {
public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
TransportClientFactory factory = context.createClientFactory();
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void fetchBlocks(
new RetryingBlockFetcher.BlockFetchStarter() {
@Override
public void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException {
throws IOException, InterruptedException {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
}
Expand Down Expand Up @@ -136,7 +136,7 @@ public void registerWithShuffleServer(
String host,
int port,
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
TransportClient client = clientFactory.createUnmanagedClient(host, port);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static interface BlockFetchStarter {
* {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
* issues.
*/
void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
void createAndStart(String[] blockIds, BlockFetchingListener listener)
throws IOException, InterruptedException;
}

/** Shared executor service used for waiting and retrying. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public MesosExternalShuffleClient(
super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
}

public void registerDriverWithShuffleService(String host, int port) throws IOException {
public void registerDriverWithShuffleService(String host, int port)
throws IOException, InterruptedException {
checkInit();
ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
TransportClient client = clientFactory.createClient(host, port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void afterEach() {
}

@Test
public void testGoodClient() throws IOException {
public void testGoodClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList(
new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testBadClient() {
}

@Test
public void testNoSaslClient() throws IOException {
public void testNoSaslClient() throws IOException, InterruptedException {
clientFactory = context.createClientFactory(
Lists.<TransportClientBootstrap>newArrayList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testFetchNoServer() throws Exception {
}

private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
throws IOException {
throws IOException, InterruptedException {
ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
client.init(APP_ID);
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void afterEach() {
}

@Test
public void testValid() throws IOException {
public void testValid() throws IOException, InterruptedException {
validate("my-app-id", "secret", false);
}

Expand All @@ -83,12 +83,13 @@ public void testBadSecret() {
}

@Test
public void testEncryption() throws IOException {
public void testEncryption() throws IOException, InterruptedException {
validate("my-app-id", "secret", true);
}

/** Creates an ExternalShuffleClient and attempts to register with the server. */
private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
private void validate(String appId, String secretKey, boolean encrypt)
throws IOException, InterruptedException {
ExternalShuffleClient client =
new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
client.init(appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void afterEach() {
}

@Test
public void testNoFailures() throws IOException {
public void testNoFailures() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -85,7 +85,7 @@ public void testNoFailures() throws IOException {
}

@Test
public void testUnrecoverableFailure() throws IOException {
public void testUnrecoverableFailure() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -104,7 +104,7 @@ public void testUnrecoverableFailure() throws IOException {
}

@Test
public void testSingleIOExceptionOnFirst() throws IOException {
public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -127,7 +127,7 @@ public void testSingleIOExceptionOnFirst() throws IOException {
}

@Test
public void testSingleIOExceptionOnSecond() throws IOException {
public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand All @@ -149,7 +149,7 @@ public void testSingleIOExceptionOnSecond() throws IOException {
}

@Test
public void testTwoIOExceptions() throws IOException {
public void testTwoIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testTwoIOExceptions() throws IOException {
}

@Test
public void testThreeIOExceptions() throws IOException {
public void testThreeIOExceptions() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testThreeIOExceptions() throws IOException {
}

@Test
public void testRetryAndUnrecoverable() throws IOException {
public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
BlockFetchingListener listener = mock(BlockFetchingListener.class);

List<? extends Map<String, Object>> interactions = Arrays.asList(
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testRetryAndUnrecoverable() throws IOException {
@SuppressWarnings("unchecked")
private static void performInteractions(List<? extends Map<String, Object>> interactions,
BlockFetchingListener listener)
throws IOException {
throws IOException, InterruptedException {

TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
Expand Down

0 comments on commit a50ef3d

Please sign in to comment.