Skip to content

Commit

Permalink
HBASE-8868. add metric to report client shortcircuit reads. (apache#1334
Browse files Browse the repository at this point in the history
)

Signed-off-by: stack <[email protected]>
  • Loading branch information
jojochuang authored and infraio committed Mar 25, 2020
1 parent 0bb1975 commit 687384b
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String HEDGED_READ_WINS_DESC =
"The number of times we started a hedged read and a hedged read won";

String TOTAL_BYTES_READ = "totalBytesRead";
String TOTAL_BYTES_READ_DESC = "The total number of bytes read from HDFS";
String LOCAL_BYTES_READ = "localBytesRead";
String LOCAL_BYTES_READ_DESC =
"The number of bytes read from the local HDFS DataNode";
String SHORTCIRCUIT_BYTES_READ = "shortCircuitBytesRead";
String SHORTCIRCUIT_BYTES_READ_DESC = "The number of bytes read through HDFS short circuit read";
String ZEROCOPY_BYTES_READ = "zeroCopyBytesRead";
String ZEROCOPY_BYTES_READ_DESC =
"The number of bytes read through HDFS zero copy";

String BLOCKED_REQUESTS_COUNT = "blockedRequestCount";
String BLOCKED_REQUESTS_COUNT_DESC = "The number of blocked requests because of memstore size is "
+ "larger than blockingMemStoreSize";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,26 @@ public interface MetricsRegionServerWrapper {
*/
long getHedgedReadWins();

/**
* @return Number of total bytes read from HDFS.
*/
long getTotalBytesRead();

/**
* @return Number of bytes read from the local HDFS DataNode.
*/
long getLocalBytesRead();

/**
* @return Number of bytes read locally through HDFS short circuit.
*/
long getShortCircuitBytesRead();

/**
* @return Number of bytes read locally through HDFS zero copy.
*/
long getZeroCopyBytesRead();

/**
* @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,18 @@ private MetricsRecordBuilder addGaugesToMetricsRecordBuilder(MetricsRecordBuilde
.addGauge(Interns.info(PERCENT_FILES_LOCAL_SECONDARY_REGIONS,
PERCENT_FILES_LOCAL_SECONDARY_REGIONS_DESC),
rsWrap.getPercentFileLocalSecondaryRegions())
.addGauge(Interns.info(TOTAL_BYTES_READ,
TOTAL_BYTES_READ_DESC),
rsWrap.getTotalBytesRead())
.addGauge(Interns.info(LOCAL_BYTES_READ,
LOCAL_BYTES_READ_DESC),
rsWrap.getLocalBytesRead())
.addGauge(Interns.info(SHORTCIRCUIT_BYTES_READ,
SHORTCIRCUIT_BYTES_READ_DESC),
rsWrap.getShortCircuitBytesRead())
.addGauge(Interns.info(ZEROCOPY_BYTES_READ,
ZEROCOPY_BYTES_READ_DESC),
rsWrap.getZeroCopyBytesRead())
.addGauge(Interns.info(SPLIT_QUEUE_LENGTH, SPLIT_QUEUE_LENGTH_DESC),
rsWrap.getSplitQueueSize())
.addGauge(Interns.info(COMPACTION_QUEUE_LENGTH, COMPACTION_QUEUE_LENGTH_DESC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,6 +96,15 @@ public class FSDataInputStreamWrapper implements Closeable {
// errors against Hadoop pre 2.6.4 and 2.7.1 versions.
private Method unbuffer = null;

private final static ReadStatistics readStatistics = new ReadStatistics();

private static class ReadStatistics {
long totalBytesRead;
long totalLocalBytesRead;
long totalShortCircuitBytesRead;
long totalZeroCopyBytesRead;
}

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, path, false, -1L);
}
Expand Down Expand Up @@ -232,14 +243,64 @@ public void checksumOk() {
}
}

/** Close stream(s) if necessary. */
private void updateInputStreamStatistics(FSDataInputStream stream) {
// If the underlying file system is HDFS, update read statistics upon close.
if (stream instanceof HdfsDataInputStream) {
/**
* Because HDFS ReadStatistics is calculated per input stream, it is not
* feasible to update the aggregated number in real time. Instead, the
* metrics are updated when an input stream is closed.
*/
HdfsDataInputStream hdfsDataInputStream = (HdfsDataInputStream)stream;
synchronized (readStatistics) {
readStatistics.totalBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalBytesRead();
readStatistics.totalLocalBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalBytesRead();
readStatistics.totalShortCircuitBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalShortCircuitBytesRead();
readStatistics.totalZeroCopyBytesRead += hdfsDataInputStream.getReadStatistics().
getTotalZeroCopyBytesRead();
}
}
}

public static long getTotalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalBytesRead;
}
}

