Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-21658 Should get the meta replica number from zk instead of con… #231

Merged
merged 1 commit into from
May 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
Expand Down Expand Up @@ -134,12 +136,13 @@ private Pair<RegionState.State, ServerName> getStateAndServerName(
ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
List<String> metaReplicaZNodes) {
HRegionLocation[] locs = new HRegionLocation[metaReplicaZNodes.size()];
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
for (String metaReplicaZNode : metaReplicaZNodes) {
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
Expand Down Expand Up @@ -186,7 +189,23 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
tryComplete(remaining, locs, future);
});
}
});
}
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
.thenApply(children -> children.stream()
.filter(c -> c.startsWith(znodePaths.metaZNodePrefix)).collect(Collectors.toList())),
(metaReplicaZNodes, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
getMetaRegionLocation(future, metaReplicaZNodes);
});
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
Expand Down Expand Up @@ -284,6 +285,22 @@ protected void doExec(ZooKeeper zk) {
return future;
}

public CompletableFuture<List<String>> list(String path) {
if (closed.get()) {
return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
}
CompletableFuture<List<String>> future = new CompletableFuture<>();
tasks.add(new ZKTask<List<String>>(path, future, "list") {

@Override
protected void doExec(ZooKeeper zk) {
zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
null);
}
});
return future;
}

private void closeZk() {
if (zookeeper != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public String getZNodeForReplica(int replicaId) {

/**
* Parse the meta replicaId from the passed znode
* @param znode
* @param znode the name of the znode, does not include baseZNode
* @return replicaId
*/
public int getMetaReplicaIdFromZnode(String znode) {
Expand All @@ -178,7 +178,7 @@ public int getMetaReplicaIdFromZnode(String znode) {

/**
* Is it the default meta replica's znode
* @param znode
* @param znode the name of the znode, does not include baseZNode
* @return true or false
*/
public boolean isDefaultMetaReplicaZnode(String znode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ public class TestZKAsyncRegistry {
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
TEST_UTIL.startMiniCluster(3);
REGISTRY = new ZKAsyncRegistry(TEST_UTIL.getConfiguration());
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
// make sure that we do not depend on this config when getting locations for meta replicas, see
// HBASE-21658.
conf.setInt(META_REPLICAS_NUM, 1);
REGISTRY = new ZKAsyncRegistry(conf);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -126,9 +128,15 @@ public String explainFailure() throws Exception {
}

@Test
public void testGetAndExists() throws Exception {
public void testRead() throws Exception {
assertArrayEquals(DATA, RO_ZK.get(PATH).get());
assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
List<String> children = RO_ZK.list(PATH).get();
assertEquals(CHILDREN, children.size());
Collections.sort(children);
for (int i = 0; i < CHILDREN; i++) {
assertEquals("c" + i, children.get(i));
}
assertNotNull(RO_ZK.zookeeper);
waitForIdleConnectionClosed();
}
Expand All @@ -145,6 +153,15 @@ public void testNoNode() throws InterruptedException, ExecutionException {
assertEquals(Code.NONODE, ke.code());
assertEquals(pathNotExists, ke.getPath());
}
try {
RO_ZK.list(pathNotExists).get();
fail("should fail because of " + pathNotExists + " does not exist");
} catch (ExecutionException e) {
assertThat(e.getCause(), instanceOf(KeeperException.class));
KeeperException ke = (KeeperException) e.getCause();
assertEquals(Code.NONODE, ke.code());
assertEquals(pathNotExists, ke.getPath());
}
// exists will not throw exception.
assertNull(RO_ZK.exists(pathNotExists).get());
}
Expand Down