diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d458c6cb66ec..ee6b393b3d15 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -340,7 +340,7 @@ public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName
       public AsyncTable<ScanResultConsumer> build() {
         RawAsyncTableImpl rawTable =
           new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
-        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
+        return new AsyncTableImpl(rawTable, pool);
       }
     };
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ba2c5600282d..f2a8e18b5214 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -43,12 +43,11 @@
 @InterfaceAudience.Private
 class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
 
-  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
+  private final RawAsyncTableImpl rawTable;
 
   private final ExecutorService pool;
 
-  AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
-      ExecutorService pool) {
+  AsyncTableImpl(RawAsyncTableImpl rawTable, ExecutorService pool) {
     this.rawTable = rawTable;
     this.pool = pool;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 7d520099744d..616cf6b9e601 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -23,6 +23,7 @@
 import java.io.InterruptedIOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -30,8 +31,8 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
- * in background and cache it in memory. Typically the {@link #maxCacheSize} will be
+ * The {@link ResultScanner} implementation for {@link RawAsyncTableImpl}. It will fetch data
+ * automatically in background and cache it in memory. Typically, the {@link #maxCacheSize} will be
  * {@code 2 * scan.getMaxResultSize()}.
  */
 @InterfaceAudience.Private
@@ -39,7 +40,7 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
 
   private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
 
-  private final AsyncTable<AdvancedScanResultConsumer> rawTable;
+  private final TableName tableName;
 
   private final long maxCacheSize;
 
@@ -57,12 +58,10 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum
 
   private ScanResumer resumer;
 
-  public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
-      long maxCacheSize) {
-    this.rawTable = table;
+  public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
+    this.tableName = tableName;
     this.maxCacheSize = maxCacheSize;
     this.scan = scan;
-    table.scan(scan, this);
   }
 
   private void addToCache(Result result) {
@@ -72,9 +71,10 @@ private void addToCache(Result result) {
 
   private void stopPrefetch(ScanController controller) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
-        " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
-        cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
+      LOG.debug("{} stop prefetching when scanning {} as the cache size {}" +
+        " is greater than the maxCacheSize {}",
+        String.format("0x%x", System.identityHashCode(this)), tableName, cacheSize,
+        maxCacheSize);
     }
     resumer = controller.suspend();
   }
@@ -138,7 +138,7 @@ public synchronized Result next() throws IOException {
         return null;
       }
       if (error != null) {
-        FutureUtils.rethrow(error);
+        throw FutureUtils.rethrow(error);
       }
       try {
         wait();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 20e84cc6f739..437f12f80a1b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -666,10 +666,14 @@ private long resultSize2CacheSize(long maxResultSize) {
   }
 
   @Override
-  public ResultScanner getScanner(Scan scan) {
-    return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
-      resultSize2CacheSize(
-        scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+  public AsyncTableResultScanner getScanner(Scan scan) {
+    final long maxCacheSize = resultSize2CacheSize(
+      scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize);
+    final Scan scanCopy = ReflectionUtils.newInstance(scan.getClass(), scan);
+    final AsyncTableResultScanner scanner =
+      new AsyncTableResultScanner(tableName, scanCopy, maxCacheSize);
+    scan(scan, scanner);
+    return scanner;
   }
 
   @Override