Skip to content

Commit

Permalink
HBASE-22804 Provide an API to get list of successful regions and tota…
Browse files Browse the repository at this point in the history
…l expected regions in Canary
  • Loading branch information
Caroline Zhou authored and xcangCRM committed Sep 16, 2019
1 parent b1d4878 commit ac07609
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 1 deletion.
139 changes: 138 additions & 1 deletion hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
Expand Down Expand Up @@ -133,14 +134,20 @@ public interface Sink {
long incWriteFailureCount();
Map<String,String> getWriteFailures();
void updateWriteFailures(String regionName, String serverName);
long getReadSuccessCount();
long incReadSuccessCount();
long getWriteSuccessCount();
long incWriteSuccessCount();
}

/**
* Simple implementation of canary sink that allows plotting to a file or standard output.
*/
public static class StdOutSink implements Sink {
private AtomicLong readFailureCount = new AtomicLong(0),
writeFailureCount = new AtomicLong(0);
writeFailureCount = new AtomicLong(0),
readSuccessCount = new AtomicLong(0),
writeSuccessCount = new AtomicLong(0);
private Map<String, String> readFailures = new ConcurrentHashMap<>();
private Map<String, String> writeFailures = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -183,6 +190,26 @@ public Map<String, String> getWriteFailures() {
public void updateWriteFailures(String regionName, String serverName) {
writeFailures.put(regionName, serverName);
}

@Override
public long getReadSuccessCount() {
return readSuccessCount.get();
}

@Override
public long incReadSuccessCount() {
return readSuccessCount.incrementAndGet();
}

@Override
public long getWriteSuccessCount() {
return writeSuccessCount.get();
}

@Override
public long incWriteSuccessCount() {
return writeSuccessCount.incrementAndGet();
}
}

/**
Expand Down Expand Up @@ -219,6 +246,7 @@ public void publishReadTiming(String znode, String server, long msTime) {
public static class RegionStdOutSink extends StdOutSink {
private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
private LongAdder writeLatency = new LongAdder();
private final Map<String, RegionTaskResult> regionMap = new ConcurrentHashMap<>();

public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) {
incReadFailureCount();
Expand All @@ -234,6 +262,10 @@ public void publishReadFailure(ServerName serverName, RegionInfo region,

public void publishReadTiming(ServerName serverName, RegionInfo region,
ColumnFamilyDescriptor column, long msTime) {
incReadSuccessCount();
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
res.setReadSuccess();
res.setReadLatency(msTime);
LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName,
column.getNameAsString(), msTime);
}
Expand All @@ -252,6 +284,10 @@ public void publishWriteFailure(ServerName serverName, RegionInfo region,

public void publishWriteTiming(ServerName serverName, RegionInfo region,
ColumnFamilyDescriptor column, long msTime) {
incWriteSuccessCount();
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
res.setWriteSuccess();
res.setWriteLatency(msTime);
LOG.info("Write to {} on {} {} in {}ms",
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime);
}
Expand All @@ -273,6 +309,14 @@ public void initializeWriteLatency() {
public LongAdder getWriteLatency() {
return this.writeLatency;
}

public Map<String, RegionTaskResult> getRegionMap() {
return this.regionMap;
}

public int getTotalExpectedRegions() {
return this.regionMap.size();
}
}

/**
Expand Down Expand Up @@ -931,6 +975,96 @@ Sink getSink(Configuration configuration, Class clazz) {
clazz, Sink.class));
}

/**
* Canary region mode-specific data structure which stores information about each region
* to be scanned
*/
public static class RegionTaskResult {
private RegionInfo region;
private TableName tableName;
private ServerName serverName;
private AtomicLong readLatency = null;
private AtomicLong writeLatency = null;
private boolean readSuccess = false;
private boolean writeSuccess = false;

public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) {
this.region = region;
this.tableName = tableName;
this.serverName = serverName;
}

public RegionInfo getRegionInfo() {
return this.region;
}

public String getRegionNameAsString() {
return this.region.getRegionNameAsString();
}

public TableName getTableName() {
return this.tableName;
}

public String getTableNameAsString() {
return this.tableName.getNameAsString();
}

public ServerName getServerName() {
return this.serverName;
}

public String getServerNameAsString() {
return this.serverName.getServerName();
}

public long getReadLatency() {
if (this.readLatency == null) {
return -1;
}
return this.readLatency.get();
}

public void setReadLatency(long readLatency) {
if (this.readLatency != null) {
this.readLatency.set(readLatency);
} else {
this.readLatency = new AtomicLong(readLatency);
}
}

public long getWriteLatency() {
if (this.writeLatency == null) {
return -1;
}
return this.writeLatency.get();
}

public void setWriteLatency(long writeLatency) {
if (this.writeLatency != null) {
this.writeLatency.set(writeLatency);
} else {
this.writeLatency = new AtomicLong(writeLatency);
}
}

public boolean isReadSuccess() {
return this.readSuccess;
}

public void setReadSuccess() {
this.readSuccess = true;
}

public boolean isWriteSuccess() {
return this.writeSuccess;
}

public void setWriteSuccess() {
this.writeSuccess = true;
}
}