public static long getLocalBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalLocalBytesRead;
}
}

public static long getShortCircuitBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalShortCircuitBytesRead;
}
}

public static long getZeroCopyBytesRead() {
synchronized (readStatistics) {
return readStatistics.totalZeroCopyBytesRead;
}
}

/** CloseClose stream(s) if necessary. */
@Override
public void close() {
if (!doCloseStreams) {
return;
}
updateInputStreamStatistics(this.streamNoFsChecksum);
// we do not care about the close exception as it is for reading, no data loss issue.
IOUtils.closeQuietly(streamNoFsChecksum);


updateInputStreamStatistics(stream);
IOUtils.closeQuietly(stream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
Expand Down Expand Up @@ -941,6 +942,26 @@ public long getHedgedReadWins() {
return this.dfsHedgedReadMetrics == null? 0: this.dfsHedgedReadMetrics.getHedgedReadWins();
}

@Override
public long getTotalBytesRead() {
return FSDataInputStreamWrapper.getTotalBytesRead();
}

@Override
public long getLocalBytesRead() {
return FSDataInputStreamWrapper.getLocalBytesRead();
}

@Override
public long getShortCircuitBytesRead() {
return FSDataInputStreamWrapper.getShortCircuitBytesRead();
}

@Override
public long getZeroCopyBytesRead() {
return FSDataInputStreamWrapper.getZeroCopyBytesRead();
}

@Override
public long getBlockedRequestsCount() {
return blockedRequestsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,26 @@ public long getHedgedReadWins() {
return 10;
}

@Override
public long getTotalBytesRead() {
return 0;
}

@Override
public long getLocalBytesRead() {
return 0;
}

@Override
public long getShortCircuitBytesRead() {
return 0;
}

@Override
public long getZeroCopyBytesRead() {
return 0;
}

@Override
public long getBlockedRequestsCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -622,4 +623,22 @@ public void testAverageRegionSize() throws Exception {
metricsRegionServer.getRegionServerWrapper().forceRecompute();
assertTrue(metricsHelper.getGaugeDouble("averageRegionSize", serverSource) > 0.0);
}

@Test
public void testReadBytes() throws Exception {
// Do a first put to be sure that the connection is established, meta is there and so on.
doNPuts(1, false);
doNGets(10, false);
TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();

assertTrue("Total read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getTotalBytesRead() > 0);
assertTrue("Total local read bytes should be larger than 0",
metricsRegionServer.getRegionServerWrapper().getLocalBytesRead() > 0);
assertEquals("Total short circuit read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getShortCircuitBytesRead());
assertEquals("Total zero-byte read bytes should be equal to 0", 0,
metricsRegionServer.getRegionServerWrapper().getZeroCopyBytesRead());
}
}
7 changes: 7 additions & 0 deletions src/main/asciidoc/_chapters/schema_design.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,13 @@ is complaining in the logs, include `dfs.client.read.shortcircuit.streams.cache
`dfs.client.socketcache.capacity`. Documentation is sparse on these options. You'll have to
read source code.
RegionServer metric system exposes HDFS short circuit read metrics `shortCircuitBytesRead`. Other
HDFS read metrics, including
`totalBytesRead` (The total number of bytes read from HDFS),
`localBytesRead` (The number of bytes read from the local HDFS DataNode),
`zeroCopyBytesRead` (The number of bytes read through HDFS zero copy)
are available and can be used to troubleshoot short-circuit read issues.
For more on short-circuit reads, see Colin's old blog on rollout,
link:http://blog.cloudera.com/blog/2013/08/how-improved-short-circuit-local-reads-bring-better-performance-and-security-to-hadoop/[How Improved Short-Circuit Local Reads Bring Better Performance and Security to Hadoop].
The link:https://issues.apache.org/jira/browse/HDFS-347[HDFS-347] issue also makes for an
Expand Down

0 comments on commit 687384b

Please sign in to comment.