HBASE-24757 : ReplicationSink should limit row count in batch mutation based on hbase.rpc.rows.warning.threshold #2139

Merged: 2 commits, Jul 27, 2020
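
This change makes ReplicationSink split any replicated mutation batch whose row count exceeds hbase.rpc.rows.warning.threshold (default 5000, now exposed as HConstants.BATCH_ROWS_THRESHOLD_NAME) into smaller sub-batches before each Table.batch() call. A minimal sketch of the partitioning behaviour the patch relies on, using the shaded Guava Lists.partition helper; the demo class name and row counts here are illustrative only, not part of the patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class PartitionDemo {
  public static void main(String[] args) {
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 11000; i++) {
      rows.add(i);
    }
    // Lists.partition returns consecutive sublists of at most the given
    // size; only the last sublist may be smaller.
    List<List<Integer>> batches = Lists.partition(rows, 5000);
    for (List<Integer> batch : batches) {
      System.out.println(batch.size()); // prints 5000, 5000, 1000
    }
  }
}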
hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (10 additions, 0 deletions)
@@ -1493,6 +1493,16 @@ public enum OperationStatusCode {
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;

/**
* Number of rows in a batch operation above which a warning will be logged.
*/
public static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";

/**
* Default value of {@link #BATCH_ROWS_THRESHOLD_NAME}
*/
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

private HConstants() {
// Can't be instantiated with this ctor.
}
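With the constants promoted to HConstants, the threshold can be tuned through the ordinary Configuration machinery. A small illustrative sketch (the class name and the 2000 value are ours, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ThresholdConfigDemo {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Override the 5000-row default, e.g. to keep replication RPCs smaller.
    conf.setInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 2000);
    System.out.println(conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
        HConstants.BATCH_ROWS_THRESHOLD_DEFAULT)); // prints 2000
  }
}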
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -276,15 +276,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;

/**
* Number of rows in a batch operation above which a warning will be logged.
*/
static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold";
/**
* Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME}
*/
static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;

protected static final String RESERVOIR_ENABLED_KEY = "hbase.ipc.server.reservoir.enabled";

// Request counter. (Includes requests that are not serviced by regions.)
@@ -1229,7 +1220,8 @@ public RSRpcServices(HRegionServer rs) throws IOException {
RSRpcServices(HRegionServer rs, LogDelegate ld) throws IOException {
this.ld = ld;
regionServer = rs;
rowSizeWarnThreshold = rs.conf.getInt(BATCH_ROWS_THRESHOLD_NAME, BATCH_ROWS_THRESHOLD_DEFAULT);
rowSizeWarnThreshold = rs.conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
RpcSchedulerFactory rpcSchedulerFactory;
try {
rpcSchedulerFactory = getRpcSchedulerFactoryClass().asSubclass(RpcSchedulerFactory.class)
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -187,7 +187,7 @@ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.replicationSink = new ReplicationSink(this.conf);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -22,6 +22,7 @@
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,6 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -52,13 +52,14 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
* <p>
@@ -91,16 +92,21 @@ public class ReplicationSink {
private SourceFSConfigurationProvider provider;
private WALEntrySinkFilter walEntrySinkFilter;

/**
* Row size threshold for multi requests above which a warning is logged
*/
private final int rowSizeWarnThreshold;

/**
* Create a sink for replication
*
* @param conf conf object
* @param stopper boolean to tell this thread to stop
* @param conf conf object
* @throws IOException thrown when HDFS goes bad or bad file name
*/
public ReplicationSink(Configuration conf, Stoppable stopper)
public ReplicationSink(Configuration conf)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
rowSizeWarnThreshold = conf.getInt(
HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
decorateConf();
this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter();
@@ -210,11 +216,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
// Map of table name Vs list of pair of family and list of
// hfile paths from its namespace
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
@@ -247,7 +249,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
if (!rowMap.isEmpty()) {
LOG.debug("Started replicating mutations.");
for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
}
LOG.debug("Finished replicating mutations.");
}
@@ -372,17 +374,10 @@ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
* @param value
* @return the list of values corresponding to key1 and key2
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
Map<K2,List<V>> innerMap = map.get(key1);
if (innerMap == null) {
innerMap = new HashMap<>();
map.put(key1, innerMap);
}
List<V> values = innerMap.get(key2);
if (values == null) {
values = new ArrayList<>();
innerMap.put(key2, values);
}
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
V value) {
Map<K2, List<V>> innerMap = map.computeIfAbsent(key1, k -> new HashMap<>());
List<V> values = innerMap.computeIfAbsent(key2, k -> new ArrayList<>());
values.add(value);
return values;
}
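
The computeIfAbsent refactor above replaces the get/null-check/put sequence with a single expression per nesting level. A self-contained sketch of the same idiom (demo names and values are ours):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiMapDemo {
  public static void main(String[] args) {
    Map<String, Map<String, List<Integer>>> map = new HashMap<>();
    // The nested map and list are created lazily on first access.
    map.computeIfAbsent("table1", k -> new HashMap<>())
        .computeIfAbsent("family", k -> new ArrayList<>())
        .add(42);
    System.out.println(map); // {table1={family=[42]}}
  }
}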
@@ -410,9 +405,10 @@ public void stopReplicationSinkServices() {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
* @throws IOException
* @param batchRowSizeThreshold rowSize threshold for batch mutation
*/
protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold)
throws IOException {
if (allRows.isEmpty()) {
return;
}
@@ -421,7 +417,15 @@ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws
Connection connection = getConnection();
table = connection.getTable(tableName);
for (List<Row> rows : allRows) {
table.batch(rows, null);
List<List<Row>> batchRows;
if (rows.size() > batchRowSizeThreshold) {
  // Split oversized batches so a single call to Table#batch stays at or
  // below the configured row threshold.
  batchRows = Lists.partition(rows, batchRowSizeThreshold);
} else {
  batchRows = Collections.singletonList(rows);
}
for (List<Row> rowList : batchRows) {
  table.batch(rowList, null);
}
}
} catch (RetriesExhaustedWithDetailsException rewde) {
for (Throwable ex : rewde.getCauses()) {
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiLogThreshold.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -67,8 +68,8 @@ public static void setup() throws Exception {
final TableName tableName = TableName.valueOf("tableName");
TEST_UTIL = HBaseTestingUtility.createLocalHTU();
CONF = TEST_UTIL.getConfiguration();
THRESHOLD = CONF.getInt(RSRpcServices.BATCH_ROWS_THRESHOLD_NAME,
RSRpcServices.BATCH_ROWS_THRESHOLD_DEFAULT);
THRESHOLD = CONF.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME,
HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(tableName, TEST_FAM);
RS = TEST_UTIL.getRSForFirstRegionInTable(tableName);
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
@@ -55,7 +54,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -78,7 +77,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;

@Category({ReplicationTests.class, MediumTests.class})
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {

@ClassRule
@@ -127,10 +126,8 @@ public void stop(String why) {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());

TEST_UTIL.startMiniCluster(3);
SINK =
new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
SINK = new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()));
table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
@@ -203,6 +200,40 @@ public void testMixedPutDelete() throws Exception {
assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
}

@Test
public void testLargeEditsPutDelete() throws Exception {
List<WALEntry> entries = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
for (int i = 0; i < 5510; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);

ResultScanner resultScanner = table1.getScanner(new Scan());
int totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
assertEquals(5510, totalRows);

entries = new ArrayList<>();
cells = new ArrayList<>();
for (int i = 0; i < 11000; i++) {
entries.add(
createEntry(TABLE_NAME1, i, i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn,
cells));
}
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
baseNamespaceDir, hfileArchiveDir);
resultScanner = table1.getScanner(new Scan());
totalRows = 0;
while (resultScanner.next() != null) {
totalRows++;
}
// Even-numbered rows were deleted and odd-numbered rows re-put, so only
// the 5,500 odd-numbered rows remain.
assertEquals(5500, totalRows);
}

/**
* Insert to 2 different tables
* @throws Exception
@@ -221,7 +252,11 @@ public void testMixedPutTables() throws Exception {
Scan scan = new Scan();
ResultScanner scanRes = table2.getScanner(scan);
for(Result res : scanRes) {
assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
assertEquals(0, Bytes.toInt(res.getRow()) % 2);
}
scanRes = table1.getScanner(scan);
for(Result res : scanRes) {
assertEquals(1, Bytes.toInt(res.getRow()) % 2);
}
}

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -128,7 +128,7 @@ public void testWALEntryFilter() throws IOException {
IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
Connection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
ReplicationSink sink = new ReplicationSink(conf);
// Create some dumb walentries.
List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
new ArrayList<>();