diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index bd9ce6035ad4..30a923666e16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -128,13 +128,13 @@ S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind } /** The sole reason this class exists is that java has no ref/out/pointer parameters. */ - protected static class FileDetails { + public static class FileDetails { /** Maximum key count after compaction (for blooms) */ public long maxKeyCount = 0; /** Earliest put timestamp if major compaction */ public long earliestPutTs = HConstants.LATEST_TIMESTAMP; /** Latest put timestamp */ - public long latestPutTs = HConstants.LATEST_TIMESTAMP; + public long latestPutTs = 0; /** The last key in the files we're compacting. */ public long maxSeqId = 0; /** Latest memstore read point found in any of the involved files */ @@ -154,11 +154,12 @@ protected static class FileDetails { * @parma major If major compaction * @return The result. */ - private FileDetails getFileDetails(Collection filesToCompact, boolean allFiles, - boolean major) throws IOException { + static FileDetails getFileDetails(Collection filesToCompact, long keepSeqIdPeriod, + boolean allFiles, boolean major, Compression.Algorithm majorCompactionCompression, + Compression.Algorithm minorCompactionCompression) throws IOException { FileDetails fd = new FileDetails(); long oldestHFileTimestampToKeepMVCC = - EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod); + EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * keepSeqIdPeriod); for (HStoreFile file : filesToCompact) { if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) { @@ -216,8 +217,9 @@ private FileDetails getFileDetails(Collection filesToCompact, boolea } } tmp = fileInfo.get(TIMERANGE_KEY); - fd.latestPutTs = + long latestPutTs = tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax(); + fd.latestPutTs = Math.max(fd.latestPutTs, latestPutTs); LOG.debug( "Compacting {}, keycount={}, bloomtype={}, size={}, " + "encoding={}, compression={}, seqNum={}{}", @@ -328,7 +330,9 @@ private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, Sc protected final List compact(final CompactionRequestImpl request, InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { - FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor()); + FileDetails fd = + getFileDetails(request.getFiles(), keepSeqIdPeriod, request.isAllFiles(), request.isMajor(), + majorCompactionCompression, minorCompactionCompression); // Find the smallest read point across all the Scanners. long smallestReadPoint = getSmallestReadPoint(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFileDetails.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFileDetails.java new file mode 100644 index 000000000000..820987c51644 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFileDetails.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFileDetails { + + @Test public void testLatestPutTs() throws IOException { + List sfs = new ArrayList<>(3); + Map fileInfo = new HashMap<>(); + TimeRangeTracker tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 3000); + fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker)); + sfs.add(createStoreFile(fileInfo)); + tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 2000); + fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker)); + sfs.add(createStoreFile(fileInfo)); + tracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC, 0, 1000); + fileInfo.put(HStoreFile.TIMERANGE_KEY, TimeRangeTracker.toByteArray(tracker)); + sfs.add(createStoreFile(fileInfo)); + + FileDetails fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false, + Compression.Algorithm.NONE, Compression.Algorithm.NONE); + assertEquals(3000, fd.latestPutTs); + + // when TIMERANGE_KEY is null + fileInfo.clear(); + sfs.add(createStoreFile(fileInfo)); + fd = Compactor.getFileDetails(sfs, HConstants.MIN_KEEP_SEQID_PERIOD, false, false, + Compression.Algorithm.NONE, Compression.Algorithm.NONE); + assertEquals(HConstants.LATEST_TIMESTAMP, fd.latestPutTs); + } + + private static HStoreFile createStoreFile(Map fileInfo) + throws IOException { + HStoreFile sf = Mockito.mock(HStoreFile.class); + Mockito.doReturn(System.currentTimeMillis()).when(sf).getModificationTimestamp(); + Mockito.doReturn(0L).when(sf).getMaxSequenceId(); + StoreFileReader reader = Mockito.mock(StoreFileReader.class); + Mockito.doReturn(0L).when(reader).getEntries(); + Mockito.doReturn(new HashMap<>(fileInfo)).when(reader).loadFileInfo(); + Mockito.doReturn(0L).when(reader).length(); + Mockito.doReturn(false).when(reader).isBulkLoaded(); + Mockito.doReturn(BloomType.NONE).when(reader).getBloomFilterType(); + HFile.Reader hfr = Mockito.mock(HFile.Reader.class); + Mockito.doReturn(DataBlockEncoding.NONE).when(hfr).getDataBlockEncoding(); + Mockito.doReturn(hfr).when(reader).getHFileReader(); + Mockito.doReturn(reader).when(sf).getReader(); + return sf; + } + +}