diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index ed1755ad5021..fedb4487968c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -352,6 +352,11 @@ public ArrayList getAncestors(BackupInfo backupInfo) throws IOExcep public ArrayList getAncestors(BackupInfo backupInfo, TableName table) throws IOException { ArrayList ancestors = getAncestors(backupInfo); + return filterAncestorsForTable(ancestors, table); + } + + public static ArrayList filterAncestorsForTable(ArrayList ancestors, + TableName table) { ArrayList tableAncestors = new ArrayList<>(); for (BackupImage image : ancestors) { if (image.hasTable(table)) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java index 211e9f96c89c..d99aef200176 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hbase.backup.impl; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY; +import static org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,17 +42,26 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.WALPlayer; +import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.util.Tool; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + /** * Incremental backup implementation. See the {@link #execute() execute} method. */ @@ -276,10 +287,48 @@ public void execute() throws IOException { // case INCREMENTAL_COPY: try { + // todo: need to add an abstraction to encapsulate and DRY this up + ArrayList ancestors = backupManager.getAncestors(backupInfo); + Map> regionsByTable = new HashMap<>(); + List splits = new ArrayList<>(); + for (TableName table : backupInfo.getTables()) { + ArrayList ancestorsForTable = + BackupManager.filterAncestorsForTable(ancestors, table); + + BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1); + if (backupImage.getType() != BackupType.FULL) { + throw new RuntimeException("No full backup found in ancestors for table " + table); + } + + String lastFullBackupId = backupImage.getBackupId(); + Path backupRootDir = new Path(backupInfo.getBackupRootDir()); + + FileSystem backupFs = backupRootDir.getFileSystem(conf); + Path tableInfoPath = + BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table); + SnapshotProtos.SnapshotDescription snapshotDesc = + SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath); + SnapshotManifest manifest = + SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc); + List regionInfos = new ArrayList<>(manifest.getRegionManifests().size()); + for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) { + HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo(); + RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo); + // scanning meta doesnt return mob regions, so skip them here too so we keep parity + if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) { + continue; + } + + regionInfos.add(regionInfoObj); + splits.add(new ImmutableBytesWritable(HFileOutputFormat2 + .combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey()))); + } + regionsByTable.put(table, regionInfos); + } // copy out the table and region info files for each table - BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); + BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT - convertWALsToHFiles(); + convertWALsToHFiles(splits); incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, backupInfo.getBackupRootDir()); } catch (Exception e) { @@ -359,7 +408,7 @@ protected void deleteBulkLoadDirectory() throws IOException { } } - protected void convertWALsToHFiles() throws IOException { + protected void convertWALsToHFiles(List splits) throws IOException { // get incremental backup file list and prepare parameters for DistCp List incrBackupFileList = backupInfo.getIncrBackupFileList(); // Get list of tables in incremental backup set @@ -375,7 +424,7 @@ protected void convertWALsToHFiles() throws IOException { LOG.warn("Table " + table + " does not exists. Skipping in WAL converter"); } } - walToHFiles(incrBackupFileList, tableList); + walToHFiles(incrBackupFileList, tableList, splits); } @@ -385,8 +434,9 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti } } - protected void walToHFiles(List dirPaths, List tableList) throws IOException { - Tool player = new WALPlayer(); + protected void walToHFiles(List dirPaths, List tableList, + List splits) throws IOException { + WALPlayer player = new WALPlayer(); // Player reads all files in arbitrary directory structure and creates // a Map task for each file. We use ';' as separator @@ -401,6 +451,7 @@ protected void walToHFiles(List dirPaths, List tableList) throws conf.set(JOB_NAME_CONF_KEY, jobname); String[] playerArgs = { dirs, StringUtils.join(tableList, ",") }; + player.setSplits(splits); try { player.setConf(conf); int result = player.run(playerArgs); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index d4e849f610ae..c401a109379e 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.BackupInfo; @@ -54,6 +53,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.region.MasterRegionFactory; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -122,7 +122,8 @@ private BackupUtils() { * @param conf configuration * @throws IOException exception */ - public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf) + public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, + Map> lastFullBackupForTable, Configuration conf) throws IOException { Path rootDir = CommonFSUtils.getRootDir(conf); FileSystem fs = rootDir.getFileSystem(conf); @@ -147,20 +148,56 @@ public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, C LOG.debug("Attempting to copy table info for:" + table + " target: " + target + " descriptor: " + orig); LOG.debug("Finished copying tableinfo."); - List regions = MetaTableAccessor.getTableRegions(conn, table); - // For each region, write the region info to disk - LOG.debug("Starting to write region info for table " + table); - for (RegionInfo regionInfo : regions) { - Path regionDir = FSUtils - .getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); - regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); - writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); - } + copyTableRegionInfosFromParent(table, targetFs, backupInfo, + lastFullBackupForTable.get(table), conf); LOG.debug("Finished writing region info for table " + table); } } } + private static void copyTableRegionInfosFromParent(TableName table, FileSystem targetFs, + BackupInfo backupInfo, List lastFullBackupForTable, Configuration conf) + throws IOException { + for (RegionInfo regionInfo : lastFullBackupForTable) { + Path regionDir = + FSUtils.getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo); + regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName()); + writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo); + } + } + + /** + * Returns value represent path for: + * ""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/ + * snapshot_1396650097621_namespace_table" this path contains .snapshotinfo, .tabledesc (0.96 and + * 0.98) this path contains .snapshotinfo, .data.manifest (trunk) + * @param tableName table name + * @return path to table info + * @throws IOException exception + */ + public static Path getTableInfoPath(FileSystem fs, Path backupRootPath, String backupId, + TableName tableName) throws IOException { + Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); + Path tableInfoPath = null; + + // can't build the path directly as the timestamp values are different + FileStatus[] snapshots = fs.listStatus(tableSnapShotPath, + new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); + for (FileStatus snapshot : snapshots) { + tableInfoPath = snapshot.getPath(); + // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest"; + if (tableInfoPath.getName().endsWith("data.manifest")) { + break; + } + } + return tableInfoPath; + } + + static Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) { + return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId), + HConstants.SNAPSHOT_DIR_NAME); + } + /** * Write the .regioninfo file on-disk. */ diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java index 8ca80d1301f6..de84f27ecbfb 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java @@ -145,13 +145,13 @@ void modifyTableSync(Connection conn, TableDescriptor desc) throws IOException { * the future * @param conn HBase connection * @param tableBackupPath backup path - * @param logDirs : incremental backup folders, which contains WAL - * @param tableNames : source tableNames(table names were backuped) - * @param newTableNames : target tableNames(table names to be restored to) + * @param hfileDirs incremental backup folders, which contains hfiles to bulkload + * @param tableNames source tableNames(table names were backuped) + * @param newTableNames target tableNames(table names to be restored to) * @param incrBackupId incremental backup Id * @throws IOException exception */ - public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs, + public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] hfileDirs, TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException { try (Admin admin = conn.getAdmin()) { if (tableNames.length != newTableNames.length) { @@ -202,7 +202,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[ } RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf); - restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false); + restoreService.run(hfileDirs, tableNames, restoreRootDir, newTableNames, false); } } @@ -225,39 +225,14 @@ Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backu HConstants.SNAPSHOT_DIR_NAME); } - /** - * Returns value represent path for: - * ""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/ - * snapshot_1396650097621_namespace_table" this path contains .snapshotinfo, .tabledesc (0.96 and - * 0.98) this path contains .snapshotinfo, .data.manifest (trunk) - * @param tableName table name - * @return path to table info - * @throws IOException exception - */ - Path getTableInfoPath(TableName tableName) throws IOException { - Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); - Path tableInfoPath = null; - - // can't build the path directly as the timestamp values are different - FileStatus[] snapshots = fs.listStatus(tableSnapShotPath, - new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); - for (FileStatus snapshot : snapshots) { - tableInfoPath = snapshot.getPath(); - // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest"; - if (tableInfoPath.getName().endsWith("data.manifest")) { - break; - } - } - return tableInfoPath; - } - /** * Get table descriptor * @param tableName is the table backed up * @return {@link TableDescriptor} saved in backup image of the table */ TableDescriptor getTableDesc(TableName tableName) throws IOException { - Path tableInfoPath = this.getTableInfoPath(tableName); + Path tableInfoPath = BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName); + ; SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath); SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc); TableDescriptor tableDescriptor = manifest.getTableDescriptor(); @@ -307,7 +282,8 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableNa tableDescriptor = manifest.getTableDescriptor(); } else { tableDescriptor = getTableDesc(tableName); - snapshotMap.put(tableName, getTableInfoPath(tableName)); + snapshotMap.put(tableName, + BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName)); } if (tableDescriptor == null) { LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost"); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index 7b5095a897e2..d327fed3e4b6 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager; @@ -52,14 +53,20 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner; +import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -71,6 +78,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; + /** * This class is only a base for other integration-level backup tests. Do not add tests here. * TestBackupSmallTests is where tests that don't require bring machines up/down should go All other @@ -128,10 +139,50 @@ public void execute() throws IOException { LOG.debug("For incremental backup, current table set is " + backupManager.getIncrementalBackupTableSet()); newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap(); + + // todo: need to add an abstraction to encapsulate and DRY this up` + ArrayList ancestors = backupManager.getAncestors(backupInfo); + Map> regionsByTable = new HashMap<>(); + List splits = new ArrayList<>(); + for (TableName table : backupInfo.getTables()) { + ArrayList ancestorsForTable = + BackupManager.filterAncestorsForTable(ancestors, table); + + BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1); + if (backupImage.getType() != BackupType.FULL) { + throw new RuntimeException("No full backup found in ancestors for table " + table); + } + + String lastFullBackupId = backupImage.getBackupId(); + Path backupRootDir = new Path(backupInfo.getBackupRootDir()); + + FileSystem backupFs = backupRootDir.getFileSystem(conf); + Path tableInfoPath = + BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table); + SnapshotProtos.SnapshotDescription snapshotDesc = + SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath); + SnapshotManifest manifest = + SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc); + List regionInfos = new ArrayList<>(manifest.getRegionManifests().size()); + for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest + .getRegionManifests()) { + HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo(); + RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo); + // scanning meta doesnt return mob regions, so skip them here too so we keep parity + if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) { + continue; + } + + regionInfos.add(regionInfoObj); + splits.add(new ImmutableBytesWritable(HFileOutputFormat2 + .combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey()))); + } + regionsByTable.put(table, regionInfos); + } // copy out the table and region info files for each table - BackupUtils.copyTableRegionInfo(conn, backupInfo, conf); + BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf); // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT - convertWALsToHFiles(); + convertWALsToHFiles(splits); incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() }, backupInfo.getBackupRootDir()); failStageIf(Stage.stage_2); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java index 90fbba2bf0ae..2cb4ac9756c6 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java @@ -130,6 +130,8 @@ public void TestIncBackupRestore() throws Exception { byte[] name = regions.get(0).getRegionInfo().getRegionName(); long startSplitTime = EnvironmentEdgeManager.currentTime(); try { + // todo: this fails, and itd be nice if we could really add a split so we can prove + // that our new splits passthrough works (expect split to disappear once we restore) admin.splitRegionAsync(name).get(); } catch (Exception e) { // although split fail, this may not affect following check in current API, diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java new file mode 100644 index 000000000000..c784b2561881 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ExtendedCellSerialization.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary + * so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations + * are not compatible -- data serialized by CellSerialization cannot be deserialized with + * ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the + * serialization is not actually used for the actual written HFiles, just intermediate data (between + * mapper and reducer of a single job). + */ +@InterfaceAudience.Private +public class ExtendedCellSerialization implements Serialization { + @Override + public boolean accept(Class c) { + return ExtendedCell.class.isAssignableFrom(c); + } + + @Override + public ExtendedCellDeserializer getDeserializer(Class t) { + return new ExtendedCellDeserializer(); + } + + @Override + public ExtendedCellSerializer getSerializer(Class c) { + return new ExtendedCellSerializer(); + } + + public static class ExtendedCellDeserializer implements Deserializer { + private DataInputStream dis; + + @Override + public void close() throws IOException { + this.dis.close(); + } + + @Override + public KeyValue deserialize(ExtendedCell ignore) throws IOException { + KeyValue kv = KeyValueUtil.create(this.dis); + PrivateCellUtil.setSequenceId(kv, this.dis.readLong()); + return kv; + } + + @Override + public void open(InputStream is) throws IOException { + this.dis = new DataInputStream(is); + } + } + + public static class ExtendedCellSerializer implements Serializer { + private DataOutputStream dos; + + @Override + public void close() throws IOException { + this.dos.close(); + } + + @Override + public void open(OutputStream os) throws IOException { + this.dos = new DataOutputStream(os); + } + + @Override + public void serialize(ExtendedCell kv) throws IOException { + dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT); + PrivateCellUtil.writeCell(kv, dos, true); + dos.writeLong(kv.getSequenceId()); + } + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 2bd5330a62f8..98f86d8e8c30 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -30,6 +30,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -124,7 +125,7 @@ public RegionLocator getRegionLocator() { protected static final byte[] tableSeparator = Bytes.toBytes(";"); - protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) { + public static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) { return Bytes.add(tableName, tableSeparator, suffix); } @@ -159,6 +160,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = "hbase.mapreduce.use.multi.table.hfileoutputformat"; + /** + * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config + * package-private for internal usage for jobs like WALPlayer which need to use features of + * ExtendedCell. + */ + static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY = + "hbase.mapreduce.hfileoutputformat.extendedcell.enabled"; + static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false; + public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster."; public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum"; @@ -619,9 +629,7 @@ static void configureIncrementalLoad(Job job, List multiTableInfo, LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); } - conf.setStrings("io.serializations", conf.get("io.serializations"), - MutationSerialization.class.getName(), ResultSerialization.class.getName(), - CellSerialization.class.getName()); + mergeSerializations(conf); if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { LOG.info("bulkload locality sensitive enabled"); @@ -670,6 +678,33 @@ static void configureIncrementalLoad(Job job, List multiTableInfo, LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); } + private static void mergeSerializations(Configuration conf) { + List serializations = new ArrayList<>(); + + // add any existing values that have been set + String[] existing = conf.getStrings("io.serializations"); + if (existing != null) { + Collections.addAll(serializations, existing); + } + + serializations.add(MutationSerialization.class.getName()); + serializations.add(ResultSerialization.class.getName()); + + // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's + // SerializationFactory runs through serializations in the order they are registered. + // We want to register ExtendedCellSerialization before CellSerialization because both + // work for ExtendedCells but only ExtendedCellSerialization handles them properly. + if ( + conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, + EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT) + ) { + serializations.add(ExtendedCellSerialization.class.getName()); + } + serializations.add(CellSerialization.class.getName()); + + conf.setStrings("io.serializations", serializations.toArray(new String[0])); + } + public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) throws IOException { Configuration conf = job.getConfiguration(); @@ -846,9 +881,16 @@ private static Map createFamilyConfValueMap(Configuration conf, * Configure job with a TotalOrderPartitioner, partitioning against * splitPoints. Cleans up the partitions file after job exists. */ - static void configurePartitioner(Job job, List splitPoints, + public static void configurePartitioner(Job job, List splitPoints, boolean writeMultipleTables) throws IOException { Configuration conf = job.getConfiguration(); + // todo: need to think if there's a better way + if (conf.get(job.getJobName() + ".wrotePartitions") != null) { + LOG.info("Already configured partitions, skipping... {}", splitPoints); + return; + } + LOG.info("Configuring partitions {}", splitPoints); + conf.set(job.getJobName() + ".wrotePartitions", "true"); // create the partitions file FileSystem fs = FileSystem.get(conf); String hbaseTmpFsDir = diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 56c6bebdf261..1e25476a2df5 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -79,6 +80,7 @@ public class WALPlayer extends Configured implements Tool { protected static final String tableSeparator = ";"; private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + private List splits; public WALPlayer() { } @@ -87,6 +89,10 @@ protected WALPlayer(final Configuration c) { super(c); } + public void setSplits(List splits) { + this.splits = splits; + } + /** * A mapper that just writes out KeyValues. This one can be used together with * {@link CellSortReducer} @@ -105,6 +111,13 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException { if (WALEdit.isMetaEditFamily(cell)) { continue; } + + // Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId + // on WALKey is the same value that was on the cells in the WALEdit. This enables + // CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps. + // See HBASE-27649 + PrivateCellUtil.setSequenceId(cell, key.getSequenceId()); + byte[] outKey = multiTableSupport ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell)) : CellUtil.cloneRow(cell); @@ -308,6 +321,15 @@ public Job createSubmittableJob(String[] args) throws IOException { if (hfileOutPath != null) { LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + // WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when + // sorting cells in CellSortReducer + job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, + true); + + if (splits != null) { + HFileOutputFormat2.configurePartitioner(job, splits, true); + } + // the bulk HFile case List tableNames = getTableNameList(tables); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 9b0d5ec52a34..b39d04802c98 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.mapreduce; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,6 +33,7 @@ import java.io.File; import java.io.PrintStream; import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,8 +55,10 @@ import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.hbase.util.MapReduceExtendedCell; import org.apache.hadoop.hbase.wal.WAL; @@ -131,6 +138,80 @@ public void testPlayingRecoveredEdit() throws Exception { assertTrue(TEST_UTIL.countRows(tn) > 0); } + /** + * Tests that when you write multiple cells with the same timestamp they are properly sorted by + * their sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying from + * the resulting bulkloaded HFiles. See HBASE-27649 + */ + @Test + public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + "1"); + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] row = Bytes.toBytes("row"); + Table table = TEST_UTIL.createTable(tableName, family); + + long now = EnvironmentEdgeManager.currentTime(); + // put a row into the first table + Put p = new Put(row); + p.addColumn(family, column1, now, column1); + p.addColumn(family, column2, now, column2); + + table.put(p); + + byte[] lastVal = null; + + for (int i = 0; i < 50; i++) { + lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong()); + p = new Put(row); + p.addColumn(family, column1, now, lastVal); + + table.put(p); + + // wal rolling is necessary to trigger the bug. otherwise no sorting + // needs to occur in the reducer because it's all sorted and coming from a single file. + if (i % 10 == 0) { + WAL log = cluster.getRegionServer(0).getWAL(null); + log.rollWriter(); + } + } + + WAL log = cluster.getRegionServer(0).getWAL(null); + log.rollWriter(); + String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(), + HConstants.HREGION_LOGDIR_NAME).toString(); + + Configuration configuration = new Configuration(TEST_UTIL.getConfiguration()); + String outPath = "/tmp/" + name.getMethodName(); + configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath); + configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true); + + WALPlayer player = new WALPlayer(configuration); + assertEquals(0, ToolRunner.run(configuration, player, + new String[] { walInputDir, tableName.getNameAsString() })); + + Get g = new Get(row); + Result result = table.get(g); + byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + + table = TEST_UTIL.truncateTable(tableName); + g = new Get(row); + result = table.get(g); + assertThat(result.listCells(), nullValue()); + + BulkLoadHFiles.create(configuration).bulkLoad(tableName, + new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString())); + + g = new Get(row); + result = table.get(g); + value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1)); + + assertThat(result.listCells(), notNullValue()); + assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal))); + } + /** * Simple end-to-end test */