diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 9f6b862c3985..fa24e9028e94 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -356,8 +355,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOExcept
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
-  public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index fb8cf6cb80ea..0e56a37cbf69 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -69,7 +69,6 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -398,8 +397,8 @@ public void deleteBackupInfo(String backupId) throws IOException {
 
   /**
    * Registers a bulk load.
-   * @param tabName table name
-   * @param region the region receiving hfile
+   * @param tabName       table name
+   * @param region        the region receiving hfile
    * @param cfToHfilePath column family and associated hfiles
    */
   public void registerBulkLoad(TableName tabName, byte[] region,
@@ -433,31 +432,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
     }
   }
 
-  /*
+  /**
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family. Value of the map reflects
-   * whether the hfile was recorded by preCommitStoreFile hook (true)
-   */
-  public Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
-    Map<TableName, Map<String, Map<String, List<String>>>> map = new HashMap<>();
-    List<byte[]> rows = new ArrayList<>();
-    for (TableName tTable : tableList) {
-      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
-      Map<String, Map<String, List<String>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(bulkLoadTableName);
-        ResultScanner scanner = table.getScanner(scan)) {
-        Result res = null;
+   */
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+    List<BulkLoad> result = new ArrayList<>();
+    for (TableName table : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
+      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
+        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
+        Result res;
         while ((res = scanner.next()) != null) {
           res.advance();
           String fam = null;
           String path = null;
-          byte[] row;
           String region = null;
+          byte[] row = null;
           for (Cell cell : res.listCells()) {
             row = CellUtil.cloneRow(cell);
-            rows.add(row);
             String rowStr = Bytes.toString(row);
             region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (
@@ -472,23 +465,12 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
               path = Bytes.toString(CellUtil.cloneValue(cell));
             }
           }
-          if (map.get(tTable) == null) {
-            map.put(tTable, new HashMap<>());
-            tblMap = map.get(tTable);
-          }
-          if (tblMap.get(region) == null) {
-            tblMap.put(region, new HashMap<>());
-          }
-          Map<String, List<String>> famMap = tblMap.get(region);
-          if (famMap.get(fam) == null) {
-            famMap.put(fam, new ArrayList<>());
-          }
-          famMap.get(fam).add(path);
+          result.add(new BulkLoad(table, region, fam, path, row));
           LOG.debug("found orig " + path + " for " + fam + " of table " + region);
         }
       }
     }
-    return new Pair<>(map, rows);
+    return result;
   }
 
   /*
@@ -1540,6 +1522,7 @@ public static void deleteSnapshot(Connection conn) throws IOException {
   }
 
   public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
+    // Unused???
     List<Delete> lstDels = new ArrayList<>(lst.size());
     for (TableName table : lst) {
       Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
new file mode 100644
index 000000000000..90b2e52c5706
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class BulkLoad {
+  public final TableName tableName;
+  public final String region;
+  public final String family;
+  public final String hfilePath;
+  public final byte[] rowKey;
+
+  public BulkLoad(TableName tableName, String region, String family, String hfilePath,
+    byte[] rowKey) {
+    this.tableName = tableName;
+    this.region = region;
+    this.family = family;
+    this.hfilePath = hfilePath;
+    this.rowKey = rowKey;
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 6b60a9e43496..9e58940ae381 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -26,7 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,16 +41,16 @@
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.util.Tool;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 /**
  * Incremental backup implementation. See the {@link #execute() execute} method.
  */
@@ -101,18 +101,15 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
     return -1;
   }
 
-  /*
+  /**
   * Reads bulk load records from backup table, iterates through the records and forms the paths for
-   * bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination
-   * @param sTableList list of tables to be backed up
+   * bulk loaded hfiles. Copies the bulk loaded hfiles to the backup destination
+   * @param tablesToBackup list of tables to be backed up
    */
-  protected void handleBulkLoad(List<TableName> tablesToBackup)
-    throws IOException {
+  protected void handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
     List<String> activeFiles = new ArrayList<>();
     List<String> archiveFiles = new ArrayList<>();
-    Pair<Map<TableName, Map<String, Map<String, List<String>>>>, List<byte[]>> pair =
-      backupManager.readBulkloadRows(tablesToBackup);
-    Map<TableName, Map<String, Map<String, List<String>>>> map = pair.getFirst();
+    List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
@@ -122,57 +119,48 @@ protected void handleBulkLoad(List<TableName> tablesToBackup)
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (Map.Entry<TableName, Map<String, Map<String, List<String>>>> tblEntry : map.entrySet()) {
-      TableName srcTable = tblEntry.getKey();
+    for (BulkLoad bulkLoad : bulkLoads) {
+      TableName srcTable = bulkLoad.tableName;
+      String regionName = bulkLoad.region;
+      String fam = bulkLoad.family;
+      String filename = FilenameUtils.getName(bulkLoad.hfilePath);
       if (!tablesToBackup.contains(srcTable)) {
         LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
         continue;
       }
       Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
-        srcTable.getQualifierAsString());
-      for (Map.Entry<String, Map<String, List<String>>> regionEntry : tblEntry
-        .getValue().entrySet()) {
-        String regionName = regionEntry.getKey();
-        Path regionDir = new Path(tblDir, regionName);
-        // map from family to List of hfiles
-        for (Map.Entry<String, List<String>> famEntry : regionEntry.getValue()
-          .entrySet()) {
-          String fam = famEntry.getKey();
-          Path famDir = new Path(regionDir, fam);
-          Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-          String tblName = srcTable.getQualifierAsString();
-          Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
-          if (!tgtFs.mkdirs(tgtFam)) {
-            throw new IOException("couldn't create " + tgtFam);
-          }
-          for (String file : famEntry.getValue()) {
-            int idx = file.lastIndexOf("/");
-            String filename = file;
-            if (idx > 0) {
-              filename = file.substring(idx + 1);
-            }
-            Path p = new Path(famDir, filename);
-            Path tgt = new Path(tgtFam, filename);
-            Path archive = new Path(archiveDir, filename);
-            if (fs.exists(p)) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
-                LOG.trace("copying " + p + " to " + tgt);
-              }
-              activeFiles.add(p.toString());
-            } else if (fs.exists(archive)) {
-              LOG.debug("copying archive " + archive + " to " + tgt);
-              archiveFiles.add(archive.toString());
-            }
-          }
+      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+
+      String srcTableQualifier = srcTable.getQualifierAsString();
+      String srcTableNs = srcTable.getNamespaceAsString();
+      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+      if (!tgtFs.mkdirs(tgtFam)) {
+        throw new IOException("couldn't create " + tgtFam);
+      }
+      Path tgt = new Path(tgtFam, filename);
+
+      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
+      Path archive = new Path(archiveDir, filename);
+
+      if (fs.exists(p)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.hfilePath, p.getParent(),
+            srcTableQualifier);
+          LOG.trace("copying {} to {}", p, tgt);
         }
+        activeFiles.add(p.toString());
+      } else if (fs.exists(archive)) {
+        LOG.debug("copying archive {} to {}", archive, tgt);
+        archiveFiles.add(archive.toString());
       }
     }
     copyBulkLoadedFiles(activeFiles, archiveFiles);
-    backupManager.deleteBulkLoadedRows(pair.getSecond());
+
+    List<byte[]> rowsToDelete = Lists.transform(bulkLoads, bulkLoad -> bulkLoad.rowKey);
+    backupManager.deleteBulkLoadedRows(rowsToDelete);
   }
 
   private void copyBulkLoadedFiles(List<String> activeFiles, List<String> archiveFiles)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index fa4373c8874b..d24ec160d0cb 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -20,11 +20,11 @@
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
-import java.util.Map;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -127,10 +126,8 @@ public void TestIncBackupDeleteTable() throws Exception {
     backupIdFull = client.backupTables(request);
 
     try (final BackupSystemTable table = new BackupSystemTable(conn)) {
-      Pair<Map<TableName, Map<String, Map<String, List<String>>>>,
-        List<byte[]>> pair = table.readBulkloadRows(tables);
-      assertTrue("map still has " + pair.getSecond().size() + " entries",
-        pair.getSecond().isEmpty());
+      List<BulkLoad> bulkLoads = table.readBulkloadRows(tables);
+      assertTrue("bulkloads still has " + bulkLoads.size() + " entries", bulkLoads.isEmpty());
     }
 
     assertTrue(checkSucceeded(backupIdFull));
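
The net effect of the patch is that bulk-load bookkeeping moves from a Pair of nested maps plus a parallel row-key list to a flat List<BulkLoad>, where each record carries the source table, region, column family, hfile path, and the backup-table row key. A minimal sketch of the resulting calling pattern, assuming an open Connection; it mirrors what handleBulkLoad and the test above do, and the wrapper class and method names here are illustrative, not part of the patch:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

// Illustrative caller of the new API; not part of the patch.
public class BulkLoadRecordsExample {
  static void processBulkLoads(Connection conn, List<TableName> tables) throws IOException {
    try (BackupSystemTable systemTable = new BackupSystemTable(conn)) {
      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(tables);
      for (BulkLoad bulkLoad : bulkLoads) {
        // One record per registered hfile: table, region, column family, hfile path, row key.
        System.out.println(bulkLoad.tableName + "/" + bulkLoad.region + "/" + bulkLoad.family
          + " -> " + bulkLoad.hfilePath);
      }
      // The row key kept on each record is exactly what deleteBulkLoadedRows expects,
      // so no separate List<byte[]> has to be threaded through as with the old Pair API.
      systemTable.deleteBulkLoadedRows(Lists.transform(bulkLoads, b -> b.rowKey));
    }
  }
}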