/**
* A Factory method for {@link Monitor}.
* Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor.
Expand Down Expand Up @@ -1346,6 +1480,9 @@ private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
RegionInfo region = location.getRegion();
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink,
taskType, rawScanEnabled, rwLatency));
Map<String, RegionTaskResult> regionMap = ((RegionStdOutSink) sink).getRegionMap();
regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
region.getTable(), rs));
}
return executor.invokeAll(tasks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
Expand All @@ -29,6 +32,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -126,6 +130,55 @@ public void testBasicCanaryWorks() throws Exception {
isA(ColumnFamilyDescriptor.class), anyLong());
}

@Test
public void testCanaryRegionTaskResult() throws Exception {
TableName tableName = TableName.valueOf("testCanaryRegionTaskResult");
Table table = testingUtility.createTable(tableName, new byte[][] { FAMILY });
// insert some test rows
for (int i=0; i<1000; i++) {
byte[] iBytes = Bytes.toBytes(i);
Put p = new Put(iBytes);
p.addColumn(FAMILY, COLUMN, iBytes);
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult" };
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));

assertTrue("verify read success count > 0", sink.getReadSuccessCount() > 0);
assertTrue("verify write success count > 0", sink.getWriteSuccessCount() > 0);
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(RegionInfo.class),
isA(ColumnFamilyDescriptor.class), anyLong());
verify(sink, atLeastOnce()).publishWriteTiming(isA(ServerName.class), isA(RegionInfo.class),
isA(ColumnFamilyDescriptor.class), anyLong());

assertTrue("canary should expect to scan at least 1 region",
sink.getTotalExpectedRegions() > 0);
Map<String, Canary.RegionTaskResult> regionMap = sink.getRegionMap();
assertFalse("verify region map has size > 0", regionMap.isEmpty());

for (String regionName : regionMap.keySet()) {
Canary.RegionTaskResult res = regionMap.get(regionName);
assertNotNull("verify each expected region has a RegionTaskResult object in the map", res);
assertNotNull("verify getRegionNameAsString()", regionName);
assertNotNull("verify getRegionInfo()", res.getRegionInfo());
assertNotNull("verify getTableName()", res.getTableName());
assertNotNull("verify getTableNameAsString()", res.getTableNameAsString());
assertNotNull("verify getServerName()", res.getServerName());
assertNotNull("verify getServerNameAsString()", res.getServerNameAsString());

if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess());
assertTrue("write took some time", res.getWriteLatency() > -1);
} else {
assertTrue("read from region " + regionName + " succeeded", res.isReadSuccess());
assertTrue("read took some time", res.getReadLatency() > -1);
}
}
}

@Test
@Ignore("Intermittent argument matching failures, see HBASE-18813")
public void testReadTableTimeouts() throws Exception {
Expand Down

0 comments on commit ac07609

Please sign in to comment.