diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 694a61d5b1f7..727b1409c57e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -24,14 +24,14 @@ import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.List; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -44,6 +44,8 @@ @InterfaceAudience.Private public class ReversedScannerCallable extends ScannerCallable { + private byte[] locationSearchKey; + /** * @param connection which connection * @param tableName table callable is on @@ -59,6 +61,18 @@ public ReversedScannerCallable(ClusterConnection connection, TableName tableName super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId); } + @Override + public void throwable(Throwable t, boolean retrying) { + // for reverse scans, we need to update cache using the search key found for the reverse scan + // range in prepare. Otherwise, we will see weird behavior at the table boundaries, + // when trying to clear cache for an empty row. + if (location != null && locationSearchKey != null) { + getConnection().updateCachedLocations(getTableName(), + location.getRegionInfo().getRegionName(), + locationSearchKey, t, location.getServerName()); + } + } + /** * @param reload force reload of server location */ @@ -67,33 +81,37 @@ public void prepare(boolean reload) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } + + if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) + && getConnection().isTableDisabled(getTableName())) { + throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); + } + if (!instantiated || reload) { // we should use range locate if // 1. we do not want the start row // 2. the start row is empty which means we need to locate to the last region. if (scan.includeStartRow() && !isEmptyStartRow(getRow())) { // Just locate the region with the row - RegionLocations rl = getRegionLocations(reload, getRow()); + RegionLocations rl = getRegionLocationsForPrepare(getRow()); this.location = getLocationForReplica(rl); - if (location == null || location.getServerName() == null) { - throw new IOException("Failed to find location, tableName=" - + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" - + reload); - } + this.locationSearchKey = getRow(); } else { - // Need to locate the regions with the range, and the target location is - // the last one which is the previous region of last region scanner + // The locateStart row is an approximation. So we need to search between + // that and the actual row in order to really find the last region byte[] locateStartRow = createCloseRowBefore(getRow()); - List locatedRegions = locateRegionsInRange( - locateStartRow, getRow(), reload); - if (locatedRegions.isEmpty()) { - throw new DoNotRetryIOException( - "Does hbase:meta exist hole? Couldn't get regions for the range from " - + Bytes.toStringBinary(locateStartRow) + " to " - + Bytes.toStringBinary(getRow())); - } - this.location = locatedRegions.get(locatedRegions.size() - 1); + Pair lastRegionAndKey = locateLastRegionInRange( + locateStartRow, getRow()); + this.location = lastRegionAndKey.getFirst(); + this.locationSearchKey = lastRegionAndKey.getSecond(); } + + if (location == null || location.getServerName() == null) { + throw new IOException("Failed to find location, tableName=" + + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" + + reload); + } + setStub(getConnection().getClient(getLocation().getServerName())); checkIfRegionServerIsRemote(); instantiated = true; @@ -106,15 +124,14 @@ public void prepare(boolean reload) throws IOException { } /** - * Get the corresponding regions for an arbitrary range of keys. + * Get the last region before the endkey, which will be used to execute the reverse scan * @param startKey Starting row in range, inclusive * @param endKey Ending row in range, exclusive - * @param reload force reload of server location - * @return A list of HRegionLocation corresponding to the regions that contain - * the specified range + * @return The last location, and the rowKey used to find it. May be null, + * if a region could not be found. */ - private List locateRegionsInRange(byte[] startKey, - byte[] endKey, boolean reload) throws IOException { + private Pair locateLastRegionInRange(byte[] startKey, byte[] endKey) + throws IOException { final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { @@ -122,13 +139,17 @@ private List locateRegionsInRange(byte[] startKey, + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey)); } - List regionList = new ArrayList<>(); + + HRegionLocation lastRegion = null; + byte[] lastFoundKey = null; byte[] currentKey = startKey; + do { - RegionLocations rl = getRegionLocations(reload, currentKey); + RegionLocations rl = getRegionLocationsForPrepare(currentKey); HRegionLocation regionLocation = getLocationForReplica(rl); if (regionLocation.getRegionInfo().containsRow(currentKey)) { - regionList.add(regionLocation); + lastFoundKey = currentKey; + lastRegion = regionLocation; } else { throw new DoNotRetryIOException( "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) + @@ -137,7 +158,8 @@ private List locateRegionsInRange(byte[] startKey, currentKey = regionLocation.getRegionInfo().getEndKey(); } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); - return regionList; + + return new Pair<>(lastRegion, lastFoundKey); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 5d7cf705bd6e..b557b3eaf6f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; @@ -127,9 +128,18 @@ protected final HRegionLocation getLocationForReplica(RegionLocations locs) return loc; } - protected final RegionLocations getRegionLocations(boolean reload, byte[] row) + /** + * Fetch region locations for the row. Since this is for prepare, we always useCache. + * This is because we can be sure that RpcRetryingCaller will have cleared the cache + * in error handling if this is a retry. + * + * @param row the row to look up region location for + */ + protected final RegionLocations getRegionLocationsForPrepare(byte[] row) throws IOException { - return RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id, getConnection(), + // always use cache, because cache will have been cleared if necessary + // in the try/catch before retrying + return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(), getTableName(), row); } @@ -141,7 +151,13 @@ public void prepare(boolean reload) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } - RegionLocations rl = getRegionLocations(reload, getRow()); + + if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME) + && getConnection().isTableDisabled(getTableName())) { + throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled."); + } + + RegionLocations rl = getRegionLocationsForPrepare(getRow()); location = getLocationForReplica(rl); ServerName dest = location.getServerName(); setStub(super.getConnection().getClient(dest)); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java index 6f62bb182ff6..67976b8233ad 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestReversedScannerCallable.java @@ -17,12 +17,23 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -33,7 +44,6 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -44,53 +54,78 @@ public class TestReversedScannerCallable { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReversedScannerCallable.class); + private static final TableName TABLE_NAME = TableName.valueOf("TestReversedScannerCallable"); + + private static final String HOSTNAME = "localhost"; + private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123); + private static final byte[] ROW = Bytes.toBytes("row1"); + private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true).setReversed(true); + @Mock private ClusterConnection connection; @Mock - private Scan scan; - @Mock private RpcControllerFactory rpcFactory; @Mock private RegionLocations regionLocations; - - private final byte[] ROW = Bytes.toBytes("row1"); + @Mock + private HRegionLocation regionLocation; @Before public void setUp() throws Exception { - HRegionLocation regionLocation = Mockito.mock(HRegionLocation.class); - ServerName serverName = Mockito.mock(ServerName.class); - - Mockito.when(connection.getConfiguration()).thenReturn(new Configuration()); - Mockito.when(regionLocations.size()).thenReturn(1); - Mockito.when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation); - Mockito.when(regionLocation.getHostname()).thenReturn("localhost"); - Mockito.when(regionLocation.getServerName()).thenReturn(serverName); - Mockito.when(scan.includeStartRow()).thenReturn(true); - Mockito.when(scan.getStartRow()).thenReturn(ROW); + when(connection.getConfiguration()).thenReturn(new Configuration()); + when(regionLocations.size()).thenReturn(1); + when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation); + when(regionLocation.getHostname()).thenReturn(HOSTNAME); + when(regionLocation.getServerName()).thenReturn(SERVERNAME); } @Test - public void testPrepareDoesNotUseCache() throws Exception { - TableName tableName = TableName.valueOf("MyTable"); - Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations); + public void testPrepareAlwaysUsesCache() throws Exception { + when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0)) + .thenReturn(regionLocations); ReversedScannerCallable callable = - new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0); + new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0); + callable.prepare(false); callable.prepare(true); - Mockito.verify(connection).relocateRegion(tableName, ROW, 0); + verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0); } @Test - public void testPrepareUsesCache() throws Exception { - TableName tableName = TableName.valueOf("MyTable"); - Mockito.when(connection.locateRegion(tableName, ROW, true, true, 0)) - .thenReturn(regionLocations); + public void testHandleDisabledTable() throws IOException { + when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true); ReversedScannerCallable callable = - new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory, 0); + new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0); + + assertThrows(TableNotEnabledException.class, () -> callable.prepare(true)); + } + + @Test + public void testUpdateSearchKeyCacheLocation() throws IOException { + byte[] regionName = RegionInfo.createRegionName(TABLE_NAME, + ConnectionUtils.createCloseRowBefore(ConnectionUtils.MAX_BYTE_ARRAY), "123", false); + HRegionInfo mockRegionInfo = mock(HRegionInfo.class); + when(mockRegionInfo.containsRow(ConnectionUtils.MAX_BYTE_ARRAY)).thenReturn(true); + when(mockRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW); + when(mockRegionInfo.getRegionName()).thenReturn(regionName); + when(regionLocation.getRegionInfo()).thenReturn(mockRegionInfo); + + IOException testThrowable = new IOException("test throwable"); + + when(connection.locateRegion(TABLE_NAME, ConnectionUtils.MAX_BYTE_ARRAY, true, true, 0)) + .thenReturn(regionLocations); + + Scan scan = new Scan().setReversed(true); + ReversedScannerCallable callable = + new ReversedScannerCallable(connection, TABLE_NAME, scan, null, rpcFactory, 0); + callable.prepare(false); - Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0); + callable.throwable(testThrowable, true); + + verify(connection).updateCachedLocations(TABLE_NAME, regionName, + ConnectionUtils.MAX_BYTE_ARRAY, testThrowable, SERVERNAME); } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScannerCallable.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScannerCallable.java new file mode 100644 index 000000000000..6c48fd26fb1e --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestScannerCallable.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +@Category({ ClientTests.class, SmallTests.class }) +public class TestScannerCallable { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestScannerCallable.class); + + private static final TableName TABLE_NAME = TableName.valueOf("TestScannerCallable"); + + private static final String HOSTNAME = "localhost"; + private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123); + private static final byte[] ROW = Bytes.toBytes("row1"); + private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true); + + @Mock + private ClusterConnection connection; + @Mock + private RpcControllerFactory rpcFactory; + @Mock + private RegionLocations regionLocations; + @Mock + private HRegionLocation regionLocation; + + @Before + public void setUp() throws Exception { + when(connection.getConfiguration()).thenReturn(new Configuration()); + when(regionLocations.size()).thenReturn(1); + when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation); + when(regionLocation.getHostname()).thenReturn(HOSTNAME); + when(regionLocation.getServerName()).thenReturn(SERVERNAME); + } + + @Test + public void testPrepareAlwaysUsesCache() throws Exception { + when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0)) + .thenReturn(regionLocations); + + ScannerCallable callable = + new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0); + callable.prepare(false); + callable.prepare(true); + + verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0); + } + + @Test + public void testHandleDisabledTable() throws IOException { + when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true); + + ScannerCallable callable = + new ScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0); + + assertThrows(TableNotEnabledException.class, () -> callable.prepare(true)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java index be8e4320a8bf..b78d5f3cfd2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java @@ -281,7 +281,7 @@ private List waitOnSplit(final Table t) throws IOException { e.printStackTrace(); } regions = locator.getAllRegionLocations(); - if (regions.size() > originalCount) { + if (regions.size() > originalCount && allRegionsHaveHostnames(regions)) { break; } } @@ -289,6 +289,18 @@ private List waitOnSplit(final Table t) throws IOException { } } + // We need to check for null serverNames due to https://issues.apache.org/jira/browse/HBASE-26790, + // because the null serverNames cause the ScannerCallable to fail. + // we can remove this check once that is resolved + private boolean allRegionsHaveHostnames(List regions) { + for (HRegionLocation region : regions) { + if (region.getServerName() == null) { + return false; + } + } + return true; + } + protected Result getSingleScanResult(Table ht, Scan scan) throws IOException { ResultScanner scanner = ht.getScanner(scan); Result result = scanner.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java index 9dc896832493..332b1bfdec86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java @@ -161,6 +161,70 @@ public void testPreserveMetaCacheOnException() throws Exception { } } + @Test + public void testClearsCacheOnScanException() throws Exception { + ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector( + new RoundRobinExceptionInjector()); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set("hbase.client.retries.number", "1"); + + try (ConnectionImplementation conn = + (ConnectionImplementation) ConnectionFactory.createConnection(conf); + Table table = conn.getTable(TABLE_NAME)) { + + byte[] row = Bytes.toBytes("row2"); + + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10)); + + Scan scan = new Scan(); + scan.withStartRow(row); + scan.setLimit(1); + scan.setCaching(1); + + populateCache(table, row); + assertNotNull(conn.getCachedLocation(TABLE_NAME, row)); + assertTrue(executeUntilCacheClearingException(table, scan)); + assertNull(conn.getCachedLocation(TABLE_NAME, row)); + + // repopulate cache so we can test with reverse scan too + populateCache(table, row); + assertNotNull(conn.getCachedLocation(TABLE_NAME, row)); + + // run with reverse scan + scan.setReversed(true); + assertTrue(executeUntilCacheClearingException(table, scan)); + assertNull(conn.getCachedLocation(TABLE_NAME, row)); + } + } + + private void populateCache(Table table, byte[] row) { + for (int i = 0; i < 50; i++) { + try { + table.get(new Get(row)); + return; + } catch (Exception e) { + // pass, we just want this to succeed so that region location will be cached + } + } + } + + private boolean executeUntilCacheClearingException(Table table, Scan scan) { + for (int i = 0; i < 50; i++) { + try { + try (ResultScanner scanner = table.getScanner(scan)) { + scanner.next(); + } + } catch (Exception ex) { + // Only keep track of the last exception that updated the meta cache + if (ClientExceptionsUtil.isMetaClearingException(ex)) { + return true; + } + } + } + return false; + } + @Test public void testCacheClearingOnCallQueueTooBig() throws Exception { ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(