Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26834 Adapt ConnectionRule for both sync and async connections (branch-2) #4220

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class TestShellExecEndpointCoprocessor {

@Rule
public final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);

@Test
public void testShellExecUnspecified() {
Expand All @@ -69,7 +69,7 @@ public void testShellExecForeground() {
}

private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> consumer) {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final String command = "echo -n \"hello world\"";
Expand All @@ -87,7 +87,7 @@ private void testShellExecForeground(final Consumer<ShellExecRequest.Builder> co

@Test
public void testShellExecBackground() throws IOException {
final AsyncConnection conn = connectionRule.getConnection();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final AsyncAdmin admin = conn.getAdmin();

final File testDataDir = ensureTestDataDirExists(miniClusterRule.getTestingUtility());
Expand Down Expand Up @@ -121,7 +121,7 @@ private static File ensureTestDataDirExists(
final Path testDataDir = Optional.of(testingUtility)
.map(HBaseTestingUtility::getDataTestDir)
.map(Object::toString)
.map(val -> Paths.get(val))
.map(Paths::get)
.orElseThrow(() -> new RuntimeException("Unable to locate temp directory path."));
final File testDataDirFile = Files.createDirectories(testDataDir).toFile();
assertTrue(testDataDirFile.exists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
Expand All @@ -35,7 +36,7 @@
* public class TestMyClass {
* private static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();
* private static final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createConnection);
*
* @ClassRule
* public static final TestRule rule = RuleChain
Expand All @@ -44,32 +45,87 @@
* }
* }</pre>
*/
public class ConnectionRule extends ExternalResource {
public final class ConnectionRule extends ExternalResource {

private final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier;
private AsyncConnection connection;
private final Supplier<Connection> connectionSupplier;
private final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier;

public ConnectionRule(final Supplier<CompletableFuture<AsyncConnection>> connectionSupplier) {
private Connection connection;
private AsyncConnection asyncConnection;

public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier
) {
return new ConnectionRule(connectionSupplier, null);
}

public static ConnectionRule createAsyncConnectionRule(
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(null, asyncConnectionSupplier);
}

public static ConnectionRule createConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
return new ConnectionRule(connectionSupplier, asyncConnectionSupplier);
}

private ConnectionRule(
final Supplier<Connection> connectionSupplier,
final Supplier<CompletableFuture<AsyncConnection>> asyncConnectionSupplier
) {
this.connectionSupplier = connectionSupplier;
this.asyncConnectionSupplier = asyncConnectionSupplier;
}

public AsyncConnection getConnection() {
public Connection getConnection() {
if (connection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with a synchronous connection.");
}
return connection;
}

public AsyncConnection getAsyncConnection() {
if (asyncConnection == null) {
throw new IllegalStateException(
"ConnectionRule not initialized with an asynchronous connection.");
}
return asyncConnection;
}

@Override
protected void before() throws Throwable {
this.connection = connectionSupplier.get().join();
if (connectionSupplier != null) {
this.connection = connectionSupplier.get();
}
if (asyncConnectionSupplier != null) {
this.asyncConnection = asyncConnectionSupplier.get().join();
}
}

@Override
protected void after() {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
CompletableFuture<Void> closeConnection = CompletableFuture.runAsync(() -> {
if (this.connection != null) {
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
});
CompletableFuture<Void> closeAsyncConnection = CompletableFuture.runAsync(() -> {
if (this.asyncConnection != null) {
try {
asyncConnection.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
CompletableFuture.allOf(closeConnection, closeAsyncConnection).join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.junit.ClassRule;
import org.junit.Rule;
Expand All @@ -42,7 +43,7 @@
*
* @Rule
* public final ConnectionRule connectionRule =
* new ConnectionRule(miniClusterRule::createConnection);
* ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
* }
* }</pre>
*/
Expand Down Expand Up @@ -107,11 +108,26 @@ public HBaseTestingUtility getTestingUtility() {
return testingUtility;
}

/**
* Create a {@link Connection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link Connection#close() close()} the connection when finished.
*/
public Connection createConnection() {
if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized");
}
try {
return ConnectionFactory.createConnection(miniCluster.getConf());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Create a {@link AsyncConnection} to the managed {@link MiniHBaseCluster}. It's up to the caller
* to {@link AsyncConnection#close() close()} the connection when finished.
*/
public CompletableFuture<AsyncConnection> createConnection() {
public CompletableFuture<AsyncConnection> createAsyncConnection() {
if (miniCluster == null) {
throw new IllegalStateException("test cluster not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class TestApiV1ClusterMetricsResource {
})
.build();
private static final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
private static final ClassSetup classRule = new ClassSetup(connectionRule::getConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private static final ClassSetup classRule = new ClassSetup(connectionRule::getAsyncConnection);

private static final class ClassSetup extends ExternalResource {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public class TestMetaBrowser {
public static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder().build();

private final ConnectionRule connectionRule =
new ConnectionRule(miniClusterRule::createConnection);
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
private final ClearUserNamespacesAndTablesRule clearUserNamespacesAndTablesRule =
new ClearUserNamespacesAndTablesRule(connectionRule::getConnection);
new ClearUserNamespacesAndTablesRule(connectionRule::getAsyncConnection);

@Rule
public TestRule rule = RuleChain.outerRule(connectionRule)
Expand All @@ -84,7 +84,7 @@ public class TestMetaBrowser {

@Before
public void before() {
connection = connectionRule.getConnection();
connection = connectionRule.getAsyncConnection();
admin = connection.getAdmin();
}

Expand Down