Skip to content

Commit

Permalink
HBASE-26834 Adapt ConnectionRule for both sync and async connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ndimiduk committed Mar 14, 2022
1 parent 50e1230 commit 945ab47
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor {

@Rule
public final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);

@Test
public void testShellExecUnspecified() {
Expand All @@ -69,7 +69,7 @@ public void testShellExecForeground() {
}

private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final String command = "echo -n \"hello world\"";
Expand All @@ -87,7 +87,7 @@ private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> co

@Test
public void testShellExecBackground() throws IOException {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
Expand Down Expand Up @@ -121,7 +121,7 @@ private static File ensureTestDataDirExists(
final Path testDataDir = Optional.of(testingUtility)
.map(HBaseTestingUtility::getDataTestDir)
.map(Object::toString)
.map(val -> Paths.get(val))
.map(Paths::get)
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
assertTrue(testDataDirFile.exists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
Expand All @@ -35,7 +36,7 @@
* public class TestMyClass {
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
* private static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
*
* @ClassRule
* public static final TestRule rule = RuleChain
Expand All @@ -44,32 +45,87 @@
* }
* }</pre>
*/
public class ConnectionRule extends ExternalResource {
public final class ConnectionRule extends ExternalResource {

private final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier;
private AsyncConnection connection;
private final Supplier<Connection> connectionSupplier;
private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;

public ConnectionRule(final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier) {
private Connection connection;
private AsyncConnection asyncConnection;

public static ConnectionRule createSyncConnectionRule(
final Supplier<Connection> connectionSupplier
) {
return new ConnectionRule(connectionSupplier, null);
}

public static ConnectionRule createAsyncConnectionRule(
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(null, asyncConnectionSupplier);
}

public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
}

private ConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
this.connectionSupplier = connectionSupplier;
this.asyncConnectionSupplier = asyncConnectionSupplier;
}

public AsyncConnection getConnection() {
public Connection getSyncConnection() {
if (connection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with a synchronous connection.");
}
return connection;
}

public AsyncConnection getAsyncConnection() {
if (asyncConnection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with an asynchronous connection.");
}
return asyncConnection;
}

@Override
protected void before() throws Throwable {
this.connection = connectionSupplier.get().join();
if (connectionSupplier != null) {
this.connection = connectionSupplier.get();
}
if (asyncConnectionSupplier != null) {
this.asyncConnection = asyncConnectionSupplier.get().join();
}
}

@Override
protected void after() {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});
CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
if (this.asyncConnection != null) {
try {
asyncConnection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.ClassRule;
import org.junit.Rule;
Expand All @@ -41,7 +42,7 @@
*
* @Rule
* public final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
* }
* }</pre>
*/
Expand Down Expand Up @@ -102,11 +103,26 @@ public HBaseTestingUtility getTestingUtility() {
return testingUtility;
}

/**
* Create a {@link Connection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link Connection#close() close()} the connection when finished.
*/
public Connection createSyncConnection() {
if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized");
}
try {
return ConnectionFactory.createConnection(miniCluster.getConf());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link AsyncConnection#close() close()} the connection when finished.
*/
public CompletableFuture<AsyncConnection> createConnection() {
public CompletableFuture<AsyncConnection> createAsyncConnection() {
if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public class TestMetaBrowser {
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();

private final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection);

@Rule
public TestRule rule = RuleChain.outerRule(connectionRule)
Expand All @@ -84,7 +84,7 @@ public class TestMetaBrowser {

@Before
public void before() {
connection = connectionRule.getConnection();
connection = connectionRule.getAsyncConnection();
admin = connection.getAdmin();
}

Expand Down

0 comments on commit 945ab47

Please sign in to comment.