diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index c45fdff7ca16..01fe0005f048 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.Optional; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -144,17 +143,16 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s }; private final CellSinkFactory writerFactory = - new CellSinkFactory() { - @Override - public StoreFileWriter createWriter(InternalScanner scanner, - org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { - // make this writer with tags always because of possible new cells with tags. - return store.createWriterInTmp(fd.maxKeyCount, - major ? majorCompactionCompression : minorCompactionCompression, - true, true, true, shouldDropBehind); - } - }; + new CellSinkFactory() { + @Override + public StoreFileWriter createWriter(InternalScanner scanner, + org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, + boolean shouldDropBehind, boolean major) throws IOException { + // make this writer with tags always because of possible new cells with tags. + return store.getStoreEngine().createWriter( + createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true)); + } + }; public DefaultMobStoreCompactor(Configuration conf, HStore store) { super(conf, store); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 480b85c58dfe..4a1dc7b33a51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -127,8 +126,7 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), - false, true, true, false); + writer = createWriter(snapshot, true); IOException e = null; try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java new file mode 100644 index 000000000000..10cd9f009e4a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public final class CreateStoreFileWriterParams { + + private long maxKeyCount; + + private Compression.Algorithm compression; + + private boolean isCompaction; + + private boolean includeMVCCReadpoint; + + private boolean includesTag; + + private boolean shouldDropBehind; + + private long totalCompactedFilesSize = -1; + + private String fileStoragePolicy = HConstants.EMPTY_STRING; + + private CreateStoreFileWriterParams() { + } + + public long maxKeyCount() { + return maxKeyCount; + } + + public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) { + this.maxKeyCount = maxKeyCount; + return this; + } + + public Compression.Algorithm compression() { + return compression; + } + + /** + * Set the compression algorithm to use + */ + public CreateStoreFileWriterParams compression(Compression.Algorithm compression) { + this.compression = compression; + return this; + } + + public boolean isCompaction() { + return isCompaction; + } + + /** + * Whether we are creating a new file in a compaction + */ + public CreateStoreFileWriterParams isCompaction(boolean isCompaction) { + this.isCompaction = isCompaction; + return this; + } + + public boolean includeMVCCReadpoint() { + return includeMVCCReadpoint; + } + + /** + * Whether to include MVCC or not + */ + public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) { + this.includeMVCCReadpoint = includeMVCCReadpoint; + return this; + } + + public boolean includesTag() { + return includesTag; + } + + /** + * Whether to includesTag or not + */ + public CreateStoreFileWriterParams includesTag(boolean includesTag) { + this.includesTag = includesTag; + return this; + } + + public boolean shouldDropBehind() { + return shouldDropBehind; + } + + public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) { + this.shouldDropBehind = shouldDropBehind; + return this; + } + + public long totalCompactedFilesSize() { + return totalCompactedFilesSize; + } + + public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) { + this.totalCompactedFilesSize = totalCompactedFilesSize; + return this; + } + + public String fileStoragePolicy() { + return fileStoragePolicy; + } + + public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) { + this.fileStoragePolicy = fileStoragePolicy; + return this; + } + + public static CreateStoreFileWriterParams create() { + return new CreateStoreFileWriterParams(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 1df953d93c96..7422d9112eab 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -19,18 +19,17 @@ import java.io.IOException; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; /** * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 58f8bbbb6ac5..693b9c93b9fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; @@ -39,8 +38,8 @@ * their derivatives. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class DefaultStoreEngine extends StoreEngine< - DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> { +public class DefaultStoreEngine extends StoreEngine { public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY = "hbase.hstore.defaultengine.storeflusher.class"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index a7d7fb1f3d56..306760d7ce6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -21,15 +21,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Default implementation of StoreFlusher. 
@@ -60,9 +59,7 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = store.createWriterInTmp(cellsCount, - store.getColumnFamilyDescriptor().getCompressionType(), false, true, - snapshot.isTagsPresent(), false); + writer = createWriter(snapshot, false); IOException e = null; try { performFlush(scanner, writer, throughputController); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 7ce7f0310c7d..b00a50c522fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -28,7 +28,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -158,7 +157,7 @@ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, protected StoreEngine createStoreEngine(HStore store, Configuration conf, CellComparator cellComparator) throws IOException { MobStoreEngine engine = new MobStoreEngine(); - engine.createComponents(conf, store, cellComparator); + engine.createComponentsOnce(conf, store, cellComparator); return engine; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index dd26f1ca9cc1..506e816d1a6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -142,7 +142,7 @@ public Path getRegionDir() { // Temp Helpers // =========================================================================== /** @return {@link Path} to the region's temp directory, used for file creations */ - Path getTempDir() { + public Path getTempDir() { return new Path(getRegionDir(), REGION_TEMP_DIR); } @@ -237,11 +237,7 @@ public String getStoragePolicyName(String familyName) { * @param familyName Column Family Name * @return a set of {@link StoreFileInfo} for the specified family. */ - public Collection getStoreFiles(final byte[] familyName) throws IOException { - return getStoreFiles(Bytes.toString(familyName)); - } - - public Collection getStoreFiles(final String familyName) throws IOException { + public List getStoreFiles(final String familyName) throws IOException { return getStoreFiles(familyName, true); } @@ -251,7 +247,7 @@ public Collection getStoreFiles(final String familyName) throws I * @param familyName Column Family Name * @return a set of {@link StoreFileInfo} for the specified family. 
*/ - public Collection getStoreFiles(final String familyName, final boolean validate) + public List getStoreFiles(final String familyName, final boolean validate) throws IOException { Path familyDir = getStoreDir(familyName); FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c4e34beca406..34365b944612 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; @@ -47,8 +48,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Predicate; import java.util.function.ToLongFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -70,17 +69,12 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; -import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.quotas.RegionSizeStore; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -110,7 +104,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils; @@ -165,16 +158,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, private boolean cacheOnWriteLogged; - /** - * RWLock for store operations. - * Locked in shared mode when the list of component stores is looked at: - * - all reads/writes to table data - * - checking for split - * Locked in exclusive mode when the list of component stores is modified: - * - closing - * - completing a compaction - */ - final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** * Lock specific to archiving compacted store files. 
This avoids races around * the combination of retrieving the list of compacted files and moving them to @@ -290,14 +273,8 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family, } this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator()); - List hStoreFiles = loadStoreFiles(warmup); - // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read - // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and - // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction) , so - // no need calculate the storeSize twice. - this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true)); - this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles)); - this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles); + storeEngine.initialize(warmup); + refreshStoreSizeAndTotalBytes(); flushRetriesNumber = conf.getInt( "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER); @@ -513,102 +490,18 @@ void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) { this.dataBlockEncoder = blockEncoder; } - /** - * Creates an unsorted list of StoreFile loaded in parallel - * from the given directory. - */ - private List loadStoreFiles(boolean warmup) throws IOException { - Collection files = getRegionFileSystem().getStoreFiles(getColumnFamilyName()); - return openStoreFiles(files, warmup); - } - - private List openStoreFiles(Collection files, boolean warmup) - throws IOException { - if (CollectionUtils.isEmpty(files)) { - return Collections.emptyList(); - } - // initialize the thread pool for opening store files in parallel.. - ThreadPoolExecutor storeFileOpenerThreadPool = - this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-" - + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); - CompletionService completionService = - new ExecutorCompletionService<>(storeFileOpenerThreadPool); - - int totalValidStoreFile = 0; - for (StoreFileInfo storeFileInfo : files) { - // open each store file in parallel - completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo)); - totalValidStoreFile++; - } - - Set compactedStoreFiles = new HashSet<>(); - ArrayList results = new ArrayList<>(files.size()); - IOException ioe = null; - try { - for (int i = 0; i < totalValidStoreFile; i++) { - try { - HStoreFile storeFile = completionService.take().get(); - if (storeFile != null) { - LOG.debug("loaded {}", storeFile); - results.add(storeFile); - compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles()); - } - } catch (InterruptedException e) { - if (ioe == null) { - ioe = new InterruptedIOException(e.getMessage()); - } - } catch (ExecutionException e) { - if (ioe == null) { - ioe = new IOException(e.getCause()); - } - } - } - } finally { - storeFileOpenerThreadPool.shutdownNow(); - } - if (ioe != null) { - // close StoreFile readers - boolean evictOnClose = - getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true; - for (HStoreFile file : results) { - try { - if (file != null) { - file.closeStoreFile(evictOnClose); - } - } catch (IOException e) { - LOG.warn("Could not close store file {}", file, e); - } - } - throw ioe; - } - - // Should not archive the compacted store files when region warmup. See HBASE-22163. 
- if (!warmup) { - // Remove the compacted files from result - List filesToRemove = new ArrayList<>(compactedStoreFiles.size()); - for (HStoreFile storeFile : results) { - if (compactedStoreFiles.contains(storeFile.getPath().getName())) { - LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this); - storeFile.getReader().close(storeFile.getCacheConf() != null ? - storeFile.getCacheConf().shouldEvictOnClose() : true); - filesToRemove.add(storeFile); - } - } - results.removeAll(filesToRemove); - if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) { - LOG.debug("Moving the files {} to archive", filesToRemove); - getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), - filesToRemove); - } - } - - return results; + private void postRefreshStoreFiles() throws IOException { + // Advance the memstore read point to be at least the new store files seqIds so that + // readers might pick it up. This assumes that the store is not getting any writes (otherwise + // in-flight transactions might be made visible) + getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo); + refreshStoreSizeAndTotalBytes(); } @Override public void refreshStoreFiles() throws IOException { - Collection newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName()); - refreshStoreFilesInternal(newFiles); + storeEngine.refreshStoreFiles(); + postRefreshStoreFiles(); } /** @@ -616,89 +509,8 @@ public void refreshStoreFiles() throws IOException { * region replicas to keep up to date with the primary region files. */ public void refreshStoreFiles(Collection newFiles) throws IOException { - List storeFiles = new ArrayList<>(newFiles.size()); - for (String file : newFiles) { - storeFiles.add(getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file)); - } - refreshStoreFilesInternal(storeFiles); - } - - /** - * Checks the underlying store files, and opens the files that have not - * been opened, and removes the store file readers for store files no longer - * available. Mainly used by secondary region replicas to keep up to date with - * the primary region files. 
- */ - private void refreshStoreFilesInternal(Collection newFiles) throws IOException { - StoreFileManager sfm = storeEngine.getStoreFileManager(); - Collection currentFiles = sfm.getStorefiles(); - Collection compactedFiles = sfm.getCompactedfiles(); - if (currentFiles == null) { - currentFiles = Collections.emptySet(); - } - if (newFiles == null) { - newFiles = Collections.emptySet(); - } - if (compactedFiles == null) { - compactedFiles = Collections.emptySet(); - } - - HashMap currentFilesSet = new HashMap<>(currentFiles.size()); - for (HStoreFile sf : currentFiles) { - currentFilesSet.put(sf.getFileInfo(), sf); - } - HashMap compactedFilesSet = new HashMap<>(compactedFiles.size()); - for (HStoreFile sf : compactedFiles) { - compactedFilesSet.put(sf.getFileInfo(), sf); - } - - Set newFilesSet = new HashSet(newFiles); - // Exclude the files that have already been compacted - newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet()); - Set toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet()); - Set toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet); - - if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) { - return; - } - - LOG.info("Refreshing store files for " + this + " files to add: " - + toBeAddedFiles + " files to remove: " + toBeRemovedFiles); - - Set toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); - for (StoreFileInfo sfi : toBeRemovedFiles) { - toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); - } - - // try to open the files - List openedFiles = openStoreFiles(toBeAddedFiles, false); - - // propogate the file changes to the underlying store file manager - replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception - - // Advance the memstore read point to be at least the new store files seqIds so that - // readers might pick it up. 
This assumes that the store is not getting any writes (otherwise - // in-flight transactions might be made visible) - if (!toBeAddedFiles.isEmpty()) { - // we must have the max sequence id here as we do have several store files - region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong()); - } - - refreshStoreSizeAndTotalBytes(); - } - - protected HStoreFile createStoreFileAndReader(final Path p) throws IOException { - StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), - p, isPrimaryReplicaStore()); - return createStoreFileAndReader(info); - } - - private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException { - info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); - HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(), - getCacheConfig()); - storeFile.initReader(); - return storeFile; + storeEngine.refreshStoreFiles(newFiles); + postRefreshStoreFiles(); } /** @@ -721,7 +533,7 @@ public void stopReplayingFromWAL(){ * Adds a value to the memstore */ public void add(final Cell cell, MemStoreSizing memstoreSizing) { - lock.readLock().lock(); + storeEngine.readLock(); try { if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", @@ -729,7 +541,7 @@ public void add(final Cell cell, MemStoreSizing memstoreSizing) { } this.memstore.add(cell, memstoreSizing); } finally { - lock.readLock().unlock(); + storeEngine.readUnlock(); currentParallelPutCount.decrementAndGet(); } } @@ -738,7 +550,7 @@ public void add(final Cell cell, MemStoreSizing memstoreSizing) { * Adds the specified value to the memstore */ public void add(final Iterable cells, MemStoreSizing memstoreSizing) { - lock.readLock().lock(); + storeEngine.readLock(); try { if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) { LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!", @@ -746,7 +558,7 @@ public void add(final Iterable cells, MemStoreSizing memstoreSizing) { } memstore.add(cells, memstoreSizing); } finally { - lock.readLock().unlock(); + storeEngine.readUnlock(); currentParallelPutCount.decrementAndGet(); } } @@ -869,17 +681,16 @@ public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws LOG.info("Loaded HFile " + srcPath + " into " + this + " as " + dstPath + " - updating store file list."); - HStoreFile sf = createStoreFileAndReader(dstPath); + HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); bulkLoadHFile(sf); - LOG.info("Successfully loaded {} into {} (new location: {})", - srcPath, this, dstPath); + LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath); return dstPath; } public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { - HStoreFile sf = createStoreFileAndReader(fileInfo); + HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo); bulkLoadHFile(sf); } @@ -887,28 +698,74 @@ private void bulkLoadHFile(HStoreFile sf) throws IOException { StoreFileReader r = sf.getReader(); this.storeSize.addAndGet(r.length()); this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); - - // Append the new storefile into the list - this.lock.writeLock().lock(); - try { - this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); - } finally { - // We need the lock, as long as we are updating the storeFiles - // or changing the memstore. 
Let us release it before calling - // notifyChangeReadersObservers. See HBASE-4485 for a possible - // deadlock scenario that could have happened if continue to hold - // the lock. - this.lock.writeLock().unlock(); - } + storeEngine.addStoreFiles(Lists.newArrayList(sf)); LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this); if (LOG.isTraceEnabled()) { - String traceMessage = "BULK LOAD time,size,store size,store files [" - + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize - + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; + String traceMessage = "BULK LOAD time,size,store size,store files [" + + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," + + storeEngine.getStoreFileManager().getStorefileCount() + "]"; LOG.trace(traceMessage); } } + private ImmutableCollection closeWithoutLock() throws IOException { + // Clear so metrics doesn't find them. + ImmutableCollection result = storeEngine.getStoreFileManager().clearFiles(); + Collection compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); + // clear the compacted files + if (CollectionUtils.isNotEmpty(compactedfiles)) { + removeCompactedfiles(compactedfiles, + getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true); + } + if (!result.isEmpty()) { + // initialize the thread pool for closing store files in parallel. + ThreadPoolExecutor storeFileCloserThreadPool = + this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" + + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); + + // close each store file in parallel + CompletionService completionService = + new ExecutorCompletionService<>(storeFileCloserThreadPool); + for (HStoreFile f : result) { + completionService.submit(new Callable() { + @Override + public Void call() throws IOException { + boolean evictOnClose = + getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true; + f.closeStoreFile(evictOnClose); + return null; + } + }); + } + + IOException ioe = null; + try { + for (int i = 0; i < result.size(); i++) { + try { + Future future = completionService.take(); + future.get(); + } catch (InterruptedException e) { + if (ioe == null) { + ioe = new InterruptedIOException(); + ioe.initCause(e); + } + } catch (ExecutionException e) { + if (ioe == null) { + ioe = new IOException(e.getCause()); + } + } + } + } finally { + storeFileCloserThreadPool.shutdownNow(); + } + if (ioe != null) { + throw ioe; + } + } + LOG.trace("Closed {}", this); + return result; + } + /** * Close all the readers We don't need to worry about subsequent requests because the Region holds * a write lock that will prevent any more reads or writes. @@ -916,67 +773,18 @@ private void bulkLoadHFile(HStoreFile sf) throws IOException { * @throws IOException on failure */ public ImmutableCollection close() throws IOException { + // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report + // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally... + // Change later if findbugs becomes smarter in the future. this.archiveLock.lock(); - this.lock.writeLock().lock(); try { - // Clear so metrics doesn't find them. 
- ImmutableCollection result = storeEngine.getStoreFileManager().clearFiles(); - Collection compactedfiles = - storeEngine.getStoreFileManager().clearCompactedFiles(); - // clear the compacted files - if (CollectionUtils.isNotEmpty(compactedfiles)) { - removeCompactedfiles(compactedfiles, getCacheConfig() != null ? - getCacheConfig().shouldEvictOnClose() : true); - } - if (!result.isEmpty()) { - // initialize the thread pool for closing store files in parallel. - ThreadPoolExecutor storeFileCloserThreadPool = this.region - .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" - + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName()); - - // close each store file in parallel - CompletionService completionService = - new ExecutorCompletionService<>(storeFileCloserThreadPool); - for (HStoreFile f : result) { - completionService.submit(new Callable() { - @Override - public Void call() throws IOException { - boolean evictOnClose = - getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true; - f.closeStoreFile(evictOnClose); - return null; - } - }); - } - - IOException ioe = null; - try { - for (int i = 0; i < result.size(); i++) { - try { - Future future = completionService.take(); - future.get(); - } catch (InterruptedException e) { - if (ioe == null) { - ioe = new InterruptedIOException(); - ioe.initCause(e); - } - } catch (ExecutionException e) { - if (ioe == null) { - ioe = new IOException(e.getCause()); - } - } - } - } finally { - storeFileCloserThreadPool.shutdownNow(); - } - if (ioe != null) { - throw ioe; - } + this.storeEngine.writeLock(); + try { + return closeWithoutLock(); + } finally { + this.storeEngine.writeUnlock(); } - LOG.trace("Closed {}", this); - return result; } finally { - this.lock.writeLock().unlock(); this.archiveLock.unlock(); } } @@ -1006,7 +814,7 @@ protected List flushCache(final long logCacheFlushId, MemStoreSnapshot sna try { for (Path pathName : pathNames) { lastPathName = pathName; - validateStoreFile(pathName); + storeEngine.validateStoreFile(pathName); } return pathNames; } catch (Exception e) { @@ -1052,167 +860,18 @@ public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { } Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path); - HStoreFile sf = createStoreFileAndReader(dstPath); + HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath); StoreFileReader r = sf.getReader(); this.storeSize.addAndGet(r.length()); this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); - this.lock.writeLock().lock(); - try { - this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); - } finally { - this.lock.writeLock().unlock(); - } + storeEngine.addStoreFiles(Lists.newArrayList(sf)); LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); return sf; } - /** - * Commit the given {@code files}. - *

- * We will move the file into data directory, and open it. - * @param files the files want to commit - * @param validate whether to validate the store files - * @return the committed store files - */ - private List commitStoreFiles(List files, boolean validate) throws IOException { - List committedFiles = new ArrayList<>(files.size()); - HRegionFileSystem hfs = getRegionFileSystem(); - String familyName = getColumnFamilyName(); - for (Path file : files) { - try { - if (validate) { - validateStoreFile(file); - } - Path committedPath = hfs.commitStoreFile(familyName, file); - HStoreFile sf = createStoreFileAndReader(committedPath); - committedFiles.add(sf); - } catch (IOException e) { - LOG.error("Failed to commit store file {}", file, e); - // Try to delete the files we have committed before. - // It is OK to fail when deleting as leaving the file there does not cause any data - // corruption problem. It just introduces some duplicated data which may impact read - // performance a little when reading before compaction. - for (HStoreFile sf : committedFiles) { - Path pathToDelete = sf.getPath(); - try { - sf.deleteStoreFile(); - } catch (IOException deleteEx) { - LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete, - deleteEx); - } - } - throw new IOException("Failed to commit the flush", e); - } - } - return committedFiles; - } - - public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind) throws IOException { - return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, - includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING); - } - - /** - * @param compression Compression algorithm to use - * @param isCompaction whether we are creating a new file in a compaction - * @param includeMVCCReadpoint - whether to include MVCC or not - * @param includesTag - includesTag or not - * @return Writer for a new StoreFile in the tmp dir. 
- */ - // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of - // compaction - public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy) - throws IOException { - // creating new cache config for each new writer - final CacheConfig cacheConf = getCacheConfig(); - final CacheConfig writerCacheConf = new CacheConfig(cacheConf); - if (isCompaction) { - // Don't cache data on write on compactions, unless specifically configured to do so - // Cache only when total file size remains lower than configured threshold - final boolean cacheCompactedBlocksOnWrite = - getCacheConfig().shouldCacheCompactedBlocksOnWrite(); - // if data blocks are to be cached on write - // during compaction, we should forcefully - // cache index and bloom blocks as well - if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf - .getCacheCompactedBlocksOnWriteThreshold()) { - writerCacheConf.enableCacheOnWrite(); - if (!cacheOnWriteLogged) { - LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " + - "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); - cacheOnWriteLogged = true; - } - } else { - writerCacheConf.setCacheDataOnWrite(false); - if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { - // checking condition once again for logging - LOG.debug( - "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " - + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}", - this, totalCompactedFilesSize, - cacheConf.getCacheCompactedBlocksOnWriteThreshold()); - } - } - } else { - final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); - if (shouldCacheDataOnWrite) { - writerCacheConf.enableCacheOnWrite(); - if (!cacheOnWriteLogged) { - LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " + - "Index blocks and Bloom filter blocks", this); - cacheOnWriteLogged = true; - } - } - } - Encryption.Context encryptionContext = storeContext.getEncryptionContext(); - HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag, - encryptionContext); - Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName()); - StoreFileWriter.Builder builder = - new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem()) - .withOutputDir(familyTempDir) - .withBloomType(storeContext.getBloomFilterType()) - .withMaxKeyCount(maxKeyCount) - .withFavoredNodes(storeContext.getFavoredNodes()) - .withFileContext(hFileContext) - .withShouldDropCacheBehind(shouldDropBehind) - .withCompactedFilesSupplier(storeContext.getCompactedFilesSupplier()) - .withFileStoragePolicy(fileStoragePolicy); - return builder.build(); - } - - private HFileContext createFileContext(Compression.Algorithm compression, - boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) { - if (compression == null) { - compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; - } - ColumnFamilyDescriptor family = getColumnFamilyDescriptor(); - HFileContext hFileContext = new HFileContextBuilder() - .withIncludesMvcc(includeMVCCReadpoint) - .withIncludesTags(includesTag) - .withCompression(compression) - .withCompressTags(family.isCompressTags()) - 
.withChecksumType(StoreUtils.getChecksumType(conf)) - .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) - .withBlockSize(family.getBlocksize()) - .withHBaseCheckSum(true) - .withDataBlockEncoding(family.getDataBlockEncoding()) - .withEncryptionContext(encryptionContext) - .withCreateTime(EnvironmentEdgeManager.currentTime()) - .withColumnFamily(getColumnFamilyDescriptor().getName()) - .withTableName(getTableName().getName()) - .withCellComparator(getComparator()) - .build(); - return hFileContext; - } - private long getTotalSize(Collection sfs) { return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); } @@ -1222,18 +881,8 @@ private long getTotalSize(Collection sfs) { * @param sfs Store files * @return Whether compaction is required. */ - private boolean updateStorefiles(List sfs, long snapshotId) throws IOException { - this.lock.writeLock().lock(); - try { - this.storeEngine.getStoreFileManager().insertNewFiles(sfs); - } finally { - // We need the lock, as long as we are updating the storeFiles - // or changing the memstore. Let us release it before calling - // notifyChangeReadersObservers. See HBASE-4485 for a possible - // deadlock scenario that could have happened if continue to hold - // the lock. - this.lock.writeLock().unlock(); - } + private boolean completeFlush(List sfs, long snapshotId) throws IOException { + storeEngine.addStoreFiles(sfs); // We do not need to call clearSnapshot method inside the write lock. // The clearSnapshot itself is thread safe, which can be called at the same time with other // memstore operations expect snapshot and clearSnapshot. And for these two methods, in HRegion @@ -1259,11 +908,11 @@ private boolean updateStorefiles(List sfs, long snapshotId) throws I private void notifyChangedReadersObservers(List sfs) throws IOException { for (ChangedReadersObserver o : this.changedReaderObservers) { List memStoreScanners; - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { memStoreScanners = this.memstore.getScanners(o.getReadPoint()); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } o.updateReaders(sfs, memStoreScanners); } @@ -1305,13 +954,13 @@ public List getScanners(boolean cacheBlocks, boolean usePread, byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { Collection storeFilesToScan; List memStoreScanners; - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow, includeStartRow, stopRow, includeStopRow); memStoreScanners = this.memstore.getScanners(readPt); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } try { @@ -1388,11 +1037,11 @@ public List getScanners(List files, boolean cacheBl boolean includeMemstoreScanner) throws IOException { List memStoreScanners = null; if (includeMemstoreScanner) { - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { memStoreScanners = this.memstore.getScanners(readPt); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } } try { @@ -1508,14 +1157,13 @@ protected List doCompaction(CompactionRequestImpl cr, List newFiles) throws IOException { // Do the steps necessary to complete the compaction. 
setStoragePolicyFromFileName(newFiles); - List sfs = commitStoreFiles(newFiles, true); + List sfs = storeEngine.commitStoreFiles(newFiles, true); if (this.getCoprocessorHost() != null) { for (HStoreFile sf : sfs) { getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user); } } - writeCompactionWalRecord(filesToCompact, sfs); - replaceStoreFiles(filesToCompact, sfs); + replaceStoreFiles(filesToCompact, sfs, true); if (cr.isMajor()) { majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); @@ -1579,25 +1227,24 @@ private void writeCompactionWalRecord(Collection filesCompacted, this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC()); } - void replaceStoreFiles(Collection compactedFiles, Collection result) - throws IOException { - this.lock.writeLock().lock(); - try { - this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); - synchronized (filesCompacting) { - filesCompacting.removeAll(compactedFiles); - } - - // These may be null when the RS is shutting down. The space quota Chores will fix the Region - // sizes later so it's not super-critical if we miss these. - RegionServerServices rsServices = region.getRegionServerServices(); - if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { - updateSpaceQuotaAfterFileReplacement( - rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), - compactedFiles, result); - } - } finally { - this.lock.writeLock().unlock(); + @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", + allowedOnPath = ".*/(HStore|TestHStore).java") + void replaceStoreFiles(Collection compactedFiles, Collection result, + boolean writeCompactionMarker) throws IOException { + storeEngine.replaceStoreFiles(compactedFiles, result); + if (writeCompactionMarker) { + writeCompactionWalRecord(compactedFiles, result); + } + synchronized (filesCompacting) { + filesCompacting.removeAll(compactedFiles); + } + // These may be null when the RS is shutting down. The space quota Chores will fix the Region + // sizes later so it's not super-critical if we miss these. 
+ RegionServerServices rsServices = region.getRegionServerServices(); + if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) { + updateSpaceQuotaAfterFileReplacement( + rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(), + compactedFiles, result); } } @@ -1720,7 +1367,7 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick for (String compactionOutput : compactionOutputs) { StoreFileInfo storeFileInfo = getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput); - HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); + HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); outputStoreFiles.add(storeFile); } } @@ -1728,7 +1375,7 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) { LOG.info("Replaying compaction marker, replacing input files: " + inputStoreFiles + " with output files : " + outputStoreFiles); - this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); + this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false); this.refreshStoreSizeAndTotalBytes(); } } @@ -1737,14 +1384,14 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick public boolean hasReferences() { // Grab the read lock here, because we need to ensure that: only when the atomic // replaceStoreFiles(..) finished, we can get all the complete store file list. - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { // Merge the current store files with compacted files here due to HBASE-20940. Collection allStoreFiles = new ArrayList<>(getStorefiles()); allStoreFiles.addAll(getCompactedFiles()); return StoreUtils.hasReferences(allStoreFiles); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } } @@ -1784,7 +1431,7 @@ public Optional requestCompaction(int priority, final CompactionContext compaction = storeEngine.createCompaction(); CompactionRequestImpl request = null; - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { synchronized (filesCompacting) { // First, see if coprocessor would want to override selection. @@ -1857,7 +1504,7 @@ public Optional requestCompaction(int priority, request.setTracker(tracker); } } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } if (LOG.isDebugEnabled()) { @@ -1890,7 +1537,7 @@ private void removeUnneededFiles() throws IOException { this, getColumnFamilyDescriptor().getMinVersions()); return; } - this.lock.readLock().lock(); + this.storeEngine.readLock(); Collection delSfs = null; try { synchronized (filesCompacting) { @@ -1902,7 +1549,7 @@ private void removeUnneededFiles() throws IOException { } } } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } if (CollectionUtils.isEmpty(delSfs)) { @@ -1910,8 +1557,7 @@ private void removeUnneededFiles() throws IOException { } Collection newFiles = Collections.emptyList(); // No new files. - writeCompactionWalRecord(delSfs, newFiles); - replaceStoreFiles(delSfs, newFiles); + replaceStoreFiles(delSfs, newFiles, true); refreshStoreSizeAndTotalBytes(); LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in " + this + "; total size is " @@ -1933,25 +1579,6 @@ protected void finishCompactionRequest(CompactionRequestImpl cr) { } } - /** - * Validates a store file by opening and closing it. 
In HFileV2 this should not be an expensive - * operation. - * @param path the path to the store file - */ - private void validateStoreFile(Path path) throws IOException { - HStoreFile storeFile = null; - try { - storeFile = createStoreFileAndReader(path); - } catch (IOException e) { - LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); - throw e; - } finally { - if (storeFile != null) { - storeFile.closeStoreFile(false); - } - } - } - /** * Update counts. */ @@ -1997,7 +1624,7 @@ public boolean canSplit() { * Determines if Store should be split. */ public Optional getSplitPoint() { - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { // Should already be enforced by the split policy! assert !this.getRegionInfo().isMetaRegion(); @@ -2010,7 +1637,7 @@ public Optional getSplitPoint() { } catch(IOException e) { LOG.warn("Failed getting store size for {}", this, e); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } return Optional.empty(); } @@ -2043,7 +1670,7 @@ public void triggerMajorCompaction() { */ public KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt) throws IOException { - lock.readLock().lock(); + storeEngine.readLock(); try { ScanInfo scanInfo; if (this.getCoprocessorHost() != null) { @@ -2053,7 +1680,7 @@ public KeyValueScanner getScanner(Scan scan, final NavigableSet targetCo } return createScanner(scan, scanInfo, targetCols, readPt); } finally { - lock.readLock().unlock(); + storeEngine.readUnlock(); } } @@ -2083,7 +1710,7 @@ public List recreateScanners(List currentFileS boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) throws IOException { - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { Map name2File = new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); @@ -2108,7 +1735,7 @@ public List recreateScanners(List currentFileS return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow, includeStartRow, stopRow, includeStopRow, readPt, false); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } } @@ -2174,41 +1801,20 @@ public long getStoreSizeUncompressed() { @Override public long getStorefilesSize() { // Include all StoreFiles - return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> true); + return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), + sf -> true); } @Override public long getHFilesSize() { // Include only StoreFiles which are HFiles - return getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), + return StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), HStoreFile::isHFile); } - private long getTotalUncompressedBytes(List files) { - return files.stream() - .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) - .sum(); - } - - private long getStorefilesSize(Collection files, Predicate predicate) { - return files.stream().filter(predicate) - .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); - } - - private long getStorefileFieldSize(HStoreFile file, ToLongFunction f) { - if (file == null) { - return 0L; - } - StoreFileReader reader = file.getReader(); - if (reader == null) { - return 0L; - } - return f.applyAsLong(reader); - } - private 
long getStorefilesFieldSize(ToLongFunction f) { return this.storeEngine.getStoreFileManager().getStorefiles().stream() - .mapToLong(file -> getStorefileFieldSize(file, f)).sum(); + .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum(); } @Override @@ -2279,11 +1885,11 @@ public long getSmallestReadPoint() { */ public void upsert(Iterable cells, long readpoint, MemStoreSizing memstoreSizing) throws IOException { - this.lock.readLock().lock(); + this.storeEngine.readLock(); try { this.memstore.upsert(cells, readpoint, memstoreSizing); } finally { - this.lock.readLock().unlock(); + this.storeEngine.readUnlock(); } } @@ -2336,7 +1942,7 @@ public boolean commit(MonitoredTask status) throws IOException { return false; } status.setStatus("Flushing " + this + ": reopening flushed file"); - List storeFiles = commitStoreFiles(tempFiles, false); + List storeFiles = storeEngine.commitStoreFiles(tempFiles, false); for (HStoreFile sf : storeFiles) { StoreFileReader r = sf.getReader(); if (LOG.isInfoEnabled()) { @@ -2359,7 +1965,7 @@ public boolean commit(MonitoredTask status) throws IOException { } } // Add new file to store files. Clear snapshot too while we have the Store write lock. - return updateStorefiles(storeFiles, snapshot.getId()); + return completeFlush(storeFiles, snapshot.getId()); } @Override @@ -2387,7 +1993,7 @@ public void replayFlush(List fileNames, boolean dropMemstoreSnapshot) // open the file as a store file (hfile link, etc) StoreFileInfo storeFileInfo = getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file); - HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); + HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo); storeFiles.add(storeFile); HStore.this.storeSize.addAndGet(storeFile.getReader().length()); HStore.this.totalUncompressedBytes @@ -2404,7 +2010,7 @@ public void replayFlush(List fileNames, boolean dropMemstoreSnapshot) snapshotId = snapshot.getId(); snapshot.close(); } - HStore.this.updateStorefiles(storeFiles, snapshotId); + HStore.this.completeFlush(storeFiles, snapshotId); } /** @@ -2417,7 +2023,7 @@ public void abort() throws IOException { //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners //can't be released, thus memory leak snapshot.close(); - HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); + HStore.this.completeFlush(Collections.emptyList(), snapshot.getId()); } } } @@ -2583,7 +2189,7 @@ public synchronized void closeAndArchiveCompactedFiles() throws IOException { // ensure other threads do not attempt to archive the same files on close() archiveLock.lock(); try { - lock.readLock().lock(); + storeEngine.readLock(); Collection copyCompactedfiles = null; try { Collection compactedfiles = @@ -2595,7 +2201,7 @@ public synchronized void closeAndArchiveCompactedFiles() throws IOException { LOG.trace("No compacted files to archive"); } } finally { - lock.readLock().unlock(); + storeEngine.readUnlock(); } if (CollectionUtils.isNotEmpty(copyCompactedfiles)) { removeCompactedfiles(copyCompactedfiles, true); @@ -2730,12 +2336,7 @@ public boolean isSloppyMemStore() { private void clearCompactedfiles(List filesToRemove) throws IOException { LOG.trace("Clearing the compacted file {} from this store", filesToRemove); - try { - lock.writeLock().lock(); - this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove); - } finally { - lock.writeLock().unlock(); - } + storeEngine.removeCompactedFiles(filesToRemove); } void reportArchivedFilesForQuota(List archivedFiles, List fileSizes) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java index 26233505db73..2a9f96859361 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -108,6 +109,14 @@ public RegionCoprocessorHost getCoprocessorHost() { return coprocessorHost; } + public RegionInfo getRegionInfo() { + return regionFileSystem.getRegionInfo(); + } + + public boolean isPrimaryReplicaStore() { + return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID; + } + public static Builder getBuilder() { return new Builder(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 60b3c3d0d20f..f9d6c294f779 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -20,37 +20,129 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; - +import java.util.Set; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReadWriteLock; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; /** - * StoreEngine is a factory that can create the objects necessary for HStore to operate. - * Since not all compaction policies, compactors and store file managers are compatible, - * they are tied together and replaced together via StoreEngine-s. + * StoreEngine is a factory that can create the objects necessary for HStore to operate. Since not + * all compaction policies, compactors and store file managers are compatible, they are tied + * together and replaced together via StoreEngine-s. + *

+ * We expose read write lock methods to upper layer for store operations:
+ * <ul>
+ * <li>Locked in shared mode when the list of component stores is looked at:
+ * <ul>
+ * <li>all reads/writes to table data</li>
+ * <li>checking for split</li>
+ * </ul>
+ * </li>
+ * <li>Locked in exclusive mode when the list of component stores is modified:
+ * <ul>
+ * <li>closing</li>
+ * <li>completing a compaction</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>

+ * It is a bit confusing that we have a StoreFileManager (SFM) and also a StoreFileTracker (SFT).
+ * As its name says, the SFT is used to track the store file list. The reason we have an SFT
+ * besides the SFM is that, when introducing stripe compaction, we introduced the StoreEngine and
+ * also the SFM, but the SFM is not a general 'Manager': it is only designed to manage the
+ * in-memory 'stripes', so we can select different store files when scanning or compacting. The
+ * 'tracking' of store files was actually done in
+ * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and {@link HStore} before we had
+ * the SFT. Since the SFM is designed to hold only in-memory state, we hold the write lock when
+ * updating it, and the same lock is also used to protect normal read/write requests, which means
+ * we'd better not add IO operations to the SFM. Also, no matter what the in-memory state is,
+ * stripe or not, it does not affect how we track the store files. Considering all these facts,
+ * here we introduce a separate SFT to track the store files.
+ * <p/>

+ * Here, since we always need to update SFM and SFT almost at the same time, we introduce methods in + * StoreEngine directly to update them both, so upper layer just need to update StoreEngine once, to + * reduce the possible misuse. */ @InterfaceAudience.Private -public abstract class StoreEngine { +public abstract class StoreEngine { + + private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class); + protected SF storeFlusher; protected CP compactionPolicy; protected C compactor; protected SFM storeFileManager; + private Configuration conf; + private StoreContext ctx; + private RegionCoprocessorHost coprocessorHost; + private Function openStoreFileThreadPoolCreator; + private StoreFileTracker storeFileTracker; + + private final ReadWriteLock storeLock = new ReentrantReadWriteLock(); /** - * The name of the configuration parameter that specifies the class of - * a store engine that is used to manage and compact HBase store files. + * The name of the configuration parameter that specifies the class of a store engine that is used + * to manage and compact HBase store files. */ public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class"; - private static final Class> - DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class; + private static final Class> DEFAULT_STORE_ENGINE_CLASS = + DefaultStoreEngine.class; + + /** + * Acquire read lock of this store. + */ + public void readLock() { + storeLock.readLock().lock(); + } + + /** + * Release read lock of this store. + */ + public void readUnlock() { + storeLock.readLock().unlock(); + } + + /** + * Acquire write lock of this store. + */ + public void writeLock() { + storeLock.writeLock().lock(); + } + + /** + * Release write lock of this store. + */ + public void writeUnlock() { + storeLock.writeLock().unlock(); + } /** * @return Compaction policy to use. @@ -80,6 +172,11 @@ public StoreFlusher getStoreFlusher() { return this.storeFlusher; } + private StoreFileTracker createStoreFileTracker(HStore store) { + return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(), + store.isPrimaryReplicaStore(), store.getStoreContext()); + } + /** * @param filesCompacting Files currently compacting * @return whether a compaction selection is possible @@ -87,8 +184,8 @@ public StoreFlusher getStoreFlusher() { public abstract boolean needsCompaction(List filesCompacting); /** - * Creates an instance of a compaction context specific to this engine. - * Doesn't actually select or start a compaction. See CompactionContext class comment. + * Creates an instance of a compaction context specific to this engine. Doesn't actually select or + * start a compaction. See CompactionContext class comment. * @return New CompactionContext object. */ public abstract CompactionContext createCompaction() throws IOException; @@ -96,32 +193,321 @@ public StoreFlusher getStoreFlusher() { /** * Create the StoreEngine's components. 
*/ - protected abstract void createComponents( - Configuration conf, HStore store, CellComparator cellComparator) throws IOException; + protected abstract void createComponents(Configuration conf, HStore store, + CellComparator cellComparator) throws IOException; - private void createComponentsOnce( - Configuration conf, HStore store, CellComparator cellComparator) throws IOException { - assert compactor == null && compactionPolicy == null - && storeFileManager == null && storeFlusher == null; + protected final void createComponentsOnce(Configuration conf, HStore store, + CellComparator cellComparator) throws IOException { + assert compactor == null && compactionPolicy == null && storeFileManager == null && + storeFlusher == null && storeFileTracker == null; createComponents(conf, store, cellComparator); - assert compactor != null && compactionPolicy != null - && storeFileManager != null && storeFlusher != null; + this.conf = conf; + this.ctx = store.getStoreContext(); + this.coprocessorHost = store.getHRegion().getCoprocessorHost(); + this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool; + this.storeFileTracker = createStoreFileTracker(store); + assert compactor != null && compactionPolicy != null && storeFileManager != null && + storeFlusher != null && storeFileTracker != null; + } + + /** + * Create a writer for writing new store files. + * @return Writer for a new StoreFile + */ + public StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException { + return storeFileTracker.createWriter(params); + } + + public HStoreFile createStoreFileAndReader(Path p) throws IOException { + StoreFileInfo info = new StoreFileInfo(conf, ctx.getRegionFileSystem().getFileSystem(), p, + ctx.isPrimaryReplicaStore()); + return createStoreFileAndReader(info); + } + + public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException { + info.setRegionCoprocessorHost(coprocessorHost); + HStoreFile storeFile = + new HStoreFile(info, ctx.getFamily().getBloomFilterType(), ctx.getCacheConf()); + storeFile.initReader(); + return storeFile; + } + + /** + * Validates a store file by opening and closing it. In HFileV2 this should not be an expensive + * operation. + * @param path the path to the store file + */ + public void validateStoreFile(Path path) throws IOException { + HStoreFile storeFile = null; + try { + storeFile = createStoreFileAndReader(path); + } catch (IOException e) { + LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e); + throw e; + } finally { + if (storeFile != null) { + storeFile.closeStoreFile(false); + } + } + } + + private List openStoreFiles(Collection files, boolean warmup) + throws IOException { + if (CollectionUtils.isEmpty(files)) { + return Collections.emptyList(); + } + // initialize the thread pool for opening store files in parallel.. 
+ ExecutorService storeFileOpenerThreadPool = + openStoreFileThreadPoolCreator.apply("StoreFileOpener-" + + ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString()); + CompletionService completionService = + new ExecutorCompletionService<>(storeFileOpenerThreadPool); + + int totalValidStoreFile = 0; + for (StoreFileInfo storeFileInfo : files) { + // open each store file in parallel + completionService.submit(() -> createStoreFileAndReader(storeFileInfo)); + totalValidStoreFile++; + } + + Set compactedStoreFiles = new HashSet<>(); + ArrayList results = new ArrayList<>(files.size()); + IOException ioe = null; + try { + for (int i = 0; i < totalValidStoreFile; i++) { + try { + HStoreFile storeFile = completionService.take().get(); + if (storeFile != null) { + LOG.debug("loaded {}", storeFile); + results.add(storeFile); + compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles()); + } + } catch (InterruptedException e) { + if (ioe == null) { + ioe = new InterruptedIOException(e.getMessage()); + } + } catch (ExecutionException e) { + if (ioe == null) { + ioe = new IOException(e.getCause()); + } + } + } + } finally { + storeFileOpenerThreadPool.shutdownNow(); + } + if (ioe != null) { + // close StoreFile readers + boolean evictOnClose = + ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : true; + for (HStoreFile file : results) { + try { + if (file != null) { + file.closeStoreFile(evictOnClose); + } + } catch (IOException e) { + LOG.warn("Could not close store file {}", file, e); + } + } + throw ioe; + } + + // Should not archive the compacted store files when region warmup. See HBASE-22163. + if (!warmup) { + // Remove the compacted files from result + List filesToRemove = new ArrayList<>(compactedStoreFiles.size()); + for (HStoreFile storeFile : results) { + if (compactedStoreFiles.contains(storeFile.getPath().getName())) { + LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this); + storeFile.getReader().close( + storeFile.getCacheConf() != null ? storeFile.getCacheConf().shouldEvictOnClose() : + true); + filesToRemove.add(storeFile); + } + } + results.removeAll(filesToRemove); + if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) { + LOG.debug("Moving the files {} to archive", filesToRemove); + ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(), + filesToRemove); + } + } + + return results; + } + + public void initialize(boolean warmup) throws IOException { + List fileInfos = storeFileTracker.load(); + List files = openStoreFiles(fileInfos, warmup); + storeFileManager.loadFiles(files); + } + + public void refreshStoreFiles() throws IOException { + List fileInfos = storeFileTracker.load(); + refreshStoreFilesInternal(fileInfos); + } + + public void refreshStoreFiles(Collection newFiles) throws IOException { + List storeFiles = new ArrayList<>(newFiles.size()); + for (String file : newFiles) { + storeFiles + .add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(), file)); + } + refreshStoreFilesInternal(storeFiles); + } + + /** + * Checks the underlying store files, and opens the files that have not been opened, and removes + * the store file readers for store files no longer available. Mainly used by secondary region + * replicas to keep up to date with the primary region files. 
+ */ + private void refreshStoreFilesInternal(Collection newFiles) throws IOException { + Collection currentFiles = storeFileManager.getStorefiles(); + Collection compactedFiles = storeFileManager.getCompactedfiles(); + if (currentFiles == null) { + currentFiles = Collections.emptySet(); + } + if (newFiles == null) { + newFiles = Collections.emptySet(); + } + if (compactedFiles == null) { + compactedFiles = Collections.emptySet(); + } + + HashMap currentFilesSet = new HashMap<>(currentFiles.size()); + for (HStoreFile sf : currentFiles) { + currentFilesSet.put(sf.getFileInfo(), sf); + } + HashMap compactedFilesSet = new HashMap<>(compactedFiles.size()); + for (HStoreFile sf : compactedFiles) { + compactedFilesSet.put(sf.getFileInfo(), sf); + } + + Set newFilesSet = new HashSet(newFiles); + // Exclude the files that have already been compacted + newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet()); + Set toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet()); + Set toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet); + + if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) { + return; + } + + LOG.info("Refreshing store files for " + this + " files to add: " + toBeAddedFiles + + " files to remove: " + toBeRemovedFiles); + + Set toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); + for (StoreFileInfo sfi : toBeRemovedFiles) { + toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); + } + + // try to open the files + List openedFiles = openStoreFiles(toBeAddedFiles, false); + + // propogate the file changes to the underlying store file manager + replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception + } + + /** + * Commit the given {@code files}. + *

+ * We will move the file into data directory, and open it. + * @param files the files want to commit + * @param validate whether to validate the store files + * @return the committed store files + */ + public List commitStoreFiles(List files, boolean validate) throws IOException { + List committedFiles = new ArrayList<>(files.size()); + HRegionFileSystem hfs = ctx.getRegionFileSystem(); + String familyName = ctx.getFamily().getNameAsString(); + Path storeDir = hfs.getStoreDir(familyName); + for (Path file : files) { + try { + if (validate) { + validateStoreFile(file); + } + Path committedPath; + // As we want to support writing to data directory directly, here we need to check whether + // the store file is already in the right place + if (file.getParent() != null && file.getParent().equals(storeDir)) { + // already in the right place, skip renmaing + committedPath = file; + } else { + // Write-out finished successfully, move into the right spot + committedPath = hfs.commitStoreFile(familyName, file); + } + HStoreFile sf = createStoreFileAndReader(committedPath); + committedFiles.add(sf); + } catch (IOException e) { + LOG.error("Failed to commit store file {}", file, e); + // Try to delete the files we have committed before. + // It is OK to fail when deleting as leaving the file there does not cause any data + // corruption problem. It just introduces some duplicated data which may impact read + // performance a little when reading before compaction. + for (HStoreFile sf : committedFiles) { + Path pathToDelete = sf.getPath(); + try { + sf.deleteStoreFile(); + } catch (IOException deleteEx) { + LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete, + deleteEx); + } + } + throw new IOException("Failed to commit the flush", e); + } + } + return committedFiles; + } + + public void addStoreFiles(Collection storeFiles) throws IOException { + storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles)); + writeLock(); + try { + storeFileManager.insertNewFiles(storeFiles); + } finally { + // We need the lock, as long as we are updating the storeFiles + // or changing the memstore. Let us release it before calling + // notifyChangeReadersObservers. See HBASE-4485 for a possible + // deadlock scenario that could have happened if continue to hold + // the lock. + writeUnlock(); + } + } + + public void replaceStoreFiles(Collection compactedFiles, + Collection newFiles) throws IOException { + storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles), + StoreUtils.toStoreFileInfo(newFiles)); + writeLock(); + try { + storeFileManager.addCompactionResults(compactedFiles, newFiles); + } finally { + writeUnlock(); + } + } + + public void removeCompactedFiles(Collection compactedFiles) { + writeLock(); + try { + storeFileManager.removeCompactedFiles(compactedFiles); + } finally { + writeUnlock(); + } } /** * Create the StoreEngine configured for the given Store. - * @param store The store. An unfortunate dependency needed due to it - * being passed to coprocessors via the compactor. + * @param store The store. An unfortunate dependency needed due to it being passed to coprocessors + * via the compactor. * @param conf Store configuration. * @param cellComparator CellComparator for storeFileManager. * @return StoreEngine to use. 
*/ - public static StoreEngine create( - HStore store, Configuration conf, CellComparator cellComparator) throws IOException { + public static StoreEngine create(HStore store, Configuration conf, + CellComparator cellComparator) throws IOException { String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName()); try { - StoreEngine se = ReflectionUtils.instantiateWithCustomCtor( - className, new Class[] { }, new Object[] { }); + StoreEngine se = + ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, new Object[] {}); se.createComponentsOnce(conf, store, cellComparator); return se; } catch (Exception e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index 27127f3a6c64..a40b209c6ebb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.util.Collection; import java.util.Comparator; @@ -49,12 +50,16 @@ public interface StoreFileManager { * Loads the initial store files into empty StoreFileManager. * @param storeFiles The files to load. */ + @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", + allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") void loadFiles(List storeFiles); /** * Adds new files, either for from MemStore flush or bulk insert, into the structure. * @param sfs New store files. */ + @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", + allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") void insertNewFiles(Collection sfs); /** @@ -62,12 +67,16 @@ public interface StoreFileManager { * @param compactedFiles The input files for the compaction. * @param results The resulting files for the compaction. 
*/ + @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", + allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") void addCompactionResults(Collection compactedFiles, Collection results); /** * Remove the compacted files * @param compactedFiles the list of compacted files */ + @RestrictedApi(explanation = "Should only be called in StoreEngine", link = "", + allowedOnPath = ".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)") void removeCompactedFiles(Collection compactedFiles); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 1064b6c70547..58031288f751 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -70,10 +70,17 @@ protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, writer.close(); } + protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, boolean alwaysIncludesTag) + throws IOException { + return store.getStoreEngine() + .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount()) + .compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false) + .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || snapshot.isTagsPresent()) + .shouldDropBehind(false)); + } /** * Creates the scanner for flushing snapshot. Also calls coprocessors. - * @param snapshotScanners * @return The scanner; null if coprocessor is canceling the flush. */ protected final InternalScanner createScanner(List snapshotScanners, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java index ac5955feca7e..e464b86254d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -20,10 +20,13 @@ import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; - +import java.util.function.Predicate; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -39,10 +42,13 @@ * Utility functions for region server storage layer. */ @InterfaceAudience.Private -public class StoreUtils { +public final class StoreUtils { private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class); + private StoreUtils() { + } + /** * Creates a deterministic hash code for store file collection. 
*/ @@ -161,4 +167,30 @@ public static int getBytesPerChecksum(Configuration conf) { HFile.DEFAULT_BYTES_PER_CHECKSUM); } + public static List toStoreFileInfo(Collection storefiles) { + return storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList()); + } + + public static long getTotalUncompressedBytes(List files) { + return files.stream() + .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::getTotalUncompressedBytes)) + .sum(); + } + + public static long getStorefilesSize(Collection files, + Predicate predicate) { + return files.stream().filter(predicate) + .mapToLong(file -> getStorefileFieldSize(file, StoreFileReader::length)).sum(); + } + + public static long getStorefileFieldSize(HStoreFile file, ToLongFunction f) { + if (file == null) { + return 0L; + } + StoreFileReader reader = file.getReader(); + if (reader == null) { + return 0L; + } + return f.applyAsLong(reader); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 14863a69a9b1..bfb3f649ff27 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -20,20 +20,19 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 1560aef5f6b3..f8183b7645a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -70,7 +70,7 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum StripeMultiFileWriter mw = null; try { mw = req.createWriter(); // Writer according to the policy. - StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount); + StripeMultiFileWriter.WriterFactory factory = createWriterFactory(snapshot); StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null; mw.init(storeScanner, factory); @@ -98,13 +98,12 @@ public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum return result; } - private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) { + private StripeMultiFileWriter.WriterFactory createWriterFactory(MemStoreSnapshot snapshot) { return new StripeMultiFileWriter.WriterFactory() { @Override public StoreFileWriter createWriter() throws IOException { - StoreFileWriter writer = store.createWriterInTmp(kvCount, - store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false); - return writer; + // XXX: it used to always pass true for includesTag, re-consider? + return StripeStoreFlusher.this.createWriter(snapshot, true); } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index 42841bfee531..533be176e7a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -51,13 +51,14 @@ protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner s WriterFactory writerFactory = new WriterFactory() { @Override public StoreFileWriter createWriter() throws IOException { - return createTmpWriter(fd, shouldDropBehind, major); + return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, major); } @Override public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) - throws IOException { - return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major); + throws IOException { + return AbstractMultiOutputCompactor.this.createWriter(fd, shouldDropBehind, + fileStoragePolicy, major); } }; // Prepare multi-writer, and perform the compaction using scanner and writer. 
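
The flusher and multi-output compactor hunks above, and the Compactor changes below, all converge on one calling convention: instead of HStore.createWriterInTmp(), a caller builds a CreateStoreFileWriterParams and asks the StoreEngine (and, through it, the StoreFileTracker) for the writer. The following is a minimal sketch of that convention for the flush path, assuming an HStore and a MemStoreSnapshot are in scope; FlushWriterExample and newFlushWriter are illustrative names only, not part of this patch.

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
    import org.apache.hadoop.hbase.regionserver.HStore;
    import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
    import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

    public class FlushWriterExample {

      // Build the parameters for a flush writer and let the StoreEngine decide where
      // and how the file is created (tmp dir with the default tracker, possibly the
      // data dir for future tracker implementations).
      static StoreFileWriter newFlushWriter(HStore store, MemStoreSnapshot snapshot)
          throws IOException {
        CreateStoreFileWriterParams params = CreateStoreFileWriterParams.create()
          .maxKeyCount(snapshot.getCellsCount())
          .compression(store.getColumnFamilyDescriptor().getCompressionType())
          .isCompaction(false)                    // flush, not compaction
          .includeMVCCReadpoint(true)
          .includesTag(snapshot.isTagsPresent())  // tags only if present in the snapshot
          .shouldDropBehind(false);
        return store.getStoreEngine().createWriter(params);
      }
    }

This mirrors what the new StoreFlusher.createWriter() helper in this patch does, except that the MOB and stripe flushers force includesTag to true.
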
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index e524f7dfd5fd..47ef0f290251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.hadoop.hbase.regionserver.CellSink; +import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -61,6 +61,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.io.Closeables; /** @@ -261,29 +262,32 @@ public InternalScanner createScanner(ScanInfo scanInfo, List s } }; + protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind, + boolean major) { + return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount) + .compression(major ? majorCompactionCompression : minorCompactionCompression) + .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0) + .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind) + .totalCompactedFilesSize(fd.totalCompactedFilesSize); + } + /** - * Creates a writer for a new file in a temporary directory. + * Creates a writer for a new file. * @param fd The file details. - * @return Writer for a new StoreFile in the tmp dir. + * @return Writer for a new StoreFile * @throws IOException if creation failed */ - protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, boolean major) - throws IOException { + protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, + boolean major) throws IOException { // When all MVCC readpoints are 0, don't write them. // See HBASE-8166, HBASE-12600, and HBASE-13389. - return store.createWriterInTmp(fd.maxKeyCount, - major ? majorCompactionCompression : minorCompactionCompression, - true, fd.maxMVCCReadpoint > 0, - fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, - HConstants.EMPTY_STRING); + return store.getStoreEngine().createWriter(createParams(fd, shouldDropBehind, major)); } - protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind, - String fileStoragePolicy, boolean major) throws IOException { - return store.createWriterInTmp(fd.maxKeyCount, - major ? 
majorCompactionCompression : minorCompactionCompression, - true, fd.maxMVCCReadpoint > 0, - fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, fileStoragePolicy); + protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind, + String fileStoragePolicy, boolean major) throws IOException { + return store.getStoreEngine() + .createWriter(createParams(fd, shouldDropBehind, major).fileStoragePolicy(fileStoragePolicy)); } private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 49d3e8ee01e7..afa2429cb6e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -45,14 +45,14 @@ public DefaultCompactor(Configuration conf, HStore store) { } private final CellSinkFactory writerFactory = - new CellSinkFactory() { - @Override - public StoreFileWriter createWriter(InternalScanner scanner, - org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, - boolean shouldDropBehind, boolean major) throws IOException { - return createTmpWriter(fd, shouldDropBehind, major); - } - }; + new CellSinkFactory() { + @Override + public StoreFileWriter createWriter(InternalScanner scanner, + org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, + boolean shouldDropBehind, boolean major) throws IOException { + return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major); + } + }; /** * Do a minor/major compaction on an explicit set of storefiles from a Store. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java new file mode 100644 index 000000000000..d4c9a868eb48 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The default implementation for store file tracker, where we do not persist the store file list, + * and use listing when loading store files. + */ +@InterfaceAudience.Private +class DefaultStoreFileTracker extends StoreFileTrackerBase { + + public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica, + StoreContext ctx) { + super(conf, tableName, isPrimaryReplica, ctx); + } + + @Override + public List load() throws IOException { + return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString()); + } + + @Override + public boolean requireWritingToTmpDirFirst() { + return true; + } + + @Override + protected void doAddNewStoreFiles(Collection newFiles) throws IOException { + // NOOP + } + + @Override + protected void doAddCompactionResults(Collection compactedFiles, + Collection newFiles) throws IOException { + // NOOP + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java new file mode 100644 index 000000000000..aadedc8ef727 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * An interface to define how we track the store files for a give store. + *

+ * In the past, we would write the store file to a tmp directory first, and then rename it to the
+ * actual data file. Once a store file is under the data directory, we consider it 'committed',
+ * and we need to do a listing when loading store files.
+ * <p/>

+ * Now that the cloud age is coming, we want to store the store files on object storage, where
+ * rename and list are not as cheap as on HDFS, especially rename. Although introducing a metadata
+ * management layer for object storage could solve the problem, we still want HBase to run on pure
+ * object storage, so here we introduce this interface to abstract how we track the store files.
+ * The old implementation simply persists nothing here and does a listing to load store files.
+ * When running on object storage, we could persist the store file list in a system region, or in
+ * a file on the object storage, which makes it possible to write directly into the data directory
+ * to avoid renaming, and also to avoid listing when loading store files.
+ * <p/>

+ * The implementation is required to be thread safe, as flush and compaction may occur at the same
+ * time, and we could also run multiple compactions at the same time. As the implementation may
+ * choose to persist the store file list to external storage, which could be slow, it is the
+ * callers' duty not to call these methods inside a lock which may block normal read/write
+ * requests.
+ */
+@InterfaceAudience.Private
+public interface StoreFileTracker {
+
+  /**
+   * Load the store files list when opening a region.
+   */
+  List load() throws IOException;
+
+  /**
+   * Add new store files.
+   * <p/>

+ * Used for flush and bulk load. + */ + void add(Collection newFiles) throws IOException; + + /** + * Add new store files and remove compacted store files after compaction. + */ + void replace(Collection compactedFiles, Collection newFiles) + throws IOException; + + /** + * Create a writer for writing new store files. + * @return Writer for a new StoreFile + */ + StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java new file mode 100644 index 000000000000..2451f45bdc7b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import java.io.IOException; +import java.util.Collection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.crypto.Encryption; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for all store file tracker. + *

+ * Mainly used to place the common logic to skip persistent for secondary replicas. + */ +@InterfaceAudience.Private +abstract class StoreFileTrackerBase implements StoreFileTracker { + + private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class); + + protected final Configuration conf; + + protected final TableName tableName; + + protected final boolean isPrimaryReplica; + + protected final StoreContext ctx; + + private volatile boolean cacheOnWriteLogged; + + protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica, + StoreContext ctx) { + this.conf = conf; + this.tableName = tableName; + this.isPrimaryReplica = isPrimaryReplica; + this.ctx = ctx; + } + + @Override + public final void add(Collection newFiles) throws IOException { + if (isPrimaryReplica) { + doAddNewStoreFiles(newFiles); + } + } + + @Override + public final void replace(Collection compactedFiles, + Collection newFiles) throws IOException { + if (isPrimaryReplica) { + doAddCompactionResults(compactedFiles, newFiles); + } + } + + private HFileContext createFileContext(Compression.Algorithm compression, + boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) { + if (compression == null) { + compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; + } + ColumnFamilyDescriptor family = ctx.getFamily(); + HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint) + .withIncludesTags(includesTag).withCompression(compression) + .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf)) + .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)) + .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true) + .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName()) + .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build(); + return hFileContext; + } + + @Override + public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) + throws IOException { + if (!isPrimaryReplica) { + throw new IllegalStateException("Should not call create writer on secondary replicas"); + } + // creating new cache config for each new writer + final CacheConfig cacheConf = ctx.getCacheConf(); + final CacheConfig writerCacheConf = new CacheConfig(cacheConf); + long totalCompactedFilesSize = params.totalCompactedFilesSize(); + if (params.isCompaction()) { + // Don't cache data on write on compactions, unless specifically configured to do so + // Cache only when total file size remains lower than configured threshold + final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite(); + // if data blocks are to be cached on write + // during compaction, we should forcefully + // cache index and bloom blocks as well + if (cacheCompactedBlocksOnWrite && + totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { + writerCacheConf.enableCacheOnWrite(); + if (!cacheOnWriteLogged) { + LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " + + "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this); + cacheOnWriteLogged = true; + } + } else { + writerCacheConf.setCacheDataOnWrite(false); + if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) { + // checking condition once again for logging + LOG.debug( + "For {}, 
setting cacheCompactedBlocksOnWrite as false as total size of compacted " + + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}", + this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold()); + } + } + } else { + final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite(); + if (shouldCacheDataOnWrite) { + writerCacheConf.enableCacheOnWrite(); + if (!cacheOnWriteLogged) { + LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " + + "Index blocks and Bloom filter blocks", this); + cacheOnWriteLogged = true; + } + } + } + Encryption.Context encryptionContext = ctx.getEncryptionContext(); + HFileContext hFileContext = createFileContext(params.compression(), + params.includeMVCCReadpoint(), params.includesTag(), encryptionContext); + Path outputDir; + if (requireWritingToTmpDirFirst()) { + outputDir = + new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString()); + } else { + throw new UnsupportedOperationException("not supported yet"); + } + StoreFileWriter.Builder builder = + new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem()) + .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType()) + .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes()) + .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind()) + .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier()) + .withFileStoragePolicy(params.fileStoragePolicy()); + return builder.build(); + } + + /** + * Whether the implementation of this tracker requires you to write to temp directory first, i.e, + * does not allow broken store files under the actual data directory. + */ + protected abstract boolean requireWritingToTmpDirFirst(); + + protected abstract void doAddNewStoreFiles(Collection newFiles) throws IOException; + + protected abstract void doAddCompactionResults(Collection compactedFiles, + Collection newFiles) throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java new file mode 100644 index 000000000000..4f7231bc3b9a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.storefiletracker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Factory method for creating store file tracker. + */ +@InterfaceAudience.Private +public final class StoreFileTrackerFactory { + + public static StoreFileTracker create(Configuration conf, TableName tableName, + boolean isPrimaryReplica, StoreContext ctx) { + return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java index 22ec6cb89ec4..291b909f69bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java @@ -105,7 +105,6 @@ Set getStoresRequiringCompaction(Set requestedStores, long times boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts) throws IOException { - // do we have any store files? Collection storeFiles = fileSystem.getStoreFiles(family); if (storeFiles == null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 3c2bc3f0cb12..9314d7e9827f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -211,11 +211,13 @@ protected BlockCompactionsInCompletionHStore(HRegion region, ColumnFamilyDescrip @Override protected void refreshStoreSizeAndTotalBytes() throws IOException { - try { - r.compactionsWaiting.countDown(); - r.compactionsBlocked.await(); - } catch (InterruptedException ex) { - throw new IOException(ex); + if (r != null) { + try { + r.compactionsWaiting.countDown(); + r.compactionsBlocked.await(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } } super.refreshStoreSizeAndTotalBytes(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java index cd83dc8c2494..0a6b37bbba4e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java @@ -215,8 +215,10 @@ public void tearDown() throws IOException { @Test public void testCacheOnWriteInSchema() throws IOException { // Write some random data into the store - StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE, - HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false); + StoreFileWriter writer = store.getStoreEngine() + .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE) + .compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false) + .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false)); writeStoreFile(writer); writer.close(); // Verify the block types of interest were cached on write diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java index e832c47aac81..3784876a59f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java @@ -65,9 +65,12 @@ public void testCustomParts() throws Exception { DummyCompactionPolicy.class.getName()); conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, DummyStoreFlusher.class.getName()); + HRegion mockRegion = Mockito.mock(HRegion.class); HStore mockStore = Mockito.mock(HStore.class); Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO); - StoreEngine se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR); + Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion); + StoreEngine se = + StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR); Assert.assertTrue(se instanceof DefaultStoreEngine); Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy); Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a5584ff32599..7fb57686c35d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -5744,7 +5744,7 @@ public void testCompactionFromPrimary() throws IOException { Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); Collection storeFileInfos = primaryRegion.getRegionFileSystem() - .getStoreFiles(families[0]); + .getStoreFiles(Bytes.toString(families[0])); Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty()); verifyData(secondaryRegion, 0, 1000, cq, families); @@ -7648,7 +7648,7 @@ protected List doCompaction(CompactionRequestImpl cr, getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true; for (Path newFile : newFiles) { // Create storefile around what we wrote with a reader on it. - HStoreFile sf = createStoreFileAndReader(newFile); + HStoreFile sf = storeEngine.createStoreFileAndReader(newFile); sf.closeStoreFile(evictOnClose); sfs.add(sf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 46995d92f8e6..f6cfc7b8efbb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -305,7 +305,7 @@ public Object run() throws Exception { /** * Verify that compression and data block encoding are respected by the - * Store.createWriterInTmp() method, used on store flush. + * createWriter method, used on store flush. 
*/ @Test public void testCreateWriter() throws Exception { @@ -317,9 +317,11 @@ public void testCreateWriter() throws Exception { .build(); init(name.getMethodName(), conf, hcd); - // Test createWriterInTmp() - StoreFileWriter writer = - store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false); + // Test createWriter + StoreFileWriter writer = store.getStoreEngine() + .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4) + .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true) + .includesTag(false).shouldDropBehind(false)); Path path = writer.getPath(); writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1))); writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2))); @@ -1016,19 +1018,19 @@ public void testRefreshStoreFilesNotChanged() throws IOException { // add one more file addStoreFile(); - HStore spiedStore = spy(store); + StoreEngine spiedStoreEngine = spy(store.getStoreEngine()); // call first time after files changed - spiedStore.refreshStoreFiles(); + spiedStoreEngine.refreshStoreFiles(); assertEquals(2, this.store.getStorefilesCount()); - verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); + verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any()); // call second time - spiedStore.refreshStoreFiles(); + spiedStoreEngine.refreshStoreFiles(); // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not // refreshed, - verify(spiedStore, times(1)).replaceStoreFiles(any(), any()); + verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any()); } private long countMemStoreScanner(StoreScanner scanner) { @@ -1639,7 +1641,7 @@ public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws // Do compaction MyThread thread = new MyThread(storeScanner); thread.start(); - store.replaceStoreFiles(actualStorefiles, actualStorefiles1); + store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false); thread.join(); KeyValueHeap heap2 = thread.getHeap(); assertFalse(heap.equals(heap2)); @@ -1705,8 +1707,10 @@ public void testSpaceQuotaChangeAfterReplacement() throws IOException { @Test public void testHFileContextSetWithCFAndTable() throws Exception { init(this.name.getMethodName()); - StoreFileWriter writer = store.createWriterInTmp(10000L, - Compression.Algorithm.NONE, false, true, false, true); + StoreFileWriter writer = store.getStoreEngine() + .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L) + .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true) + .includesTag(false).shouldDropBehind(true)); HFileContext hFileContext = writer.getHFileWriter().getFileContext(); assertArrayEquals(family, hFileContext.getColumnFamily()); assertArrayEquals(table, hFileContext.getTableName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index 737ba46b7073..3087c8267b0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -245,7 +245,7 @@ public void testCleanMergeReference() throws Exception { TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); int count = 0; for(ColumnFamilyDescriptor colFamily : columnFamilies) { - count += 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 737ba46b7073..3087c8267b0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -245,7 +245,7 @@ public void testCleanMergeReference() throws Exception {
       TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
     int count = 0;
     for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-      count += hrfs.getStoreFiles(colFamily.getName()).size();
+      count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
     }
     ADMIN.compactRegion(mergedRegionInfo.getRegionName());
     // clean up the merged region store files
@@ -254,7 +254,7 @@ public void testCleanMergeReference() throws Exception {
     int newcount = 0;
     while (EnvironmentEdgeManager.currentTime() < timeout) {
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+        newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       if(newcount > count) {
         break;
@@ -273,7 +273,7 @@ public void testCleanMergeReference() throws Exception {
     while (EnvironmentEdgeManager.currentTime() < timeout) {
       int newcount1 = 0;
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+        newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       if(newcount1 <= 1) {
         break;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index 9141327d26d4..2fab050446ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +97,7 @@ static class FailingHRegionFileSystem extends HRegionFileSystem {
     }
 
     @Override
-    public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
+    public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
       if (fail) {
        throw new IOException("simulating FS failure");
      }
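The merge and refresher-chore tests above track an HRegionFileSystem API adjustment: getStoreFiles now takes the family name as a String and returns a List of StoreFileInfo. A small illustrative helper follows; its name is invented, and the null check is kept only because the TestHRegion hunk earlier shows the method may return null.

```java
// Hypothetical helper, not part of the patch; it just restates the adjusted signature
// List<StoreFileInfo> HRegionFileSystem#getStoreFiles(String familyName).
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;

final class StoreFileCountSketch {

  private StoreFileCountSketch() {
  }

  /** Counts the store files of every column family, as the merge test does in a loop. */
  static int countStoreFiles(HRegionFileSystem hrfs, ColumnFamilyDescriptor[] families)
      throws IOException {
    int count = 0;
    for (ColumnFamilyDescriptor colFamily : families) {
      List<StoreFileInfo> files = hrfs.getStoreFiles(colFamily.getNameAsString());
      if (files != null) { // getStoreFiles may return null when the family has no files yet
        count += files.size();
      }
    }
    return count;
  }
}
```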
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index 4cb023f9b9d6..121f8301751f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -29,7 +29,6 @@
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -128,13 +127,12 @@ public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Excepti
     p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
     region.put(p);
     HStore store = region.getStore(fam);
-    ReentrantReadWriteLock lock = store.lock;
     // use the lock to manually get a new memstore scanner. this is what
     // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
     //since it is just a testcase).
-    lock.readLock().lock();
+    store.getStoreEngine().readLock();
     final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
-    lock.readLock().unlock();
+    store.getStoreEngine().readUnlock();
     Thread closeThread = new Thread() {
       public void run() {
         // close should be completed
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index f5330f6faa42..eb0b1c1ca694 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -118,8 +118,10 @@ private static HStoreFile createFile() throws Exception {
   }
 
   private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+    HRegion region = mock(HRegion.class);
     HStore store = mock(HStore.class);
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    when(store.getHRegion()).thenReturn(region);
     CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
     return (TestStoreEngine) StoreEngine.create(store, conf, kvComparator);
   }
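The TestStoreScannerClosure hunk above replaces direct use of the store's ReentrantReadWriteLock with the read lock now exposed by the StoreEngine. A hedged sketch of that pattern follows; the try/finally and the class name are mine, and the direct access to store.memstore assumes the caller sits in the org.apache.hadoop.hbase.regionserver package, exactly as the test does.

```java
// Hypothetical sketch of the new locking idiom; readLock()/readUnlock() are the
// StoreEngine methods the updated test calls instead of HStore.lock.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

final class StoreEngineLockSketch {

  private StoreEngineLockSketch() {
  }

  /** Fetches memstore scanners under the engine's read lock, mirroring the test. */
  static List<KeyValueScanner> memstoreScanners(HStore store, long readPt) throws IOException {
    store.getStoreEngine().readLock();
    try {
      return store.memstore.getScanners(readPt);
    } finally {
      store.getStoreEngine().readUnlock();
    }
  }
}
```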
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index d7b7ba760156..0ea82c57be1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -22,9 +22,6 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,11 +42,13 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@@ -110,10 +109,9 @@ private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 5eb94ac1d448..2841695cd530 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -27,7 +27,6 @@
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -58,6 +57,7 @@
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -65,6 +65,7 @@
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -828,12 +829,9 @@ private StripeCompactor createCompactor() throws Exception {
     when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
     when(store.getRegionInfo()).thenReturn(info);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
 
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
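All three compactor test fixtures in this patch converge on the same stubbing pattern: instead of mocking two createWriterInTmp overloads on HStore, they mock a single StoreEngine.createWriter(CreateStoreFileWriterParams) and return that engine from getStoreEngine(). The sketch below isolates that pattern; the class and method names are invented, 'writers' stands in for the tests' StoreFileWritersCapture answer, and the raw StoreEngine type is kept deliberately, as in the hunks, to sidestep wildcard-capture issues when stubbing.

```java
// Hypothetical helper showing the shared Mockito setup used by the compactor tests.
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.mockito.stubbing.Answer;

final class MockStoreEngineSketch {

  private MockStoreEngineSketch() {
  }

  @SuppressWarnings({ "rawtypes", "unchecked" })
  static void stubWriterCreation(HStore store, Answer<?> writers) throws Exception {
    StoreEngine storeEngine = mock(StoreEngine.class);
    // One stub now covers what previously needed two createWriterInTmp(...) stubs.
    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
    when(store.getStoreEngine()).thenReturn(storeEngine);
  }
}
```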
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index e49174e6afe3..ae59c74bad95 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -21,9 +21,6 @@
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,10 +40,12 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@@ -209,10 +208,9 @@ private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValu
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
 
     return new StripeCompactor(conf, store) {