diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 97fcefe4a70c..9f97002f4b44 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -181,8 +181,11 @@ public class PerformanceEvaluation extends Configured implements Tool { addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000", "Run random seek scan with both start and stop row (max 10000 rows)"); addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test"); + addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test"); addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test"); addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test"); + addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete", + "Run sequential delete test"); addCommandDescriptor(MetaWriteTest.class, "metaWrite", "Populate meta table;used with 1 thread; to be cleaned up by cleanMeta"); addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)"); @@ -352,7 +355,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException { boolean needsDelete = false, exists = admin.tableExists(tableName); boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read") || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan"); - if (!exists && isReadCmd) { + boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete"); + if (!exists && (isReadCmd || isDeleteCmd)) { throw new IllegalStateException( "Must specify an existing table for read commands. 
Run a write command first."); } @@ -367,7 +371,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException { && opts.presplitRegions != admin.getRegions(tableName).size()) || (!isReadCmd && desc != null && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy)) - || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas) + || (!(isReadCmd || isDeleteCmd) && desc != null + && desc.getRegionReplication() != opts.replicas) || (desc != null && desc.getColumnFamilyCount() != opts.families) ) { needsDelete = true; @@ -2071,6 +2076,18 @@ protected byte[] generateRow(final int i) { } + static class RandomDeleteTest extends SequentialDeleteTest { + RandomDeleteTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + protected byte[] generateRow(final int i) { + return getRandomRow(this.rand, opts.totalRows); + } + + } + static class ScanTest extends TableTest { private ResultScanner testScanner; @@ -2406,6 +2423,34 @@ boolean testRow(final int i, final long startTime) throws IOException { } } + static class SequentialDeleteTest extends BufferedMutatorTest { + + SequentialDeleteTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + protected byte[] generateRow(final int i) { + return format(i); + } + + @Override + boolean testRow(final int i, final long startTime) throws IOException { + byte[] row = generateRow(i); + Delete delete = new Delete(row); + for (int family = 0; family < opts.families; family++) { + byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family); + delete.addFamily(familyName); + } + delete.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); + if (opts.autoFlush) { + table.delete(delete); + } else { + mutator.mutate(delete); + } + return true; + } + } + /* * Insert fake regions into meta table with contiguous split keys. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java index 24db92b4de1c..0c32303746c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePrettyPrinter.java @@ -537,6 +537,7 @@ private void printMeta(HFile.Reader reader, Map fileInfo) throws Bytes.equals(e.getKey(), HStoreFile.MAJOR_COMPACTION_KEY) || Bytes.equals(e.getKey(), HFileInfo.TAGS_COMPRESSED) || Bytes.equals(e.getKey(), HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY) + || Bytes.equals(e.getKey(), HStoreFile.HISTORICAL_KEY) ) { out.println(Bytes.toBoolean(e.getValue())); } else if (Bytes.equals(e.getKey(), HFileInfo.LASTKEY)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java index ba223de966c0..c235bdc29dc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -162,7 +162,7 @@ private boolean isCompactedFile(FileStatus file, HStore store) { } private boolean isActiveStorefile(FileStatus file, HStore store) { - return store.getStoreEngine().getStoreFileManager().getStorefiles().stream() + return store.getStoreEngine().getStoreFileManager().getStoreFiles().stream() .anyMatch(sf -> sf.getPath().equals(file.getPath())); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java index c7587a147a6f..1d838d86abcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.yetus.audience.InterfaceAudience; @@ -34,4 +35,14 @@ public interface CellSink { * @param cell the cell to be added */ void append(Cell cell) throws IOException; + + /** + * Append the given (possibly partial) list of cells of a row + * @param cellList the cell list to be added + */ + default void appendAll(List cellList) throws IOException { + for (Cell cell : cellList) { + append(cell); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 8fdbb6035ae2..26437ab11242 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -46,7 +46,7 @@ public class DateTieredStoreEngine extends StoreEngine filesCompacting) { - return compactionPolicy.needsCompaction(storeFileManager.getStorefiles(), filesCompacting); + return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting); } @Override @@ -68,14 +68,14 @@ private final class DateTieredCompactionContext extends CompactionContext { @Override public List preSelect(List filesCompacting) { - return 
compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(), + return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(), filesCompacting); } @Override public boolean select(List filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { - request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, + request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor); return request != null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 0c9fb9adcc2c..7b095596a3da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -56,7 +56,7 @@ public class DefaultStoreEngine extends StoreEngine filesCompacting) { - return compactionPolicy.needsCompaction(this.storeFileManager.getStorefiles(), filesCompacting); + return compactionPolicy.needsCompaction(this.storeFileManager.getStoreFiles(), filesCompacting); } @Override @@ -111,7 +111,7 @@ private class DefaultCompactionContext extends CompactionContext { @Override public boolean select(List filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { - request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting, + request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor); return request != null; } @@ -124,7 +124,7 @@ public List compact(ThroughputController throughputController, User user) @Override public List preSelect(List filesCompacting) { - return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(), + return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(), filesCompacting); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index f2d7cd973688..920a490daa2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles; + +import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; @@ -48,17 +52,35 @@ class DefaultStoreFileManager implements StoreFileManager { private final CompactionConfiguration comConf; private final int blockingFileCount; private final Comparator storeFileComparator; - /** - * List of store files inside this store. This is an immutable list that is atomically replaced - * when its contents change. - */ - private volatile ImmutableList storefiles = ImmutableList.of(); + + static class StoreFileList { + /** + * List of store files inside this store. 
This is an immutable list that is atomically replaced + * when its contents change. + */ + final ImmutableList all; + /** + * List of store files that include the latest cells inside this store. This is an immutable + * list that is atomically replaced when its contents change. + */ + @Nullable + final ImmutableList live; + + StoreFileList(ImmutableList storeFiles, ImmutableList liveStoreFiles) { + this.all = storeFiles; + this.live = liveStoreFiles; + } + } + + private volatile StoreFileList storeFiles; + /** * List of compacted files inside this store that needs to be excluded in reads because further * new reads will be using only the newly created files out of compaction. These compacted files * will be deleted/cleared once all the existing readers on these compacted files are done. */ private volatile ImmutableList compactedfiles = ImmutableList.of(); + private final boolean enableLiveFileTracking; public DefaultStoreFileManager(CellComparator cellComparator, Comparator storeFileComparator, Configuration conf, @@ -66,18 +88,35 @@ public DefaultStoreFileManager(CellComparator cellComparator, this.cellComparator = cellComparator; this.storeFileComparator = storeFileComparator; this.comConf = comConf; - this.blockingFileCount = + blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT); + enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf); + storeFiles = + new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null); + } + + private List getLiveFiles(Collection storeFiles) throws IOException { + List liveFiles = new ArrayList<>(storeFiles.size()); + for (HStoreFile file : storeFiles) { + file.initReader(); + if (!file.isHistorical()) { + liveFiles.add(file); + } + } + return liveFiles; } @Override - public void loadFiles(List storeFiles) { - this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles); + public void loadFiles(List storeFiles) throws IOException { + this.storeFiles = new StoreFileList(ImmutableList.sortedCopyOf(storeFileComparator, storeFiles), + enableLiveFileTracking + ? ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles)) + : null); } @Override - public final Collection getStorefiles() { - return storefiles; + public final Collection getStoreFiles() { + return storeFiles.all; } @Override @@ -86,15 +125,20 @@ public Collection getCompactedfiles() { } @Override - public void insertNewFiles(Collection sfs) { - this.storefiles = - ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs)); + public void insertNewFiles(Collection sfs) throws IOException { + storeFiles = new StoreFileList( + ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs)), + enableLiveFileTracking + ? ImmutableList.sortedCopyOf(storeFileComparator, + Iterables.concat(storeFiles.live, getLiveFiles(sfs))) + : null); } @Override public ImmutableCollection clearFiles() { - ImmutableList result = storefiles; - storefiles = ImmutableList.of(); + ImmutableList result = storeFiles.all; + storeFiles = + new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? 
ImmutableList.of() : null); return result; } @@ -107,7 +151,7 @@ public Collection clearCompactedFiles() { @Override public final int getStorefileCount() { - return storefiles.size(); + return storeFiles.all.size(); } @Override @@ -117,28 +161,38 @@ public final int getCompactedFilesCount() { @Override public void addCompactionResults(Collection newCompactedfiles, - Collection results) { - this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables - .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results)); + Collection results) throws IOException { + ImmutableList liveStoreFiles = null; + if (enableLiveFileTracking) { + liveStoreFiles = ImmutableList.sortedCopyOf(storeFileComparator, + Iterables.concat(Iterables.filter(storeFiles.live, sf -> !newCompactedfiles.contains(sf)), + getLiveFiles(results))); + } + storeFiles = + new StoreFileList( + ImmutableList + .sortedCopyOf(storeFileComparator, + Iterables.concat( + Iterables.filter(storeFiles.all, sf -> !newCompactedfiles.contains(sf)), results)), + liveStoreFiles); // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized // Let a background thread close the actual reader on these compacted files and also // ensure to evict the blocks from block cache so that they are no longer in // cache newCompactedfiles.forEach(HStoreFile::markCompactedAway); - this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, - Iterables.concat(this.compactedfiles, newCompactedfiles)); + compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, + Iterables.concat(compactedfiles, newCompactedfiles)); } @Override public void removeCompactedFiles(Collection removedCompactedfiles) { - this.compactedfiles = - this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf)) - .sorted(storeFileComparator).collect(ImmutableList.toImmutableList()); + compactedfiles = compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf)) + .sorted(storeFileComparator).collect(ImmutableList.toImmutableList()); } @Override public final Iterator getCandidateFilesForRowKeyBefore(KeyValue targetKey) { - return this.storefiles.reverse().iterator(); + return storeFiles.all.reverse().iterator(); } @Override @@ -153,25 +207,28 @@ public Iterator updateCandidateFilesForRowKeyBefore( @Override public final Optional getSplitPoint() throws IOException { - return StoreUtils.getSplitPoint(storefiles, cellComparator); + return StoreUtils.getSplitPoint(storeFiles.all, cellComparator); } @Override - public final Collection getFilesForScan(byte[] startRow, boolean includeStartRow, - byte[] stopRow, boolean includeStopRow) { + public Collection getFilesForScan(byte[] startRow, boolean includeStartRow, + byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) { + if (onlyLatestVersion && enableLiveFileTracking) { + return storeFiles.live; + } // We cannot provide any useful input and already have the files sorted by seqNum. - return getStorefiles(); + return getStoreFiles(); } @Override public int getStoreCompactionPriority() { - int priority = blockingFileCount - storefiles.size(); + int priority = blockingFileCount - storeFiles.all.size(); return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority; } @Override public Collection getUnneededFiles(long maxTs, List filesCompacting) { - ImmutableList files = storefiles; + ImmutableList files = storeFiles.all; // 1) We can never get rid of the last file which has the maximum seqid. 
// 2) Files that are not the latest can't become one due to (1), so the rest are fair game. return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 9954c78142e9..3c879dbdb730 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -593,7 +593,7 @@ public long timeOfOldestEdit() { /** Returns All store files. */ @Override public Collection getStorefiles() { - return this.storeEngine.getStoreFileManager().getStorefiles(); + return this.storeEngine.getStoreFileManager().getStoreFiles(); } @Override @@ -956,10 +956,10 @@ private void notifyChangedReadersObservers(List sfs) throws IOExcept * @return all scanners for this store */ public List getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, - boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) - throws IOException { + boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt, + boolean onlyLatestVersion) throws IOException { return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, - readPt); + readPt, onlyLatestVersion); } /** @@ -977,13 +977,14 @@ public List getScanners(boolean cacheBlocks, boolean isGet, boo */ public List getScanners(boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, - byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { + byte[] stopRow, boolean includeStopRow, long readPt, boolean onlyLatestVersion) + throws IOException { Collection storeFilesToScan; List memStoreScanners; this.storeEngine.readLock(); try { storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, - includeStartRow, stopRow, includeStopRow); + includeStartRow, stopRow, includeStopRow, onlyLatestVersion); memStoreScanners = this.memstore.getScanners(readPt); // NOTE: here we must increase the refCount for storeFiles because we would open the // storeFiles and get the StoreFileScanners for them.If we don't increase the refCount here, @@ -1042,10 +1043,10 @@ private static void clearAndClose(List scanners) { */ public List getScanners(List files, boolean cacheBlocks, boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) - throws IOException { + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner, + boolean onlyLatestVersion) throws IOException { return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, - false, readPt, includeMemstoreScanner); + false, readPt, includeMemstoreScanner, onlyLatestVersion); } /** @@ -1067,7 +1068,7 @@ public List getScanners(List files, boolean cacheBl public List getScanners(List files, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { List memStoreScanners = null; if (includeMemstoreScanner) { this.storeEngine.readLock(); @@ -1428,7 
+1429,7 @@ public CompactionProgress getCompactionProgress() { @Override public boolean shouldPerformMajorCompaction() throws IOException { - for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { + for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStoreFiles()) { // TODO: what are these reader checks all over the place? if (sf.getReader() == null) { LOG.debug("StoreFile {} has null Reader", sf); @@ -1436,7 +1437,7 @@ public boolean shouldPerformMajorCompaction() throws IOException { } } return storeEngine.getCompactionPolicy() - .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStorefiles()); + .shouldPerformMajorCompaction(this.storeEngine.getStoreFileManager().getStoreFiles()); } public Optional requestCompaction() throws IOException { @@ -1614,7 +1615,7 @@ private void finishCompactionRequest(CompactionRequestImpl cr) { protected void refreshStoreSizeAndTotalBytes() throws IOException { this.storeSize.set(0L); this.totalUncompressedBytes.set(0L); - for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { + for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStoreFiles()) { StoreFileReader r = hsf.getReader(); if (r == null) { LOG.debug("StoreFile {} has a null Reader", hsf); @@ -1762,7 +1763,7 @@ public List recreateScanners(List currentFileS return null; } return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, - includeStartRow, stopRow, includeStopRow, readPt, false); + includeStartRow, stopRow, includeStopRow, readPt, false, false); } finally { this.storeEngine.readUnlock(); } @@ -1784,7 +1785,7 @@ public int getCompactedFilesCount() { } private LongStream getStoreFileAgeStream() { - return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { + return this.storeEngine.getStoreFileManager().getStoreFiles().stream().filter(sf -> { if (sf.getReader() == null) { LOG.debug("StoreFile {} has a null Reader", sf); return false; @@ -1812,13 +1813,13 @@ public OptionalDouble getAvgStoreFileAge() { @Override public long getNumReferenceFiles() { - return this.storeEngine.getStoreFileManager().getStorefiles().stream() + return this.storeEngine.getStoreFileManager().getStoreFiles().stream() .filter(HStoreFile::isReference).count(); } @Override public long getNumHFiles() { - return this.storeEngine.getStoreFileManager().getStorefiles().stream() + return this.storeEngine.getStoreFileManager().getStoreFiles().stream() .filter(HStoreFile::isHFile).count(); } @@ -1830,19 +1831,19 @@ public long getStoreSizeUncompressed() { @Override public long getStorefilesSize() { // Include all StoreFiles - return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), + return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), sf -> true); } @Override public long getHFilesSize() { // Include only StoreFiles which are HFiles - return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), + return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStoreFiles(), HStoreFile::isHFile); } private long getStorefilesFieldSize(ToLongFunction f) { - return this.storeEngine.getStoreFileManager().getStorefiles().stream() + return this.storeEngine.getStoreFileManager().getStoreFiles().stream() .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); } @@ -2415,7 +2416,7 @@ public int getCurrentParallelPutCount() { } public int getStoreRefCount() { - return 
this.storeEngine.getStoreFileManager().getStorefiles().stream() + return this.storeEngine.getStoreFileManager().getStoreFiles().stream() .filter(sf -> sf.getReader() != null).filter(HStoreFile::isHFile) .mapToInt(HStoreFile::getRefCount).sum(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 5df02bfb26a8..b2e222428bac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -125,6 +125,8 @@ public class HStoreFile implements StoreFile { */ public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); + public static final byte[] HISTORICAL_KEY = Bytes.toBytes("HISTORICAL"); + private final StoreFileInfo fileInfo; // StoreFile.Reader @@ -138,6 +140,16 @@ public class HStoreFile implements StoreFile { // Indicates if the file got compacted private volatile boolean compactedAway = false; + // Indicates if the file contains historical cell versions. This is used when + // hbase.enable.historical.compaction.files is set to true. In that case, compactions + // can generate two files, one with the live cell versions and the other with the remaining + // (historical) cell versions. If isHistorical is true then the hfile is historical. + // Historical files are skipped for regular (not raw) scans for latest row versions. + // When hbase.enable.historical.compaction.files is false, isHistorical will be false + // for all files. This means all files will be treated as live files. Historical files are + // generated only when hbase.enable.historical.compaction.files is true. + private volatile boolean isHistorical = false; + // Keys for metadata stored in backing HFile. // Set when we obtain a Reader. 
private long sequenceid = -1; @@ -337,6 +349,10 @@ public boolean isCompactedAway() { return compactedAway; } + public boolean isHistorical() { + return isHistorical; + } + public int getRefCount() { return fileInfo.getRefCount(); } @@ -455,6 +471,10 @@ private void open() throws IOException { b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY); this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b)); + b = metadataMap.get(HISTORICAL_KEY); + if (b != null) { + isHistorical = Bytes.toBoolean(b); + } BloomType hfileBloomType = initialReader.getBloomFilterType(); if (cfBloomType != BloomType.NONE) { initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META, metrics); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java index 48618a6976ce..7bb800a1d39c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java @@ -118,6 +118,14 @@ public RegionInfo getRegionInfo() { return regionFileSystem.getRegionInfo(); } + public int getMaxVersions() { + return family.getMaxVersions(); + } + + public boolean getNewVersionBehavior() { + return family.isNewVersionBehavior(); + } + public boolean isPrimaryReplicaStore() { return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 34f882516bae..fbf9a4ffb135 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -360,7 +360,7 @@ public void refreshStoreFiles(Collection newFiles) throws IOException { * replicas to keep up to date with the primary region files. */ private void refreshStoreFilesInternal(Collection newFiles) throws IOException { - Collection currentFiles = storeFileManager.getStorefiles(); + Collection currentFiles = storeFileManager.getStoreFiles(); Collection compactedFiles = storeFileManager.getCompactedfiles(); if (currentFiles == null) { currentFiles = Collections.emptySet(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 387fa559dcd3..86a14047f138 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -50,7 +50,7 @@ public interface StoreFileManager { */ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") - void loadFiles(List storeFiles); + void loadFiles(List storeFiles) throws IOException; /** * Adds new files, either for from MemStore flush or bulk insert, into the structure. @@ -58,7 +58,7 @@ public interface StoreFileManager { */ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") - void insertNewFiles(Collection sfs); + void insertNewFiles(Collection sfs) throws IOException; /** * Adds only the new compaction results into the structure. 
@@ -67,7 +67,8 @@ public interface StoreFileManager { */ @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") - void addCompactionResults(Collection compactedFiles, Collection results); + void addCompactionResults(Collection compactedFiles, Collection results) + throws IOException; /** * Remove the compacted files @@ -95,7 +96,7 @@ public interface StoreFileManager { * checks; should not assume anything about relations between store files in the list. * @return The list of StoreFiles. */ - Collection getStorefiles(); + Collection getStoreFiles(); /** * List of compacted files inside this store that needs to be excluded in reads because further @@ -119,12 +120,13 @@ public interface StoreFileManager { /** * Gets the store files to scan for a Scan or Get request. - * @param startRow Start row of the request. - * @param stopRow Stop row of the request. + * @param startRow Start row of the request. + * @param stopRow Stop row of the request. + * @param onlyLatestVersion Scan only latest live version cells. * @return The list of files that are to be read for this request. */ Collection getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow, - boolean includeStopRow); + boolean includeStopRow, boolean onlyLatestVersion); /** * Gets initial, full list of candidate store files to check for row-key-before. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 17e0001fb0cc..67fa2244e957 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -17,22 +17,27 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.HISTORICAL_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS; import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; +import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; @@ -43,6 +48,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; 
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -53,6 +60,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.util.BloomContext; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.BloomFilterUtil; @@ -68,6 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -79,24 +88,42 @@ @InterfaceAudience.Private public class StoreFileWriter implements CellSink, ShipperListener { private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName()); + public static final String ENABLE_HISTORICAL_COMPACTION_FILES = + "hbase.enable.historical.compaction.files"; + public static final boolean DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES = false; private static final Pattern dash = Pattern.compile("-"); - private final BloomFilterWriter generalBloomFilterWriter; - private final BloomFilterWriter deleteFamilyBloomFilterWriter; + private SingleStoreFileWriter liveFileWriter; + private SingleStoreFileWriter historicalFileWriter; + private final FileSystem fs; + private final Path historicalFilePath; + private final Configuration conf; + private final CacheConfig cacheConf; private final BloomType bloomType; - private byte[] bloomParam = null; - private long earliestPutTs = HConstants.LATEST_TIMESTAMP; - private long deleteFamilyCnt = 0; - private BloomContext bloomContext = null; - private BloomContext deleteFamilyBloomContext = null; - private final TimeRangeTracker timeRangeTracker; + private final long maxKeys; + private final InetSocketAddress[] favoredNodes; + private final HFileContext fileContext; + private final boolean shouldDropCacheBehind; private final Supplier> compactedFilesSupplier; - - protected HFile.Writer writer; + private final CellComparator comparator; + private Cell lastCell; + // The first (latest) delete family marker of the current row + private Cell deleteFamily; + // The list of delete family version markers of the current row + private List deleteFamilyVersionList = new ArrayList<>(); + // The first (latest) delete column marker of the current column + private Cell deleteColumn; + // The list of delete column version markers of the current column + private List deleteColumnVersionList = new ArrayList<>(); + // The live put cell count for the current column + private int livePutCellCount; + private final int maxVersions; + private final boolean newVersionBehavior; /** * Creates an HFile.Writer that also write helpful meta data. * @param fs file system to write to - * @param path file name to create + * @param liveFilePath the name of the live file to create + * @param historicalFilePath the name of the historical file name to create * @param conf user configuration * @param bloomType bloom filter setting * @param maxKeys the expected maximum number of keys to be added. 
Was used for @@ -105,72 +132,61 @@ public class StoreFileWriter implements CellSink, ShipperListener { * @param fileContext The HFile context * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. * @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived + * @param comparator Cell comparator + * @param maxVersions max cell versions + * @param newVersionBehavior enable new version behavior * @throws IOException problem writing to FS */ - private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf, - BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext, - boolean shouldDropCacheBehind, Supplier> compactedFilesSupplier) - throws IOException { + private StoreFileWriter(FileSystem fs, Path liveFilePath, Path historicalFilePath, + final Configuration conf, CacheConfig cacheConf, BloomType bloomType, long maxKeys, + InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind, + Supplier> compactedFilesSupplier, CellComparator comparator, + int maxVersions, boolean newVersionBehavior) throws IOException { + this.fs = fs; + this.historicalFilePath = historicalFilePath; + this.conf = conf; + this.cacheConf = cacheConf; + this.bloomType = bloomType; + this.maxKeys = maxKeys; + this.favoredNodes = favoredNodes; + this.fileContext = fileContext; + this.shouldDropCacheBehind = shouldDropCacheBehind; this.compactedFilesSupplier = compactedFilesSupplier; - this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); - // TODO : Change all writers to be specifically created for compaction context - writer = - HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes) - .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create(); - - generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, - bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); + this.comparator = comparator; + this.maxVersions = maxVersions; + this.newVersionBehavior = newVersionBehavior; + liveFileWriter = new SingleStoreFileWriter(fs, liveFilePath, conf, cacheConf, bloomType, + maxKeys, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); + } - if (generalBloomFilterWriter != null) { - this.bloomType = bloomType; - this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); - if (LOG.isTraceEnabled()) { - LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " - + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH - ? Bytes.toInt(bloomParam) - : Bytes.toStringBinary(bloomParam)) - + ", " + generalBloomFilterWriter.getClass().getSimpleName()); + public static boolean shouldEnableHistoricalCompactionFiles(Configuration conf) { + if ( + conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES, + DEFAULT_ENABLE_HISTORICAL_COMPACTION_FILES) + ) { + // Historical compaction files are supported only for default store engine with + // default compactor. + String storeEngine = conf.get(STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + if (!storeEngine.equals(DefaultStoreEngine.class.getName())) { + LOG.warn("Historical compaction file generation is ignored for " + storeEngine + + ". 
hbase.enable.historical.compaction.files can be set to true only for the " + + "default compaction (DefaultStoreEngine and DefaultCompactor)"); + return false; } - // init bloom context - switch (bloomType) { - case ROW: - bloomContext = - new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); - break; - case ROWCOL: - bloomContext = - new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); - break; - case ROWPREFIX_FIXED_LENGTH: - bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, - fileContext.getCellComparator(), Bytes.toInt(bloomParam)); - break; - default: - throw new IOException( - "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)"); + String compactor = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DefaultCompactor.class.getName()); + if (!compactor.equals(DefaultCompactor.class.getName())) { + LOG.warn("Historical compaction file generation is ignored for " + compactor + + ". hbase.enable.historical.compaction.files can be set to true only for the " + + "default compaction (DefaultStoreEngine and DefaultCompactor)"); + return false; } - } else { - // Not using Bloom filters. - this.bloomType = BloomType.NONE; - } - - // initialize delete family Bloom filter when there is NO RowCol Bloom filter - if (this.bloomType != BloomType.ROWCOL) { - this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf, - cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); - deleteFamilyBloomContext = - new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator()); - } else { - deleteFamilyBloomFilterWriter = null; - } - if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { - LOG.trace("Delete Family Bloom filter type for " + path + ": " - + deleteFamilyBloomFilterWriter.getClass().getSimpleName()); + return true; } + return false; } public long getPos() throws IOException { - return ((HFileWriterImpl) writer).getPos(); + return liveFileWriter.getPos(); } /** @@ -181,7 +197,10 @@ public long getPos() throws IOException { */ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) throws IOException { - appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet()); + liveFileWriter.appendMetadata(maxSequenceId, majorCompaction); + if (historicalFileWriter != null) { + historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction); + } } /** @@ -193,37 +212,10 @@ public void appendMetadata(final long maxSequenceId, final boolean majorCompacti */ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, final Collection storeFiles) throws IOException { - writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); - writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles)); - appendTrackedTimestampsToMetadata(); - } - - /** - * Used when write {@link HStoreFile#COMPACTION_EVENT_KEY} to new file's file info. The compacted - * store files's name is needed. But if the compacted store file is a result of compaction, it's - * compacted files which still not archived is needed, too. And don't need to add compacted files - * recursively. If file A, B, C compacted to new file D, and file D compacted to new file E, will - * write A, B, C, D to file E's compacted files. 
So if file E compacted to new file F, will add E - * to F's compacted files first, then add E's compacted files: A, B, C, D to it. And no need to - * add D's compacted file, as D's compacted files has been in E's compacted files, too. See - * HBASE-20724 for more details. - * @param storeFiles The compacted store files to generate this new file - * @return bytes of CompactionEventTracker - */ - private byte[] toCompactionEventTrackerBytes(Collection storeFiles) { - Set notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream() - .map(sf -> sf.getPath().getName()).collect(Collectors.toSet()); - Set compactedStoreFiles = new HashSet<>(); - for (HStoreFile storeFile : storeFiles) { - compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName()); - for (String csf : storeFile.getCompactedStoreFiles()) { - if (notArchivedCompactedStoreFiles.contains(csf)) { - compactedStoreFiles.add(csf); - } - } + liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles); + if (historicalFileWriter != null) { + historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, storeFiles); } - return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles); } /** @@ -235,10 +227,10 @@ private byte[] toCompactionEventTrackerBytes(Collection storeFiles) */ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, final long mobCellsCount) throws IOException { - writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); - writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); - appendTrackedTimestampsToMetadata(); + liveFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount); + if (historicalFileWriter != null) { + historicalFileWriter.appendMetadata(maxSequenceId, majorCompaction, mobCellsCount); + } } /** @@ -247,7 +239,10 @@ public void appendMetadata(final long maxSequenceId, final boolean majorCompacti * @throws IOException problem writing to FS */ public void appendMobMetadata(SetMultimap mobRefSet) throws IOException { - writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet)); + liveFileWriter.appendMobMetadata(mobRefSet); + if (historicalFileWriter != null) { + historicalFileWriter.appendMobMetadata(mobRefSet); + } } /** @@ -256,156 +251,560 @@ public void appendMobMetadata(SetMultimap mobRefSet) throws I public void appendTrackedTimestampsToMetadata() throws IOException { // TODO: The StoreFileReader always converts the byte[] to TimeRange // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. - appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); - appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); + liveFileWriter.appendTrackedTimestampsToMetadata(); + if (historicalFileWriter != null) { + historicalFileWriter.appendTrackedTimestampsToMetadata(); + } } - /** - * Record the earlest Put timestamp. 
If the timeRangeTracker is not set, update TimeRangeTracker - * to include the timestamp of this key - */ - public void trackTimestamps(final Cell cell) { - if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { - earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); + @Override + public void beforeShipped() throws IOException { + liveFileWriter.beforeShipped(); + if (historicalFileWriter != null) { + historicalFileWriter.beforeShipped(); } - timeRangeTracker.includeTimestamp(cell); } - private void appendGeneralBloomfilter(final Cell cell) throws IOException { - if (this.generalBloomFilterWriter != null) { - /* - * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png - * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 3 Types of - * Filtering: 1. Row = Row 2. RowCol = Row + Qualifier 3. RowPrefixFixedLength = Fixed Length - * Row Prefix - */ - bloomContext.writeBloom(cell); + public Path getPath() { + return liveFileWriter.getPath(); + } + + public List getPaths() { + if (historicalFileWriter == null) { + return Lists.newArrayList(liveFileWriter.getPath()); } + return Lists.newArrayList(liveFileWriter.getPath(), historicalFileWriter.getPath()); } - private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException { - if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) { - return; + public boolean hasGeneralBloom() { + return liveFileWriter.hasGeneralBloom(); + } + + /** + * For unit testing only. + * @return the Bloom filter used by this writer. + */ + BloomFilterWriter getGeneralBloomWriter() { + return liveFileWriter.generalBloomFilterWriter; + } + + public void close() throws IOException { + liveFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(false)); + liveFileWriter.close(); + if (historicalFileWriter != null) { + historicalFileWriter.appendFileInfo(HISTORICAL_KEY, Bytes.toBytes(true)); + historicalFileWriter.close(); } + } - // increase the number of delete family in the store file - deleteFamilyCnt++; - if (this.deleteFamilyBloomFilterWriter != null) { - deleteFamilyBloomContext.writeBloom(cell); + public void appendFileInfo(byte[] key, byte[] value) throws IOException { + liveFileWriter.appendFileInfo(key, value); + if (historicalFileWriter != null) { + historicalFileWriter.appendFileInfo(key, value); } } - @Override - public void append(final Cell cell) throws IOException { - appendGeneralBloomfilter(cell); - appendDeleteFamilyBloomFilter(cell); - writer.append(cell); - trackTimestamps(cell); + /** + * For use in testing. + */ + HFile.Writer getLiveFileWriter() { + return liveFileWriter.getHFileWriter(); } - @Override - public void beforeShipped() throws IOException { - // For now these writer will always be of type ShipperListener true. - // TODO : Change all writers to be specifically created for compaction context - writer.beforeShipped(); - if (generalBloomFilterWriter != null) { - generalBloomFilterWriter.beforeShipped(); + /** + * @param dir Directory to create file in. 
+ * @return random filename inside passed dir + */ + public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { + if (!fs.getFileStatus(dir).isDirectory()) { + throw new IOException("Expecting " + dir.toString() + " to be a directory"); } - if (deleteFamilyBloomFilterWriter != null) { - deleteFamilyBloomFilterWriter.beforeShipped(); + return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll("")); + } + + private SingleStoreFileWriter getHistoricalFileWriter() throws IOException { + if (historicalFileWriter == null) { + historicalFileWriter = + new SingleStoreFileWriter(fs, historicalFilePath, conf, cacheConf, bloomType, maxKeys, + favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); } + return historicalFileWriter; } - public Path getPath() { - return this.writer.getPath(); + private void initRowState() { + deleteFamily = null; + deleteFamilyVersionList.clear(); + lastCell = null; } - public boolean hasGeneralBloom() { - return this.generalBloomFilterWriter != null; + private void initColumnState() { + livePutCellCount = 0; + deleteColumn = null; + deleteColumnVersionList.clear(); + } - /** - * For unit testing only. - * @return the Bloom filter used by this writer. - */ - BloomFilterWriter getGeneralBloomWriter() { - return generalBloomFilterWriter; + private boolean isDeletedByDeleteFamily(Cell cell) { + return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp() + || (deleteFamily.getTimestamp() == cell.getTimestamp() + && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId()))); } - private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { - boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); - if (haveBloom) { - bfw.compactBloom(); + private boolean isDeletedByDeleteFamilyVersion(Cell cell) { + for (Cell deleteFamilyVersion : deleteFamilyVersionList) { + if ( + deleteFamilyVersion.getTimestamp() == cell.getTimestamp() + && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId()) + ) { + return true; + } } - return haveBloom; + return false; } - private boolean closeGeneralBloomFilter() throws IOException { - boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); + private boolean isDeletedByDeleteColumn(Cell cell) { + return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp() + || (deleteColumn.getTimestamp() == cell.getTimestamp() + && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId()))); + } - // add the general Bloom filter writer and append file info - if (hasGeneralBloom) { - writer.addGeneralBloomFilter(generalBloomFilterWriter); - writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); - if (bloomParam != null) { - writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); + private boolean isDeletedByDeleteColumnVersion(Cell cell) { + for (Cell deleteColumnVersion : deleteColumnVersionList) { + if ( + deleteColumnVersion.getTimestamp() == cell.getTimestamp() + && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId()) + ) { + return true; } - bloomContext.addLastBloomKey(writer); } - return hasGeneralBloom; + return false; } - private boolean closeDeleteFamilyBloomFilter() throws IOException { - boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); + private boolean isDeleted(Cell cell) { + return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell) + || 
isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell); + } - // add the delete family Bloom filter writer - if (hasDeleteFamilyBloom) { - writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); + private void appendCell(Cell cell) throws IOException { + if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) { + initColumnState(); } + if (cell.getType() == Cell.Type.DeleteFamily) { + if (deleteFamily == null) { + deleteFamily = cell; + liveFileWriter.append(cell); + } else { + getHistoricalFileWriter().append(cell); + } + } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) { + if (!isDeletedByDeleteFamily(cell)) { + deleteFamilyVersionList.add(cell); + if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) { + // This means both the delete-family and delete-family-version markers have the same + // timestamp but the sequence id of delete-family-version marker is higher than that of + // the delete-family marker. In this case, there is no need to add the + // delete-family-version marker to the live version file. This case happens only with + // the new version behavior. + liveFileWriter.append(cell); + } else { + liveFileWriter.append(cell); + } + } else { + getHistoricalFileWriter().append(cell); + } + } else if (cell.getType() == Cell.Type.DeleteColumn) { + if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) { + deleteColumn = cell; + liveFileWriter.append(cell); + } else { + getHistoricalFileWriter().append(cell); + } + } else if (cell.getType() == Cell.Type.Delete) { + if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) { + deleteColumnVersionList.add(cell); + if (deleteFamily != null && deleteFamily.getTimestamp() == cell.getTimestamp()) { + // This means both the delete-family and delete-column-version markers have the same + // timestamp but the sequence id of delete-column-version marker is higher than that of + // the delete-family marker. In this case, there is no need to add the + // delete-column-version marker to the live version file. This case happens only with + // the new version behavior. + getHistoricalFileWriter().append(cell); + } else { + liveFileWriter.append(cell); + } + } else { + getHistoricalFileWriter().append(cell); + } + } else if (cell.getType() == Cell.Type.Put) { + if (livePutCellCount < maxVersions) { + // This is a live put cell (i.e., the latest version) of a column. Is it deleted? + if (!isDeleted(cell)) { + liveFileWriter.append(cell); + livePutCellCount++; + } else { + // It is deleted + getHistoricalFileWriter().append(cell); + if (newVersionBehavior) { + // Deleted versions are considered toward total version count when newVersionBehavior + livePutCellCount++; + } + } + } else { + // It is an older put cell + getHistoricalFileWriter().append(cell); + } + } + lastCell = cell; + } - // append file info about the number of delete family kvs - // even if there is no delete family Bloom. - writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); + @Override + public void appendAll(List cellList) throws IOException { + if (historicalFilePath == null) { + // The dual writing is not enabled and all cells are written to one file. 
We use + // the live version file in this case + for (Cell cell : cellList) { + liveFileWriter.append(cell); + } + return; + } + if (cellList.isEmpty()) { + return; + } + if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) { + // It is a new row and thus time to reset the state + initRowState(); + } + for (Cell cell : cellList) { + appendCell(cell); + } + } - return hasDeleteFamilyBloom; + @Override + public void append(Cell cell) throws IOException { + if (historicalFilePath == null) { + // The dual writing is not enabled and all cells are written to one file. We use + // the live version file in this case + liveFileWriter.append(cell); + return; + } + appendCell(cell); } - public void close() throws IOException { - boolean hasGeneralBloom = this.closeGeneralBloomFilter(); - boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); + private static class SingleStoreFileWriter { + private final BloomFilterWriter generalBloomFilterWriter; + private final BloomFilterWriter deleteFamilyBloomFilterWriter; + private final BloomType bloomType; + private byte[] bloomParam = null; + private long earliestPutTs = HConstants.LATEST_TIMESTAMP; + private long deleteFamilyCnt = 0; + private BloomContext bloomContext = null; + private BloomContext deleteFamilyBloomContext = null; + private final TimeRangeTracker timeRangeTracker; + private final Supplier> compactedFilesSupplier; + + private HFile.Writer writer; - writer.close(); + /** + * Creates an HFile.Writer that also write helpful meta data. + * @param fs file system to write to + * @param path file name to create + * @param conf user configuration + * @param bloomType bloom filter setting + * @param maxKeys the expected maximum number of keys to be added. Was used for + * Bloom filter size in {@link HFile} format version 1. + * @param favoredNodes an array of favored nodes or possibly null + * @param fileContext The HFile context + * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file. + * @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived + * @throws IOException problem writing to FS + */ + private SingleStoreFileWriter(FileSystem fs, Path path, final Configuration conf, + CacheConfig cacheConf, BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, + HFileContext fileContext, boolean shouldDropCacheBehind, + Supplier> compactedFilesSupplier) throws IOException { + this.compactedFilesSupplier = compactedFilesSupplier; + this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + // TODO : Change all writers to be specifically created for compaction context + writer = + HFile.getWriterFactory(conf, cacheConf).withPath(fs, path).withFavoredNodes(favoredNodes) + .withFileContext(fileContext).withShouldDropCacheBehind(shouldDropCacheBehind).create(); + + generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, + bloomType, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); + + if (generalBloomFilterWriter != null) { + this.bloomType = bloomType; + this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf); + if (LOG.isTraceEnabled()) { + LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: " + + (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH + ? 
Bytes.toInt(bloomParam) + : Bytes.toStringBinary(bloomParam)) + + ", " + generalBloomFilterWriter.getClass().getSimpleName()); + } + // init bloom context + switch (bloomType) { + case ROW: + bloomContext = + new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); + break; + case ROWCOL: + bloomContext = + new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator()); + break; + case ROWPREFIX_FIXED_LENGTH: + bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter, + fileContext.getCellComparator(), Bytes.toInt(bloomParam)); + break; + default: + throw new IOException( + "Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)"); + } + } else { + // Not using Bloom filters. + this.bloomType = BloomType.NONE; + } - // Log final Bloom filter statistics. This needs to be done after close() - // because compound Bloom filters might be finalized as part of closing. - if (LOG.isTraceEnabled()) { - LOG.trace( - (hasGeneralBloom ? "" : "NO ") + "General Bloom and " + (hasDeleteFamilyBloom ? "" : "NO ") - + "DeleteFamily" + " was added to HFile " + getPath()); + // initialize delete family Bloom filter when there is NO RowCol Bloom filter + if (this.bloomType != BloomType.ROWCOL) { + this.deleteFamilyBloomFilterWriter = BloomFilterFactory.createDeleteBloomAtWrite(conf, + cacheConf, (int) Math.min(maxKeys, Integer.MAX_VALUE), writer); + deleteFamilyBloomContext = + new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator()); + } else { + deleteFamilyBloomFilterWriter = null; + } + if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) { + LOG.trace("Delete Family Bloom filter type for " + path + ": " + + deleteFamilyBloomFilterWriter.getClass().getSimpleName()); + } } - } + private long getPos() throws IOException { + return ((HFileWriterImpl) writer).getPos(); + } - public void appendFileInfo(byte[] key, byte[] value) throws IOException { - writer.appendFileInfo(key, value); - } + /** + * Writes meta data. Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @throws IOException problem writing to FS + */ + private void appendMetadata(final long maxSequenceId, final boolean majorCompaction) + throws IOException { + appendMetadata(maxSequenceId, majorCompaction, Collections.emptySet()); + } - /** - * For use in testing. - */ - HFile.Writer getHFileWriter() { - return writer; - } + /** + * Writes meta data. Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @param storeFiles The compacted store files to generate this new file + * @throws IOException problem writing to FS + */ + private void appendMetadata(final long maxSequenceId, final boolean majorCompaction, + final Collection storeFiles) throws IOException { + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles)); + appendTrackedTimestampsToMetadata(); + } - /** - * @param dir Directory to create file in. 
- * @return random filename inside passed dir - */ - public static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException { - if (!fs.getFileStatus(dir).isDirectory()) { - throw new IOException("Expecting " + dir.toString() + " to be a directory"); + /** + * Used when writing {@link HStoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The + * names of the compacted store files are needed. If a compacted store file is itself the result + * of a compaction, its own compacted files that are not yet archived are needed as well, but + * compacted files do not need to be added recursively. For example, if files A, B, C are + * compacted into a new file D, and file D is compacted into a new file E, then A, B, C, D are + * written to file E's compacted files. So if file E is compacted into a new file F, E is added + * to F's compacted files first, followed by E's compacted files A, B, C, D. There is no need to + * add D's compacted files, since they are already included in E's compacted files. See + * HBASE-20724 for more details. + * @param storeFiles The compacted store files to generate this new file + * @return bytes of CompactionEventTracker + */ + private byte[] toCompactionEventTrackerBytes(Collection storeFiles) { + Set notArchivedCompactedStoreFiles = this.compactedFilesSupplier.get().stream() + .map(sf -> sf.getPath().getName()).collect(Collectors.toSet()); + Set compactedStoreFiles = new HashSet<>(); + for (HStoreFile storeFile : storeFiles) { + compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName()); + for (String csf : storeFile.getCompactedStoreFiles()) { + if (notArchivedCompactedStoreFiles.contains(csf)) { + compactedStoreFiles.add(csf); + } + } + } + return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles); + } + + /** + * Writes meta data. Call before {@link #close()} since its written as meta data to this file. + * @param maxSequenceId Maximum sequence id. + * @param majorCompaction True if this file is product of a major compaction + * @param mobCellsCount The number of mob cells. + * @throws IOException problem writing to FS + */ + private void appendMetadata(final long maxSequenceId, final boolean majorCompaction, + final long mobCellsCount) throws IOException { + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); + appendTrackedTimestampsToMetadata(); + } + + /** + * Appends MOB-specific metadata (even if it is empty) + * @param mobRefSet - original table -> set of MOB file names + * @throws IOException problem writing to FS + */ + private void appendMobMetadata(SetMultimap mobRefSet) throws IOException { + writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet)); + } + + /** + * Add TimestampRange and earliest put timestamp to Metadata + */ + private void appendTrackedTimestampsToMetadata() throws IOException { + // TODO: The StoreFileReader always converts the byte[] to TimeRange + // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. + appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker)); + appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); + } + + /** + * Record the earliest Put timestamp. 
If the timeRangeTracker is not set, update TimeRangeTracker + * to include the timestamp of this key + */ + private void trackTimestamps(final Cell cell) { + if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) { + earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); + } + timeRangeTracker.includeTimestamp(cell); + } + + private void appendGeneralBloomfilter(final Cell cell) throws IOException { + if (this.generalBloomFilterWriter != null) { + /* + * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue. + * png Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp 3 Types of + * Filtering: 1. Row = Row 2. RowCol = Row + Qualifier 3. RowPrefixFixedLength = Fixed + * Length Row Prefix + */ + bloomContext.writeBloom(cell); + } + } + + private void appendDeleteFamilyBloomFilter(final Cell cell) throws IOException { + if (!PrivateCellUtil.isDeleteFamily(cell) && !PrivateCellUtil.isDeleteFamilyVersion(cell)) { + return; + } + + // increase the number of delete family in the store file + deleteFamilyCnt++; + if (this.deleteFamilyBloomFilterWriter != null) { + deleteFamilyBloomContext.writeBloom(cell); + } + } + + private void append(final Cell cell) throws IOException { + appendGeneralBloomfilter(cell); + appendDeleteFamilyBloomFilter(cell); + writer.append(cell); + trackTimestamps(cell); + } + + private void beforeShipped() throws IOException { + // For now these writer will always be of type ShipperListener true. + // TODO : Change all writers to be specifically created for compaction context + writer.beforeShipped(); + if (generalBloomFilterWriter != null) { + generalBloomFilterWriter.beforeShipped(); + } + if (deleteFamilyBloomFilterWriter != null) { + deleteFamilyBloomFilterWriter.beforeShipped(); + } + } + + private Path getPath() { + return this.writer.getPath(); + } + + private boolean hasGeneralBloom() { + return this.generalBloomFilterWriter != null; + } + + /** + * For unit testing only. + * @return the Bloom filter used by this writer. + */ + BloomFilterWriter getGeneralBloomWriter() { + return generalBloomFilterWriter; + } + + private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException { + boolean haveBloom = (bfw != null && bfw.getKeyCount() > 0); + if (haveBloom) { + bfw.compactBloom(); + } + return haveBloom; + } + + private boolean closeGeneralBloomFilter() throws IOException { + boolean hasGeneralBloom = closeBloomFilter(generalBloomFilterWriter); + + // add the general Bloom filter writer and append file info + if (hasGeneralBloom) { + writer.addGeneralBloomFilter(generalBloomFilterWriter); + writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); + if (bloomParam != null) { + writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam); + } + bloomContext.addLastBloomKey(writer); + } + return hasGeneralBloom; + } + + private boolean closeDeleteFamilyBloomFilter() throws IOException { + boolean hasDeleteFamilyBloom = closeBloomFilter(deleteFamilyBloomFilterWriter); + + // add the delete family Bloom filter writer + if (hasDeleteFamilyBloom) { + writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter); + } + + // append file info about the number of delete family kvs + // even if there is no delete family Bloom. 
+ writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); + + return hasDeleteFamilyBloom; + } + + private void close() throws IOException { + boolean hasGeneralBloom = this.closeGeneralBloomFilter(); + boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter(); + + writer.close(); + + // Log final Bloom filter statistics. This needs to be done after close() + // because compound Bloom filters might be finalized as part of closing. + if (LOG.isTraceEnabled()) { + LOG.trace((hasGeneralBloom ? "" : "NO ") + "General Bloom and " + + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily" + " was added to HFile " + + getPath()); + } + + } + + private void appendFileInfo(byte[] key, byte[] value) throws IOException { + writer.appendFileInfo(key, value); + } + + /** + * For use in testing. + */ + private HFile.Writer getHFileWriter() { + return writer; } - return new Path(dir, dash.matcher(UUID.randomUUID().toString()).replaceAll("")); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "ICAST_INTEGER_MULTIPLY_CAST_TO_LONG", @@ -418,7 +817,9 @@ public static class Builder { private BloomType bloomType = BloomType.NONE; private long maxKeyCount = 0; private Path dir; - private Path filePath; + private Path liveFilePath; + private Path historicalFilePath; + private InetSocketAddress[] favoredNodes; private HFileContext fileContext; private boolean shouldDropCacheBehind; @@ -430,6 +831,10 @@ public static class Builder { // store files which are not recorded in the SFT, but for the newly created store file writer, // they are not tracked in SFT, so here we need to record them and treat them specially. private Consumer writerCreationTracker; + private int maxVersions; + private boolean newVersionBehavior; + private CellComparator comparator; + private boolean isCompaction; public Builder(Configuration conf, CacheConfig cacheConf, FileSystem fs) { this.conf = conf; @@ -465,7 +870,7 @@ public Builder withOutputDir(Path dir) { */ public Builder withFilePath(Path filePath) { Preconditions.checkNotNull(filePath); - this.filePath = filePath; + this.liveFilePath = filePath; return this; } @@ -519,17 +924,37 @@ public Builder withWriterCreationTracker(Consumer writerCreationTracker) { return this; } + public Builder withMaxVersions(int maxVersions) { + this.maxVersions = maxVersions; + return this; + } + + public Builder withNewVersionBehavior(boolean newVersionBehavior) { + this.newVersionBehavior = newVersionBehavior; + return this; + } + + public Builder withCellComparator(CellComparator comparator) { + this.comparator = comparator; + return this; + } + + public Builder withIsCompaction(boolean isCompaction) { + this.isCompaction = isCompaction; + return this; + } + /** * Create a store file writer. Client is responsible for closing file when done. If metadata, * add BEFORE closing using {@link StoreFileWriter#appendMetadata}. */ public StoreFileWriter build() throws IOException { - if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) { + if ((dir == null ? 0 : 1) + (liveFilePath == null ? 
0 : 1) != 1) { throw new IllegalArgumentException("Either specify parent directory " + "or file path"); } if (dir == null) { - dir = filePath.getParent(); + dir = liveFilePath.getParent(); } if (!fs.exists(dir)) { @@ -545,7 +970,7 @@ public StoreFileWriter build() throws IOException { } CommonFSUtils.setStoragePolicy(this.fs, dir, policyName); - if (filePath == null) { + if (liveFilePath == null) { // The stored file and related blocks will used the directory based StoragePolicy. // Because HDFS DistributedFileSystem does not support create files with storage policy // before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files @@ -560,21 +985,30 @@ public StoreFileWriter build() throws IOException { } CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy); } - filePath = getUniqueFile(fs, dir); + liveFilePath = getUniqueFile(fs, dir); if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { bloomType = BloomType.NONE; } } + + if (isCompaction && shouldEnableHistoricalCompactionFiles(conf)) { + historicalFilePath = getUniqueFile(fs, dir); + } + // make sure we call this before actually create the writer // in fact, it is not a big deal to even add an inexistent file to the track, as we will never // try to delete it and finally we will clean the tracker up after compaction. But if the file // cleaner find the file but we haven't recorded it yet, it may accidentally delete the file // and cause problem. if (writerCreationTracker != null) { - writerCreationTracker.accept(filePath); + writerCreationTracker.accept(liveFilePath); + if (historicalFilePath != null) { + writerCreationTracker.accept(historicalFilePath); + } } - return new StoreFileWriter(fs, filePath, conf, cacheConf, bloomType, maxKeyCount, - favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier); + return new StoreFileWriter(fs, liveFilePath, historicalFilePath, conf, cacheConf, bloomType, + maxKeyCount, favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier, + comparator, maxVersions, newVersionBehavior); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index c12307841a2b..89d4aa34e78c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -223,6 +224,12 @@ private void addCurrentScanners(List scanners) { this.currentScanners.addAll(scanners); } + private static boolean isOnlyLatestVersionScan(Scan scan) { + // No need to check for Scan#getMaxVersions because live version files generated by store file + // writer retains max versions specified in ColumnFamilyDescriptor for the given CF + return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP; + } + /** * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we are not in a * compaction. 
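To make the new onlyLatestVersion plumbing concrete, here is a small standalone sketch (not part of the patch) that mirrors the isOnlyLatestVersionScan predicate added to StoreScanner and shows which client scans can be served from the live files alone; the class name and the main method are illustrative only.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;

// Illustrative sketch: mirrors StoreScanner#isOnlyLatestVersionScan from the hunk above.
public class OnlyLatestVersionScanExample {
  static boolean isOnlyLatestVersionScan(Scan scan) {
    // Raw scans must see delete markers and older versions, and a bounded time range may need
    // older cells, so both still have to read historical files as well.
    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
  }

  public static void main(String[] args) throws Exception {
    Scan plain = new Scan();                               // qualifies: live files only
    Scan raw = new Scan().setRaw(true);                    // does not qualify
    Scan timeBounded = new Scan().setTimeRange(0L, 1000L); // does not qualify
    System.out.println(isOnlyLatestVersionScan(plain));       // true
    System.out.println(isOnlyLatestVersionScan(raw));         // false
    System.out.println(isOnlyLatestVersionScan(timeBounded)); // false
  }
}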
@@ -247,7 +254,8 @@ public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet sfs, List memStoreSc // Eagerly creating scanners so that we have the ref counting ticking on the newly created // store files. In case of stream scanners this eager creation does not induce performance // penalty because in scans (that uses stream scanners) the next() call is bound to happen. - List scanners = store.getScanners(sfs, cacheBlocks, get, usePread, - isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false); + List scanners = + store.getScanners(sfs, cacheBlocks, get, usePread, isCompaction, matcher, + scan.getStartRow(), scan.getStopRow(), this.readPt, false, isOnlyLatestVersionScan(scan)); flushedstoreFileScanners.addAll(scanners); if (!CollectionUtils.isEmpty(memStoreScanners)) { clearAndClose(memStoreScannersAfterFlush); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java index 85f61a029ad9..8ac8397b868c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java @@ -136,7 +136,7 @@ public void loadFiles(List storeFiles) { } @Override - public Collection getStorefiles() { + public Collection getStoreFiles() { return state.allFilesCached; } @@ -300,7 +300,7 @@ private double getMidStripeSplitRatio(long smallerSize, long largerSize, long la @Override public Collection getFilesForScan(byte[] startRow, boolean includeStartRow, - byte[] stopRow, boolean includeStopRow) { + byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) { if (state.stripeFiles.isEmpty()) { return state.level0Files; // There's just L0. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 251c8227da00..538efecb4018 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; +import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; @@ -143,6 +145,12 @@ public class CompactionConfiguration { conf.getLong(HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY, storeConfigInfo.getMemStoreFlushSize()); minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY_OLD, 3))); + if (shouldEnableHistoricalCompactionFiles(conf)) { + // If historical file writing is enabled, we bump up the min value by one as DualFileWriter + // compacts files into two files, live and historical, instead of one. 
This also eliminates + // infinite re-compaction when the min value is set to 2 + minFilesToCompact += 1; + } maxFilesToCompact = conf.getInt(HBASE_HSTORE_COMPACTION_MAX_KEY, 10); compactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.2F); offPeakCompactionRatio = conf.getFloat(HBASE_HSTORE_COMPACTION_RATIO_OFFPEAK_KEY, 5.0F); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index d9ad265da64e..e58c53c355f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -464,7 +464,6 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel lastCleanCell = null; lastCleanCellSeqId = 0; } - writer.append(c); int len = c.getSerializedSize(); ++progress.currentCompactedKVs; progress.totalCompactedSize += len; @@ -478,6 +477,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel return false; } } + writer.appendAll(cells); if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { if (lastCleanCell != null) { // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index eb803c3e2a88..bcc84230952f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -31,8 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - /** * Compact passed set of files. 
Create an instance and then call * {@link #compact(CompactionRequestImpl, ThroughputController, User)} @@ -67,7 +65,7 @@ public List compact(final CompactionRequestImpl request, @Override protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { - List newFiles = Lists.newArrayList(writer.getPath()); + List newFiles = writer.getPaths(); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.close(); return newFiles; @@ -75,17 +73,19 @@ protected List commitWriter(StoreFileWriter writer, FileDetails fd, @Override protected final void abortWriter(StoreFileWriter writer) throws IOException { - Path leftoverFile = writer.getPath(); + List leftoverFiles = writer.getPaths(); try { writer.close(); } catch (IOException e) { LOG.warn("Failed to close the writer after an unfinished compaction.", e); } try { - store.getFileSystem().delete(leftoverFile, false); + for (Path path : leftoverFiles) { + store.getFileSystem().delete(path, false); + } } catch (IOException e) { LOG.warn("Failed to delete the leftover file {} after an unfinished compaction.", - leftoverFile, e); + leftoverFiles, e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index f5be2b380382..9a00508cd00d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -66,7 +66,7 @@ public List preSelectFilesForCoprocessor(StripeInformationProvider s // We sincerely hope nobody is messing with us with their coprocessors. // If they do, they are very likely to shoot themselves in the foot. // We'll just exclude all the filesCompacting from the list. - ArrayList candidateFiles = new ArrayList<>(si.getStorefiles()); + ArrayList candidateFiles = new ArrayList<>(si.getStoreFiles()); candidateFiles.removeAll(filesCompacting); return candidateFiles; } @@ -114,7 +114,7 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si, // This can happen due to region split. We can skip it later; for now preserve // compact-all-things behavior. - Collection allFiles = si.getStorefiles(); + Collection allFiles = si.getStoreFiles(); if (StoreUtils.hasReferences(allFiles)) { LOG.debug("There are references in the store; compacting all files"); long targetKvs = estimateTargetKvs(allFiles, config.getInitialCount()).getFirst(); @@ -165,7 +165,7 @@ public StripeCompactionRequest selectCompaction(StripeInformationProvider si, public boolean needsCompactions(StripeInformationProvider si, List filesCompacting) { // Approximation on whether we need compaction. - return filesCompacting.isEmpty() && (StoreUtils.hasReferences(si.getStorefiles()) + return filesCompacting.isEmpty() && (StoreUtils.hasReferences(si.getStoreFiles()) || (si.getLevel0Files().size() >= this.config.getLevel0MinFiles()) || needsSingleStripeCompaction(si) || hasExpiredStripes(si) || allL0FilesExpired(si)); } @@ -577,7 +577,7 @@ public void setMajorRangeFull() { /** The information about stripes that the policy needs to do its stuff */ public static interface StripeInformationProvider { - public Collection getStorefiles(); + public Collection getStoreFiles(); /** * Gets the start row for a given stripe. 
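The minFilesToCompact bump above is easier to see in isolation. The sketch below is not part of the patch: it reads the hbase.enable.historical.compaction.files flag directly and assumes it defaults to off, whereas the patch's shouldEnableHistoricalCompactionFiles helper may apply additional checks; the class and method names are hypothetical.

import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch of the effective hbase.hstore.compaction.min floor once historical
// compaction files are enabled, following the CompactionConfiguration change above.
public class MinFilesToCompactExample {
  // Config key quoted from the patch; the default of false is an assumption here.
  static final String ENABLE_HISTORICAL_COMPACTION_FILES =
    "hbase.enable.historical.compaction.files";

  static int effectiveMinFilesToCompact(Configuration conf) {
    int min = Math.max(2, conf.getInt("hbase.hstore.compaction.min", 3));
    if (conf.getBoolean(ENABLE_HISTORICAL_COMPACTION_FILES, false)) {
      // A compaction can now emit two files (live + historical), so raise the floor by one
      // to avoid endlessly re-selecting the same two outputs.
      min += 1;
    }
    return min;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean(ENABLE_HISTORICAL_COMPACTION_FILES, true);
    System.out.println(effectiveMinFilesToCompact(conf)); // 4 with the default minimum of 3
  }
}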
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index bdf3b92db65d..794a707062e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -185,7 +185,9 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind()) .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier()) .withFileStoragePolicy(params.fileStoragePolicy()) - .withWriterCreationTracker(params.writerCreationTracker()); + .withWriterCreationTracker(params.writerCreationTracker()) + .withMaxVersions(ctx.getMaxVersions()).withNewVersionBehavior(ctx.getNewVersionBehavior()) + .withCellComparator(ctx.getComparator()).withIsCompaction(params.isCompaction()); return builder.build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java index 4754c5ba530b..320fc99f15b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/CreateRandomStoreFile.java @@ -193,7 +193,7 @@ public boolean run(String[] args) throws IOException { int numMetaBlocks = ThreadLocalRandom.current().nextInt(10) + 1; LOG.info("Writing " + numMetaBlocks + " meta blocks"); for (int metaI = 0; metaI < numMetaBlocks; ++metaI) { - sfw.getHFileWriter().appendMetaBlock(generateString(), new BytesWritable(generateValue())); + sfw.getLiveFileWriter().appendMetaBlock(generateString(), new BytesWritable(generateValue())); } sfw.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java index cdabdf27491c..08bbed6e18ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java @@ -129,7 +129,7 @@ public MyCompactor(Configuration conf, HStore store) { @Override protected List commitWriter(StoreFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { - HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; + HFileWriterImpl writerImpl = (HFileWriterImpl) writer.getLiveFileWriter(); Cell cell = writerImpl.getLastCell(); // The cell should be backend with an KeyOnlyKeyValue. 
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index e888639eac4a..ccc755a03580 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -1764,7 +1764,7 @@ public void testAge() throws IOException { Arrays.asList(mockStoreFile(currentTime - 10), mockStoreFile(currentTime - 100), mockStoreFile(currentTime - 1000), mockStoreFile(currentTime - 10000)); StoreFileManager sfm = mock(StoreFileManager.class); - when(sfm.getStorefiles()).thenReturn(storefiles); + when(sfm.getStoreFiles()).thenReturn(storefiles); StoreEngine storeEngine = mock(StoreEngine.class); when(storeEngine.getStoreFileManager()).thenReturn(sfm); return storeEngine; @@ -1805,10 +1805,10 @@ private static class MyStore extends HStore { public List getScanners(List files, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException { + boolean includeMemstoreScanner, boolean onlyLatestVersion) throws IOException { hook.getScanners(this); return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, - stopRow, false, readPt, includeMemstoreScanner); + stopRow, false, readPt, includeMemstoreScanner, onlyLatestVersion); } @Override @@ -1958,7 +1958,7 @@ public void testHFileContextSetWithCFAndTable() throws Exception { .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) .includesTag(false).shouldDropBehind(true)); - HFileContext hFileContext = writer.getHFileWriter().getFileContext(); + HFileContext hFileContext = writer.getLiveFileWriter().getFileContext(); assertArrayEquals(family, hFileContext.getColumnFamily()); assertArrayEquals(table, hFileContext.getTableName()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileWriter.java new file mode 100644 index 000000000000..6146605cd23e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileWriter.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.NEW_VERSION_BEHAVIOR; +import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.MemoryCompactionPolicy; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * The store file writer does not do any compaction. Each cell is written to either the live or the + * historical file. Regular (i.e., not-raw) scans that read the latest put cells scan only live + * files. To ensure the correctness of the store file writer, we need to verify that live files + * include all live cells. This test verifies this indirectly as follows. The test creates two + * tables, each with one region and one store. Dual file writing (live vs historical) is enabled on + * only one of the tables. The test applies the exact same set of mutations to both tables. These + * mutations include all types of cells and are written to multiple files using multiple memstore + * flushes. After writing all cells, the test first verifies that both tables return the same set of + * cells for regular and raw scans. The same verification is then repeated after the tables are + * minor and finally major compacted. The test also verifies that flushes do not generate historical + * files and that historical files are generated only when historical file generation is enabled (by + * the config hbase.enable.historical.compaction.files). + */ +@Category({ MediumTests.class, RegionServerTests.class }) +@RunWith(Parameterized.class) +public class TestStoreFileWriter { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestStoreFileWriter.class); + private final int ROW_NUM = 100; + private final Random RANDOM = new Random(11); + private final HBaseTestingUtil testUtil = new HBaseTestingUtil(); + private HRegion[] regions = new HRegion[2]; + private final byte[][] qualifiers = + { Bytes.toBytes("0"), Bytes.toBytes("1"), Bytes.toBytes("2") }; + // This keeps track of all cells. 
It is a list of rows, each row is a list of columns, each + // column is a list of CellInfo object + private ArrayList>> insertedCells; + private TableName[] tableName = new TableName[2]; + private final Configuration conf = testUtil.getConfiguration(); + private int flushCount = 0; + + @Parameterized.Parameter(0) + public KeepDeletedCells keepDeletedCells; + @Parameterized.Parameter(1) + public int maxVersions; + @Parameterized.Parameter(2) + public boolean newVersionBehavior; + + @Parameterized.Parameters(name = "keepDeletedCells={0}, maxVersions={1}, newVersionBehavior={2}") + public static synchronized Collection data() { + return Arrays.asList( + new Object[][] { { KeepDeletedCells.FALSE, 1, true }, { KeepDeletedCells.FALSE, 2, false }, + { KeepDeletedCells.FALSE, 3, true }, { KeepDeletedCells.TRUE, 1, false }, + // { KeepDeletedCells.TRUE, 2, true }, see HBASE-28442 + { KeepDeletedCells.TRUE, 3, false } }); + } + + // In memory representation of a cell. We only need to know timestamp and type field for our + // testing for cell. Please note the row for the cell is implicit in insertedCells. + private static class CellInfo { + long timestamp; + Cell.Type type; + + CellInfo(long timestamp, Cell.Type type) { + this.timestamp = timestamp; + this.type = type; + } + } + + private void createTable(int index, boolean enableDualFileWriter) throws IOException { + tableName[index] = TableName.valueOf(getClass().getSimpleName() + "_" + index); + ColumnFamilyDescriptor familyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(HBaseTestingUtil.fam1).setMaxVersions(maxVersions) + .setKeepDeletedCells(keepDeletedCells) + .setValue(NEW_VERSION_BEHAVIOR, Boolean.toString(newVersionBehavior)).build(); + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(tableName[index]).setColumnFamily(familyDescriptor) + .setValue(ENABLE_HISTORICAL_COMPACTION_FILES, Boolean.toString(enableDualFileWriter)); + testUtil.createTable(builder.build(), null); + regions[index] = testUtil.getMiniHBaseCluster().getRegions(tableName[index]).get(0); + } + + @Before + public void setUp() throws Exception { + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 6); + conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + String.valueOf(MemoryCompactionPolicy.NONE)); + testUtil.startMiniCluster(); + createTable(0, false); + createTable(1, true); + insertedCells = new ArrayList<>(ROW_NUM); + for (int r = 0; r < ROW_NUM; r++) { + insertedCells.add(new ArrayList<>(qualifiers.length)); + for (int q = 0; q < qualifiers.length; q++) { + insertedCells.get(r).add(new ArrayList<>(10)); + } + } + } + + @After + public void tearDown() throws Exception { + this.testUtil.shutdownMiniCluster(); + testUtil.cleanupTestDir(); + } + + @Test + public void testCompactedFiles() throws Exception { + for (int i = 0; i < 10; i++) { + insertRows(ROW_NUM * maxVersions); + deleteRows(ROW_NUM / 8); + deleteRowVersions(ROW_NUM / 8); + deleteColumns(ROW_NUM / 8); + deleteColumnVersions(ROW_NUM / 8); + flushRegion(); + } + + verifyCells(); + + HStore[] stores = new HStore[2]; + + stores[0] = regions[0].getStore(HBaseTestingUtil.fam1); + assertEquals(flushCount, stores[0].getStorefilesCount()); + + stores[1] = regions[1].getStore(HBaseTestingUtil.fam1); + assertEquals(flushCount, stores[1].getStorefilesCount()); + + regions[0].compact(false); + assertEquals(flushCount - stores[0].getCompactedFiles().size() + 1, + stores[0].getStorefilesCount()); + + regions[1].compact(false); + assertEquals(flushCount - 
stores[1].getCompactedFiles().size() + 2, + stores[1].getStorefilesCount()); + + verifyCells(); + + regions[0].compact(true); + assertEquals(1, stores[0].getStorefilesCount()); + + regions[1].compact(true); + assertEquals(keepDeletedCells == KeepDeletedCells.FALSE ? 1 : 2, + stores[1].getStorefilesCount()); + + verifyCells(); + } + + private void verifyCells() throws Exception { + scanAndCompare(false); + scanAndCompare(true); + } + + private void flushRegion() throws Exception { + regions[0].flush(true); + regions[1].flush(true); + flushCount++; + } + + private Long getRowTimestamp(int row) { + Long maxTimestamp = null; + for (int q = 0; q < qualifiers.length; q++) { + int size = insertedCells.get(row).get(q).size(); + if (size > 0) { + CellInfo mostRecentCellInfo = insertedCells.get(row).get(q).get(size - 1); + if (mostRecentCellInfo.type == Cell.Type.Put) { + if (maxTimestamp == null || maxTimestamp < mostRecentCellInfo.timestamp) { + maxTimestamp = mostRecentCellInfo.timestamp; + } + } + } + } + return maxTimestamp; + } + + private long getNewTimestamp(long timestamp) throws Exception { + long newTimestamp = System.currentTimeMillis(); + if (timestamp == newTimestamp) { + Thread.sleep(1); + newTimestamp = System.currentTimeMillis(); + assertTrue(timestamp < newTimestamp); + } + return newTimestamp; + } + + private void insertRows(int rowCount) throws Exception { + int row; + long timestamp = System.currentTimeMillis(); + for (int r = 0; r < rowCount; r++) { + row = RANDOM.nextInt(ROW_NUM); + Put put = new Put(Bytes.toBytes(String.valueOf(row)), timestamp); + for (int q = 0; q < qualifiers.length; q++) { + put.addColumn(HBaseTestingUtil.fam1, qualifiers[q], + Bytes.toBytes(String.valueOf(timestamp))); + insertedCells.get(row).get(q).add(new CellInfo(timestamp, Cell.Type.Put)); + } + regions[0].put(put); + regions[1].put(put); + timestamp = getNewTimestamp(timestamp); + } + } + + private void deleteRows(int rowCount) throws Exception { + int row; + for (int r = 0; r < rowCount; r++) { + long timestamp = System.currentTimeMillis(); + row = RANDOM.nextInt(ROW_NUM); + Delete delete = new Delete(Bytes.toBytes(String.valueOf(row))); + regions[0].delete(delete); + regions[1].delete(delete); + // For simplicity, the family delete markers are inserted for all columns (instead of + // allocating a separate column for them) in the memory representation of the data stored + // to HBase + for (int q = 0; q < qualifiers.length; q++) { + insertedCells.get(row).get(q).add(new CellInfo(timestamp, Cell.Type.DeleteFamily)); + } + } + } + + private void deleteSingleRowVersion(int row, long timestamp) throws IOException { + Delete delete = new Delete(Bytes.toBytes(String.valueOf(row))); + delete.addFamilyVersion(HBaseTestingUtil.fam1, timestamp); + regions[0].delete(delete); + regions[1].delete(delete); + // For simplicity, the family delete version markers are inserted for all columns (instead of + // allocating a separate column for them) in the memory representation of the data stored + // to HBase + for (int q = 0; q < qualifiers.length; q++) { + insertedCells.get(row).get(q).add(new CellInfo(timestamp, Cell.Type.DeleteFamilyVersion)); + } + } + + private void deleteRowVersions(int rowCount) throws Exception { + int row; + for (int r = 0; r < rowCount; r++) { + row = RANDOM.nextInt(ROW_NUM); + Long timestamp = getRowTimestamp(row); + if (timestamp != null) { + deleteSingleRowVersion(row, timestamp); + } + } + // Just insert one more delete marker possibly does not delete any row version + row = 
RANDOM.nextInt(ROW_NUM); + deleteSingleRowVersion(row, System.currentTimeMillis()); + } + + private void deleteColumns(int rowCount) throws Exception { + int row; + for (int r = 0; r < rowCount; r++) { + long timestamp = System.currentTimeMillis(); + row = RANDOM.nextInt(ROW_NUM); + int q = RANDOM.nextInt(qualifiers.length); + Delete delete = new Delete(Bytes.toBytes(String.valueOf(row)), timestamp); + delete.addColumns(HBaseTestingUtil.fam1, qualifiers[q], timestamp); + regions[0].delete(delete); + regions[1].delete(delete); + insertedCells.get(row).get(q).add(new CellInfo(timestamp, Cell.Type.DeleteColumn)); + } + } + + private void deleteColumnVersions(int rowCount) throws Exception { + int row; + for (int r = 0; r < rowCount; r++) { + row = RANDOM.nextInt(ROW_NUM); + Long timestamp = getRowTimestamp(row); + if (timestamp != null) { + Delete delete = new Delete(Bytes.toBytes(String.valueOf(row))); + int q = RANDOM.nextInt(qualifiers.length); + delete.addColumn(HBaseTestingUtil.fam1, qualifiers[q], timestamp); + regions[0].delete(delete); + regions[1].delete(delete); + insertedCells.get(row).get(q).add(new CellInfo(timestamp, Cell.Type.Delete)); + } + } + } + + private Scan createScan(boolean raw) { + Scan scan = new Scan(); + scan.readAllVersions(); + scan.setRaw(raw); + return scan; + } + + private void scanAndCompare(boolean raw) throws Exception { + try (RegionScanner firstRS = regions[0].getScanner(createScan(raw))) { + try (RegionScanner secondRS = regions[1].getScanner(createScan(raw))) { + boolean firstHasMore; + boolean secondHasMore; + do { + List firstRowList = new ArrayList<>(); + List secondRowList = new ArrayList<>(); + firstHasMore = firstRS.nextRaw(firstRowList); + secondHasMore = secondRS.nextRaw(secondRowList); + assertEquals(firstRowList.size(), secondRowList.size()); + int size = firstRowList.size(); + for (int i = 0; i < size; i++) { + Cell firstCell = firstRowList.get(i); + Cell secondCell = secondRowList.get(i); + assertTrue(CellUtil.matchingRowColumn(firstCell, secondCell)); + assertTrue(firstCell.getType() == secondCell.getType()); + assertTrue( + Bytes.equals(CellUtil.cloneValue(firstCell), CellUtil.cloneValue(firstCell))); + } + } while (firstHasMore && secondHasMore); + assertEquals(firstHasMore, secondHasMore); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index b61be8de00cc..ec5401a08b99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -95,15 +95,15 @@ public void testInsertFilesIntoL0() throws Exception { MockHStoreFile sf = createFile(); manager.insertNewFiles(al(sf)); assertEquals(1, manager.getStorefileCount()); - Collection filesForGet = manager.getFilesForScan(KEY_A, true, KEY_A, true); + Collection filesForGet = manager.getFilesForScan(KEY_A, true, KEY_A, true, false); assertEquals(1, filesForGet.size()); assertTrue(filesForGet.contains(sf)); // Add some stripes and make sure we get this file for every stripe. 
manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY))); - assertTrue(manager.getFilesForScan(KEY_A, true, KEY_A, true).contains(sf)); - assertTrue(manager.getFilesForScan(KEY_C, true, KEY_C, true).contains(sf)); + assertTrue(manager.getFilesForScan(KEY_A, true, KEY_A, true, false).contains(sf)); + assertTrue(manager.getFilesForScan(KEY_C, true, KEY_C, true, false).contains(sf)); } @Test @@ -117,7 +117,7 @@ public void testClearFiles() throws Exception { Collection allFiles = manager.clearFiles(); assertEquals(4, allFiles.size()); assertEquals(0, manager.getStorefileCount()); - assertEquals(0, manager.getStorefiles().size()); + assertEquals(0, manager.getStoreFiles().size()); } private static ArrayList dumpIterator(Iterator iter) { @@ -541,7 +541,7 @@ private void testPriorityScenario(int expectedPriority, int limit, int stripes, private void verifyInvalidCompactionScenario(StripeStoreFileManager manager, ArrayList filesToCompact, ArrayList filesToInsert) throws Exception { - Collection allFiles = manager.getStorefiles(); + Collection allFiles = manager.getStoreFiles(); assertThrows(IllegalStateException.class, () -> manager.addCompactionResults(filesToCompact, filesToInsert)); verifyAllFiles(manager, allFiles); // must have the same files. @@ -556,7 +556,7 @@ private void verifyGetOrScanScenario(StripeStoreFileManager manager, byte[] star Collection results) throws Exception { start = start != null ? start : HConstants.EMPTY_START_ROW; end = end != null ? end : HConstants.EMPTY_END_ROW; - Collection sfs = manager.getFilesForScan(start, true, end, false); + Collection sfs = manager.getFilesForScan(start, true, end, false, false); assertEquals(results.size(), sfs.size()); for (HStoreFile result : results) { assertTrue(sfs.contains(result)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index c4f98f4d94ad..295d0cc4c2fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -245,13 +245,13 @@ public void testWithReferences() throws Exception { when(ref.isReference()).thenReturn(true); StripeInformationProvider si = mock(StripeInformationProvider.class); Collection sfs = al(ref, createFile()); - when(si.getStorefiles()).thenReturn(sfs); + when(si.getStoreFiles()).thenReturn(sfs); assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); // UnmodifiableCollection does not implement equals so we need to change it here to a // collection that implements it. 
- assertEquals(si.getStorefiles(), new ArrayList<>(scr.getRequest().getFiles())); + assertEquals(si.getStoreFiles(), new ArrayList<>(scr.getRequest().getFiles())); scr.execute(sc, NoLimitThroughputController.INSTANCE, null); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), any(), any()); @@ -264,11 +264,11 @@ public void testInitialCountFromL0() throws Exception { StripeCompactionPolicy policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 2, false); StripeCompactionPolicy.StripeInformationProvider si = createStripesL0Only(3, 8); - verifyCompaction(policy, si, si.getStorefiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); + verifyCompaction(policy, si, si.getStoreFiles(), true, 2, 12L, OPEN_KEY, OPEN_KEY, true); si = createStripesL0Only(3, 10); // If result would be too large, split into smaller parts. - verifyCompaction(policy, si, si.getStorefiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); + verifyCompaction(policy, si, si.getStoreFiles(), true, 3, 10L, OPEN_KEY, OPEN_KEY, true); policy = createPolicy(conf, defaultSplitSize, defaultSplitCount, 6, false); - verifyCompaction(policy, si, si.getStorefiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); + verifyCompaction(policy, si, si.getStoreFiles(), true, 6, 5L, OPEN_KEY, OPEN_KEY, true); } @Test @@ -857,7 +857,7 @@ private static StripeInformationProvider createStripesWithFiles(List bou ConcatenatedLists sfs = new ConcatenatedLists<>(); sfs.addAllSublists(stripes); sfs.addSublist(l0Files); - when(si.getStorefiles()).thenReturn(sfs); + when(si.getStoreFiles()).thenReturn(sfs); when(si.getStripes()).thenReturn(stripes); when(si.getStripeBoundaries()).thenReturn(boundariesList); when(si.getStripeCount()).thenReturn(stripes.size());
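As a closing reference, the per-table switch exercised by TestStoreFileWriter can be condensed into the sketch below; it mirrors the test's createTable method, and the class, method, and column family names here are illustrative.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch: builds a table descriptor that opts a single table into historical
// compaction files, the same way TestStoreFileWriter#createTable does via a table-level value.
public class EnableDualFileWritingExample {
  static TableDescriptor dualFileTable(String name, int maxVersions) {
    ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
      .newBuilder(Bytes.toBytes("f"))
      .setMaxVersions(maxVersions)
      .build();
    return TableDescriptorBuilder.newBuilder(TableName.valueOf(name))
      .setColumnFamily(family)
      // Same key the test imports as StoreFileWriter.ENABLE_HISTORICAL_COMPACTION_FILES.
      .setValue("hbase.enable.historical.compaction.files", "true")
      .build();
  }
}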