Skip to content

Commit

Permalink
HBASE-21658 Should get the meta replica number from zk instead of con…
Browse files Browse the repository at this point in the history
…fig at client side
  • Loading branch information
Apache9 committed May 11, 2019
1 parent 3641e7a commit df27820
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
Expand Down Expand Up @@ -134,12 +136,13 @@ private Pair<RegionState.State, ServerName> getStateAndServerName(
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
List<String> metaReplicaZNodes) {
HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
for (String metaReplicaZNode : metaReplicaZNodes) {
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
Expand Down Expand Up @@ -186,7 +189,23 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
tryComplete(remaining, locs, future);
});
}
});
}
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
.thenApply(children -> children.stream()
.filter(c -> c.startsWith(znodePaths.metaZNodePrefix)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
Expand Down Expand Up @@ -284,6 +285,22 @@ protected void doExec(ZooKeeper zk) {
return future;
}

public CompletableFuture<List<String>> list(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
}
CompletableFuture<List<String>> future = new CompletableFuture<>();
tasks.add(new ZKTask<List<String>>(path, future, "list") {

@Override
protected void doExec(ZooKeeper zk) {
zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
null);
}
});
return future;
}

private void closeZk() {
if (zookeeper != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public String getZNodeForReplica(int replicaId) {

/**
* Parse the meta replicaId from the passed znode
* @param znode
* @param znode the name of the znode, does not include baseZNode
* @return replicaId
*/
public int getMetaReplicaIdFromZnode(String znode) {
Expand All @@ -178,7 +178,7 @@ public int getMetaReplicaIdFromZnode(String znode) {

/**
* Is it the default meta replica's znode
* @param znode
* @param znode the name of the znode, does not include baseZNode
* @return true or false
*/
public boolean isDefaultMetaReplicaZnode(String znode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public class TestZKAsyncRegistry {
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
REGISTRY = new ZKAsyncRegistry(TEST_UTIL.getConfiguration());
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
// make sure that we do not depend on this config when getting locations for meta replicas, see
// HBASE-21658.
conf.setInt(META_REPLICAS_NUM, 1);
REGISTRY = new ZKAsyncRegistry(conf);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -126,9 +128,15 @@ public String explainFailure() throws Exception {
}

@Test
public void testGetAndExists() throws Exception {
public void testRead() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
List<String> children = RO_ZK.list(PATH).get();
assertEquals(CHILDREN, children.size());
Collections.sort(children);
for (int i = 0; i < CHILDREN; i++) {
assertEquals("c" + i, children.get(i));
}
assertNotNull(RO_ZK.zookeeper);
waitForIdleConnectionClosed();
}
Expand All @@ -145,6 +153,15 @@ public void testNoNode() throws InterruptedException, ExecutionException {
assertEquals(Code.NONODE, ke.code());
assertEquals(pathNotExists, ke.getPath());
}
try {
RO_ZK.list(pathNotExists).get();
fail("should fail because of " + pathNotExists + " does not exist");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(KeeperException.class));
KeeperException ke = (KeeperException) e.getCause();
assertEquals(Code.NONODE, ke.code());
assertEquals(pathNotExists, ke.getPath());
}
// exists will not throw exception.
assertNull(RO_ZK.exists(pathNotExists).get());
}
Expand Down

0 comments on commit df27820

Please sign in to comment.