Commit

review enhancements

rda3mon committed Jan 22, 2023
1 parent 5289e4c commit fc898a7
Showing 6 changed files with 67 additions and 28 deletions.
2 changes: 1 addition & 1 deletion hbase-backup/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-build-configuration</artifactId>
-<version>2.6.0-SNAPSHOT</version>
+<version>${revision}</version>
<relativePath>../hbase-build-configuration</relativePath>
</parent>
<artifactId>hbase-backup</artifactId>
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -397,6 +398,7 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
Path bulkOutputPath = getBulkOutputDir();
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
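The two conf lines added in walToHFiles() above are what opt the backup's WALPlayer job into namespace-qualified bulk output. For reference, a minimal standalone sketch of the same configuration; the driver class and all paths are hypothetical, while the conf keys are the ones used above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WalToHFilesSketch { // hypothetical driver
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/backup/bulk-output"); // illustrative path
    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
    // Opt in to namespace-qualified output directories (the new key).
    conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
    // Args: semicolon-separated WAL dirs, then a comma-separated table list.
    System.exit(ToolRunner.run(conf, new WALPlayer(),
      new String[] { "/wals/dir1;/wals/dir2", "ns1:t1,default:t2" }));
  }
}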
@@ -154,6 +154,17 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

+// When MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY is enabled, this controls whether table names
+// are written with the namespace included. Enabling it means downstream jobs which consume
+// this output will need to account for the namespace when locating the job output directory.
+// For example: a table named my-table in namespace default would be under `/output/default/my-table`
+// instead of the current `/output/my-table`.
+// This will be the behavior when upgrading to HBase 3.0.
+public static final String TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY =
+"hbase.hfileoutputformat.tablename.namespace.inclusive";
+
+private static final boolean TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE = false;

// This constant is public since the client can modify this when setting
// up their conf object and thus refer to this symbol.
// It is present for backwards compatibility reasons. Use it only to
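To make the new key concrete: a short sketch of how a default-namespace table name resolves under each setting. TableName and both accessors are the ones used in this diff; the path mapping relies on getTableRelativePath() (visible in the write() hunk below) splitting the name on the ':' delimiter.

import org.apache.hadoop.hbase.TableName;

public class NamespaceKeySketch { // hypothetical
  public static void main(String[] args) {
    TableName tn = TableName.valueOf("default", "my-table");
    System.out.println(tn.getNameAsString());                  // "my-table" (default namespace elided)
    System.out.println(tn.getNameWithNamespaceInclAsString()); // "default:my-table"
    // With the key enabled, HFileOutputFormat2 derives the output subdirectory from
    // the second form, so this table's files land under /output/default/my-table.
  }
}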
@@ -202,6 +213,8 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWrit
final Configuration conf = context.getConfiguration();
final boolean writeMultipleTables =
conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+final boolean writeToTableWithNamespace = conf.getBoolean(
+TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);
final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
if (writeTableNames == null || writeTableNames.isEmpty()) {
throw new IllegalArgumentException("" + OUTPUT_TABLE_NAME_CONF_KEY + " cannot be empty");
@@ -255,16 +268,17 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
byte[] tableNameBytes = null;
if (writeMultipleTables) {
tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
-tableNameBytes = TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
-.getBytes(Charset.defaultCharset());
+tableNameBytes = writeToTableWithNamespace
+? TableName.valueOf(tableNameBytes).getNameWithNamespaceInclAsString()
+.getBytes(Charset.defaultCharset())
+: TableName.valueOf(tableNameBytes).toBytes();
if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
throw new IllegalArgumentException(
"TableName " + Bytes.toString(tableNameBytes) + " not expected");
}
} else {
tableNameBytes = Bytes.toBytes(writeTableNames);
}
-String tableName = Bytes.toString(tableNameBytes);
Path tableRelPath = getTableRelativePath(tableNameBytes);
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);

@@ -294,7 +308,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
if (wl == null || wl.writer == null) {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
HRegionLocation loc = null;

+String tableName = Bytes.toString(tableNameBytes);
if (tableName != null) {
try (
Connection connection =
@@ -614,6 +628,9 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
job.setOutputValueClass(MapReduceExtendedCell.class);
job.setOutputFormatClass(cls);

+final boolean writeToTableWithNamespace = conf.getBoolean(
+TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_DEFAULT_VALUE);

if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
}
@@ -653,10 +670,9 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,

for (TableInfo tableInfo : multiTableInfo) {
regionLocators.add(tableInfo.getRegionLocator());
-String tn = writeMultipleTables
+allTableNames.add(writeMultipleTables && writeToTableWithNamespace
? tableInfo.getRegionLocator().getName().getNameWithNamespaceInclAsString()
-: tableInfo.getRegionLocator().getName().getNameAsString();
-allTableNames.add(tn);
+: tableInfo.getRegionLocator().getName().getNameAsString());
tableDescriptors.add(tableInfo.getTableDescriptor());
}
// Record tablenames for creating writer by favored nodes, and decoding compression,
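Taken together, createRecordWriter() and configureIncrementalLoad() now agree on whether the expected table names carry a namespace. A hedged sketch of a multi-table job setup under these assumptions (connection wiring and table names are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class MultiTableBulkloadSketch { // hypothetical
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      List<HFileOutputFormat2.TableInfo> infos = new ArrayList<>();
      for (TableName tn : Arrays.asList(TableName.valueOf("ns1", "t1"),
        TableName.valueOf("default", "t2"))) {
        infos.add(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tn),
          conn.getRegionLocator(tn)));
      }
      Job job = Job.getInstance(conf, "multi-table-bulkload");
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, infos);
      // With the key true, the writer expects "ns1:t1" and "default:t2";
      // with the default (false) it expects "ns1:t1" and "t2", the old behavior.
    }
  }
}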
@@ -80,14 +80,6 @@ public class WALPlayer extends Configured implements Tool {

protected static final String tableSeparator = ";";

-// This relies on Hadoop Configuration to handle warning about deprecated configs and
-// to set the correct non-deprecated configs when an old one shows up.
-static {
-Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
-Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
-Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
-}

private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

public WALPlayer() {
@@ -116,11 +108,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}
+KeyValue keyValue = KeyValueUtil.ensureKeyValue(cell);
byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator),
-CellUtil.cloneRow(KeyValueUtil.ensureKeyValue(cell)))
-: CellUtil.cloneRow(KeyValueUtil.ensureKeyValue(cell));
-context.write(new ImmutableBytesWritable(outKey), KeyValueUtil.ensureKeyValue(cell));
+CellUtil.cloneRow(keyValue))
+: CellUtil.cloneRow(keyValue);
+context.write(new ImmutableBytesWritable(outKey), keyValue);
}
}
} catch (InterruptedException e) {
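The hoisted keyValue above is a pure refactor; the composite map output key is unchanged: table name, the ';' tableSeparator, then the row. For illustration, a hypothetical decoder of that key (the real pipeline uses MultiTableHFileOutputFormat.getTableName() instead):

// Hypothetical helper, not part of the patch; table names cannot contain ';',
// so splitting at the first separator byte is safe.
static byte[][] splitTableAndRow(byte[] outKey) {
  for (int i = 0; i < outKey.length; i++) {
    if (outKey[i] == ';') { // matches WALPlayer.tableSeparator
      return new byte[][] {
        java.util.Arrays.copyOfRange(outKey, 0, i),                // table name
        java.util.Arrays.copyOfRange(outKey, i + 1, outKey.length) // row key
      };
    }
  }
  throw new IllegalArgumentException("no table separator in key");
}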
@@ -371,7 +364,13 @@ public Job createSubmittableJob(String[] args) throws IOException {
RegionLocator regionLocator = conn.getRegionLocator(tableName);
tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
}
-MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
+if (tableInfoList.size() > 1) {
+MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
+} else if (tableInfoList.size() == 1) {
+TableInfo tableInfo = tableInfoList.get(0);
+HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.getTableDescriptor(),
+tableInfo.getRegionLocator());
+}
}
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
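The size-based dispatch above also changes where output lands: single-table HFileOutputFormat2 writes column-family directories directly under the bulk output path, while MultiTableHFileOutputFormat inserts a per-table (and, with the new key, per-namespace) level. Assumed layouts, with illustrative names throughout:

import org.apache.hadoop.fs.Path;

public class OutputLayoutSketch { // hypothetical; all names illustrative
  public static void main(String[] args) {
    Path out = new Path("/bulk-output");
    Path single  = new Path(out, "cf");            // one table: family dirs at the root
    Path multi   = new Path(out, "t1/cf");         // several tables, namespace key off
    Path multiNs = new Path(out, "default/t1/cf"); // several tables, namespace key on
    System.out.println(single + "\n" + multi + "\n" + multiNs);
  }
}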
@@ -566,22 +566,33 @@ public void testMRIncrementalLoadWithPutSortReducer() throws Exception {

private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, String tableStr) throws Exception {
-doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
+doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false,
Arrays.asList(tableStr));
}

@Test
public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
-doIncrementalLoadTest(false, false, true,
+doIncrementalLoadTest(false, false, true, false,
Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
}

+@Test
+public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception {
+LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n");
+doIncrementalLoadTest(false, false, true, true,
+Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
+}

private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
-boolean putSortReducer, List<String> tableStr) throws Exception {
+boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr)
+throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+if (shouldWriteToTableWithNamespace) {
+conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
+}
int hostCount = 1;
int regionNum = 5;
if (shouldKeepLocality) {
@@ -616,7 +627,7 @@ private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKe
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
// Generate the bulk load files
runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
-if (writeMultipleTables) {
+if (shouldWriteToTableWithNamespace) {
testDir = new Path(testDir, "default");
}

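The new Path(testDir, "default") hop above is exactly what downstream consumers have to mirror when the key is on. A hedged sketch of loading the generated files afterwards (the BulkLoadHFiles wiring and all paths are assumptions, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class LoadNamespacedOutputSketch { // hypothetical
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tn = TableName.valueOf("my-table"); // default namespace, illustrative
    // With the namespace key on, descend through "default" before the table dir.
    Path tableDir = new Path("/bulk-output/default/" + tn.getQualifierAsString());
    BulkLoadHFiles.create(conf).bulkLoad(tn, tableDir);
  }
}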
@@ -599,22 +599,33 @@ public void testMRIncrementalLoadWithPutSortReducer() throws Exception {

private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
boolean putSortReducer, String tableStr) throws Exception {
-doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
+doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer, false,
Arrays.asList(tableStr));
}

@Test
public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
-doIncrementalLoadTest(false, false, true,
+doIncrementalLoadTest(false, false, true, false,
Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
}

+@Test
+public void testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath() throws Exception {
+LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducerWithNamespaceInPath\n");
+doIncrementalLoadTest(false, false, true, true,
+Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
+}

private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
-boolean putSortReducer, List<String> tableStr) throws Exception {
+boolean putSortReducer, boolean shouldWriteToTableWithNamespace, List<String> tableStr)
+throws Exception {
util = new HBaseTestingUtility();
Configuration conf = util.getConfiguration();
conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
+if (shouldWriteToTableWithNamespace) {
+conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
+}
int hostCount = 1;
int regionNum = 5;
if (shouldKeepLocality) {
@@ -651,7 +662,7 @@ private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKe
Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
// Generate the bulk load files
runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
-if (writeMultipleTables) {
+if (shouldWriteToTableWithNamespace) {
testDir = new Path(testDir, "default");
}

