diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 01fe0005f048..15f0a73a9df9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; @@ -286,7 +285,6 @@ private void calculateMobLengthMap(SetMultimap mobRefs) throw * * @param fd File details * @param scanner Where to read from. - * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param throughputController The compaction throughput controller. @@ -295,7 +293,7 @@ private void calculateMobLengthMap(SetMultimap mobRefs) throw * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { long bytesWrittenProgressForLog = 0; @@ -665,7 +663,7 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java index f250304952a3..82c3867c103c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java @@ -110,7 +110,11 @@ public List abortWriters() { return paths; } - protected abstract Collection writers(); + /** + * Returns all writers. This is used to prevent deleting currently written storefiles + * during cleanup. + */ + public abstract Collection writers(); /** * Subclasses override this method to be called at the end of a successful sequence of append; all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java new file mode 100644 index 000000000000..0c4807d8badc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This Chore, every time it runs, will clear the unused HFiles in the data + * folder. + */ +@InterfaceAudience.Private +public class BrokenStoreFileCleaner extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class); + public static final String BROKEN_STOREFILE_CLEANER_ENABLED = + "hbase.region.broken.storefilecleaner.enabled"; + public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false; + public static final String BROKEN_STOREFILE_CLEANER_TTL = + "hbase.region.broken.storefilecleaner.ttl"; + public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h + public static final String BROKEN_STOREFILE_CLEANER_DELAY = + "hbase.region.broken.storefilecleaner.delay"; + public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h + public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER = + "hbase.region.broken.storefilecleaner.delay.jitter"; + public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D; + public static final String BROKEN_STOREFILE_CLEANER_PERIOD = + "hbase.region.broken.storefilecleaner.period"; + public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h + + private HRegionServer regionServer; + private final AtomicBoolean enabled = new AtomicBoolean(true); + private long fileTtl; + + public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper, + Configuration conf, HRegionServer regionServer) { + super("BrokenStoreFileCleaner", stopper, period, delay); + this.regionServer = regionServer; + setEnabled( + conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED)); + fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL); + } + + public boolean setEnabled(final boolean enabled) { + return this.enabled.getAndSet(enabled); + } + + public boolean getEnabled() { + return this.enabled.get(); + } + + @Override + public void chore() { + if (getEnabled()) { + long start = EnvironmentEdgeManager.currentTime(); + AtomicLong deletedFiles = new AtomicLong(0); + AtomicLong failedDeletes = new AtomicLong(0); + for (HRegion region : regionServer.getRegions()) { + for (HStore store : region.getStores()) { + //only do cleanup in stores not using tmp directories + if (store.getStoreEngine().requireWritingToTmpDirFirst()) { + continue; + } + Path storePath = + new
Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName()); + + try { + List fsStoreFiles = + Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath)); + fsStoreFiles.forEach( + file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes)); + } catch (IOException e) { + LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath); + continue; + } + } + } + LOG.debug( + "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed " + + "to delete {}", + regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start, + deletedFiles.get(), failedDeletes.get()); + } else { + LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning."); + } + } + + private void cleanFileIfNeeded(FileStatus file, HStore store, + AtomicLong deletedFiles, AtomicLong failedDeletes) { + if(file.isDirectory()){ + LOG.trace("This is a Directory {}, skip cleanup", file.getPath()); + return; + } + + if(!validate(file.getPath())){ + LOG.trace("Invalid file {}, skip cleanup", file.getPath()); + return; + } + + if(!isOldEnough(file)){ + LOG.trace("Fresh file {}, skip cleanup", file.getPath()); + return; + } + + if(isActiveStorefile(file, store)){ + LOG.trace("Actively used storefile {}, skip cleanup", file.getPath()); + return; + } + + // Compacted files can still have readers and are cleaned by a separate chore, so they have to + // be skipped here + if(isCompactedFile(file, store)){ + LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath()); + return; + } + + if(isCompactionResultFile(file, store)){ + LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath()); + return; + } + + deleteFile(file, store, deletedFiles, failedDeletes); + } + + private boolean isCompactionResultFile(FileStatus file, HStore store) { + return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath()); + } + + // Compacted files can still have readers and are cleaned by a separate chore, so they have to + // be skipped here + private boolean isCompactedFile(FileStatus file, HStore store) { + return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream() + .anyMatch(sf -> sf.getPath().equals(file.getPath())); + } + + private boolean isActiveStorefile(FileStatus file, HStore store) { + return store.getStoreEngine().getStoreFileManager().getStorefiles().stream() + .anyMatch(sf -> sf.getPath().equals(file.getPath())); + } + + boolean validate(Path file) { + if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) { + return true; + } + return StoreFileInfo.validateStoreFileName(file.getName()); + } + + boolean isOldEnough(FileStatus file){ + return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime(); + } + + private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles, + AtomicLong failedDeletes) { + Path filePath = file.getPath(); + LOG.debug("Removing {} from store", filePath); + try { + boolean success = store.getFileSystem().delete(filePath, false); + if (!success) { + failedDeletes.incrementAndGet(); + LOG.warn("Attempted to delete: " + filePath + + ", but couldn't. Attempt to delete on next pass."); + } + else{ + deletedFiles.incrementAndGet(); + } + } catch (IOException e) { + e = e instanceof RemoteException ?
+ ((RemoteException)e).unwrapRemoteException() : e; + LOG.warn("Error while deleting: " + filePath, e); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index 8201cb152c01..1e10eb2db231 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException { } @Override - protected Collection writers() { + public Collection writers() { return lowerBoundary2Writer.values(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 811002561f0a..8920471a86ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -609,7 +609,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List allRegi writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent); HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false); - insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs); + insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs); } return regionDir; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index ef4158935342..92cc14000f54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase */ final ServerNonceManager nonceManager; + private BrokenStoreFileCleaner brokenStoreFileCleaner; + @InterfaceAudience.Private CompactedHFilesDischarger compactedFileDischarger; @@ -1835,6 +1837,9 @@ private void startServices() throws IOException { if (this.slowLogTableOpsChore != null) { choreService.scheduleChore(slowLogTableOpsChore); } + if (this.brokenStoreFileCleaner != null) { + choreService.scheduleChore(brokenStoreFileCleaner); + } // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. 
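The next hunk constructs the chore in initializeThreads() with a jittered initial delay, so region servers started together do not all run their first cleanup at the same moment. Below is a minimal, self-contained sketch of that jitter math; the default values mirror the constants in BrokenStoreFileCleaner above, while the standalone class name and the use of ThreadLocalRandom (in place of the RandomUtils call in the patch) are illustrative assumptions only.

import java.util.concurrent.ThreadLocalRandom;

public class BrokenStoreFileCleanerDelaySketch {
  // Defaults mirrored from BrokenStoreFileCleaner in this patch: 2h initial delay, 0.25 jitter.
  private static final int DEFAULT_DELAY_MS = 1000 * 60 * 60 * 2;
  private static final double DEFAULT_DELAY_JITTER = 0.25D;

  // Shifts the configured delay by up to +/- (jitter / 2) of its own length, matching the
  // (random - 0.5) * jitter formula used in HRegionServer#initializeThreads.
  static int jitteredDelay(int delayMs, double jitter) {
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delayMs * jitterRate);
    return (int) (delayMs + jitterValue);
  }

  public static void main(String[] args) {
    // With the defaults this prints a delay between roughly 1h45m and 2h15m, in milliseconds.
    System.out.println(jitteredDelay(DEFAULT_DELAY_MS, DEFAULT_DELAY_JITTER));
  }
}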
@@ -1914,6 +1919,22 @@ private void initializeThreads() { this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, onlyMetaRefresh, this, this); } + + int brokenStoreFileCleanerPeriod = conf.getInt( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD); + int brokenStoreFileCleanerDelay = conf.getInt( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY); + double brokenStoreFileCleanerDelayJitter = conf.getDouble( + BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER, + BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER); + double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter; + long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate); + this.brokenStoreFileCleaner = + new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue), + brokenStoreFileCleanerPeriod, this, conf, this); + registerConfigurationObservers(); } @@ -3488,6 +3509,11 @@ protected boolean clusterMode() { return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); } + @InterfaceAudience.Private + public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){ + return brokenStoreFileCleaner; + } + @Override protected void stopChores() { shutdownChore(nonceManagerChore); @@ -3498,5 +3524,6 @@ protected void stopChores() { shutdownChore(storefileRefresher); shutdownChore(fsUtilizationChore); shutdownChore(slowLogTableOpsChore); + shutdownChore(brokenStoreFileCleaner); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index a5b5733935be..abba6a2be0b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1156,6 +1156,12 @@ protected List doCompaction(CompactionRequestImpl cr, } } replaceStoreFiles(filesToCompact, sfs, true); + + // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the + // CleanerChore know that compaction is done and the file can be cleaned up if compaction // has failed.
+ + storeEngine.resetCompactionWriter(); + if (cr.isMajor()) { majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index 04867295c3ae..ddb52d10ffd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -42,9 +42,11 @@ import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -532,6 +534,25 @@ public void removeCompactedFiles(Collection compactedFiles) { } } + /** + * Whether the implementation of the used storefile tracker requires you to write to temp + * directory first, i.e, does not allow broken store files under the actual data directory. + */ + public boolean requireWritingToTmpDirFirst() { + return storeFileTracker.requireWritingToTmpDirFirst(); + } + + /** + * Resets the compaction writer when the new file is committed and used as active storefile. + * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the + * CleanerChore know that compaction is done and the file can be cleaned up if compaction + * has failed.
Currently called in + * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List) + */ + public void resetCompactionWriter(){ + compactor.resetWriter(); + } + @RestrictedApi(explanation = "Should only be called in TestHStore", link = "", allowedOnPath = ".*/TestHStore.java") ReadWriteLock getLock() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index fc0598d89ac0..a4e943ac8b04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -58,7 +58,7 @@ public void setNoStripeMetadata() { } @Override - protected Collection writers() { + public Collection writers() { return existingWriters; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index 533be176e7a7..19b7a98627e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -68,7 +68,7 @@ public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy) } @Override - protected void abortWriter(T writer) throws IOException { + protected void abortWriter() throws IOException { FileSystem fs = store.getFileSystem(); for (Path leftoverFile : writer.abortWriters()) { try { @@ -79,5 +79,7 @@ protected void abortWriter(T writer) throws IOException { e); } } + //this step signals that the target file is no longer written and can be cleaned up + writer = null; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 47ef0f290251..0ee7d349e4c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -25,9 +25,12 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.HStore; @@ -92,6 +96,8 @@ public abstract class Compactor { private final boolean dropCacheMajor; private final boolean dropCacheMinor; + protected T writer = null; + //TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) { this.conf = conf; @@ -324,7 +330,6 @@ protected final List compact(final CompactionRequestImpl request, // Find the smallest read point across all the Scanners. long smallestReadPoint = getSmallestReadPoint(); - T writer = null; boolean dropCache; if (request.isMajor() || request.isAllFiles()) { dropCache = this.dropCacheMajor; @@ -348,8 +353,13 @@ protected final List compact(final CompactionRequestImpl request, smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); cleanSeqId = true; } + if (writer != null){ + LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream() + .map(n -> n.toString()) + .collect(Collectors.joining(", ", "{ ", " }"))); + } writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); - finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId, throughputController, request.isAllFiles(), request.getFiles().size()); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " @@ -369,24 +379,23 @@ protected final List compact(final CompactionRequestImpl request, Closeables.close(scanner, true); } if (!finished && writer != null) { - abortWriter(writer); + abortWriter(); } } assert finished : "We should have exited the method on all error paths"; assert writer != null : "Writer should be non-null if no error"; - return commitWriter(writer, fd, request); + return commitWriter(fd, request); } - protected abstract List commitWriter(T writer, FileDetails fd, + protected abstract List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException; - protected abstract void abortWriter(T writer) throws IOException; + protected abstract void abortWriter() throws IOException; /** * Performs the compaction. * @param fd FileDetails of cell sink writer * @param scanner Where to read from. - * @param writer Where to write to. * @param smallestReadPoint Smallest read point. * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= * smallestReadPoint @@ -394,7 +403,7 @@ protected abstract List commitWriter(T writer, FileDetails fd, * @param numofFilesToCompact the number of files to compact * @return Whether compaction ended; false if it was interrupted for some reason. 
*/ - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { assert writer instanceof ShipperListener; @@ -537,4 +546,24 @@ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } + + public List getCompactionTargets(){ + if (writer == null){ + return Collections.emptyList(); + } + synchronized (writer){ + if (writer instanceof StoreFileWriter){ + return Arrays.asList(((StoreFileWriter)writer).getPath()); + } + return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect( + Collectors.toList()); + } + } + + /** + * Reset the Writer when the new storefiles were successfully added + */ + public void resetWriter(){ + writer = null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index fd5433082903..43e037c5e702 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -79,8 +79,10 @@ public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetai } @Override - protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { - return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); + List pathList = + writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); + return pathList; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index afa2429cb6e8..ad2384a97ab8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -63,7 +63,7 @@ public List compact(final CompactionRequestImpl request, } @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = Lists.newArrayList(writer.getPath()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); @@ -72,12 +72,19 @@ protected List commitWriter(StoreFileWriter writer, FileDetails fd, } @Override + protected void abortWriter() throws IOException { + abortWriter(writer); + } + protected void abortWriter(StoreFileWriter writer) throws IOException { Path leftoverFile = writer.getPath(); try { writer.close(); } catch (IOException e) { LOG.warn("Failed to close the writer after an unfinished compaction.", e); + } finally { + //this step signals that the target file is no longer written and can be cleaned up + writer = null; } try { store.getFileSystem().delete(leftoverFile, false); diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 547555e3812e..060a11b41fe6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -125,7 +125,7 @@ public StripeMultiFileWriter createWriter(InternalScanner scanner, FileDetails f } @Override - protected List commitWriter(StripeMultiFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { List newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java index 079b59ba0274..8d9b66e53d2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java @@ -95,7 +95,7 @@ public List load() throws IOException { } @Override - protected boolean requireWritingToTmpDirFirst() { + public boolean requireWritingToTmpDirFirst() { return false; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java index a6648f291e43..53a474d3bde7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java @@ -57,7 +57,7 @@ public List load() throws IOException { } @Override - protected boolean requireWritingToTmpDirFirst() { + public boolean requireWritingToTmpDirFirst() { // Returns true if either of the two StoreFileTracker returns true. // For example, if we want to migrate from a tracker implementation which can ignore the broken // files under data directory to a tracker implementation which can not, if we still allow diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java index f56a0dde4741..aabbe8d87494 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java @@ -88,4 +88,10 @@ void replace(Collection compactedFiles, Collection * @param builder The table descriptor builder for the given table. */ TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder); + + /** + * Whether the implementation of this tracker requires you to write to temp directory first, i.e, + * does not allow broken store files under the actual data directory. 
+ */ + boolean requireWritingToTmpDirFirst(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java index b6de32b09a0d..db10f4db4c4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java @@ -173,12 +173,6 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th return builder.build(); } - /** - * Whether the implementation of this tracker requires you to write to temp directory first, i.e, - * does not allow broken store files under the actual data directory. - */ - protected abstract boolean requireWritingToTmpDirFirst(); - protected abstract void doAddNewStoreFiles(Collection newFiles) throws IOException; protected abstract void doAddCompactionResults(Collection compactedFiles, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java index 7c75e4658305..0f8a95fc7648 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/RestoreSnapshotHelper.java @@ -549,7 +549,7 @@ private void restoreRegion(final RegionInfo regionInfo, " of snapshot=" + snapshotName+ " to region=" + regionInfo.getEncodedName() + " table=" + tableName); String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs); - //mark the reference file to be added to tracker + // mark the reference file to be added to tracker filesToTrack.add(new StoreFileInfo(conf, fs, new Path(familyDir, fileName), true)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java index 50530dad69e7..d178d564f650 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; -import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -89,7 +88,7 @@ public FaultyMobStoreCompactor(Configuration conf, HStore store) { } @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + protected boolean performCompaction(FileDetails fd, InternalScanner scanner, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java new file mode 100644 index 000000000000..78755a4fe772 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java @@ -0,0 +1,225 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.io.IOException; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, RegionServerTests.class }) +public class TestBrokenStoreFileCleaner { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class); + + private final HBaseTestingUtil testUtil = new HBaseTestingUtil(); + private final static byte[] fam = Bytes.toBytes("cf_1"); + private final static byte[] qual1 = Bytes.toBytes("qf_1"); + private final static byte[] val = Bytes.toBytes("val"); + private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156"; + TableName tableName; + + + @Before + public void setUp() throws Exception { + testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL, + "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker"); + testUtil.getConfiguration() + .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0"); + testUtil.getConfiguration() + .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000"); + testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0"); + testUtil.startMiniCluster(1); + } + + @After + public void tearDown() throws Exception { + testUtil.deleteTable(tableName); + testUtil.shutdownMiniCluster(); + } + + @Test + public void testDeletingJunkFile() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, 
region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); + + //create junk file + HStore store = region.getStore(fam); + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + int storeFiles = store.getStorefilesCount(); + assertTrue(storeFiles > 0); + + //verify the file exists before the chore and is missing afterwards + assertTrue(store.getFileSystem().exists(junkFilePath)); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify no storefile got deleted + int currentStoreFiles = store.getStorefilesCount(); + assertEquals(currentStoreFiles, storeFiles); + + } + + @Test + public void testSkippingCompactedFiles() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippingCompactedFiles"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner(); + + //run major compaction to generate compacted files + region.compact(true); + + //make sure there are compacted files + HStore store = region.getStore(fam); + int compactedFiles = store.getCompactedFilesCount(); + assertTrue(compactedFiles > 0); + + cleaner.chore(); + + //verify none of the compacted files were deleted + int existingCompactedFiles = store.getCompactedFilesCount(); + assertEquals(compactedFiles, existingCompactedFiles); + + //verify adding a junk file does not break anything + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + assertTrue(store.getFileSystem().exists(junkFilePath)); + cleaner.setEnabled(true); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify compacted files are still intact + existingCompactedFiles = store.getCompactedFilesCount(); + assertEquals(compactedFiles, existingCompactedFiles); + } + + @Test + public void testJunkFileTTL() throws Exception { + tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile"); + createTableWithData(tableName); + + HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0); + ServerName sn = testUtil.getMiniHBaseCluster() + .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName()); + HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn); + + //create junk file + HStore store = region.getStore(fam); + Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName()); + Path junkFilePath = new Path(cfPath, junkFileName); + + FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath); + junkFileOS.writeUTF("hello"); + junkFileOS.close(); + + int storeFiles = store.getStorefilesCount(); + assertTrue(storeFiles > 0); + + //verify the file exists before the chore +
assertTrue(store.getFileSystem().exists(junkFilePath)); + + //set a 5 sec ttl + rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000"); + BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000, + 0, rs, rs.getConfiguration(), rs); + cleaner.chore(); + //file is still present after chore run + assertTrue(store.getFileSystem().exists(junkFilePath)); + Thread.sleep(5000); + cleaner.chore(); + assertFalse(store.getFileSystem().exists(junkFilePath)); + + //verify no storefile got deleted + int currentStoreFiles = store.getStorefilesCount(); + assertEquals(currentStoreFiles, storeFiles); + } + + private Table createTableWithData(TableName tableName) throws IOException { + Table table = testUtil.createTable(tableName, fam); + try { + for (int i = 1; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + for (int i = 11; i < 20; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + for (int i = 21; i < 30; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(fam, qual1, val); + table.put(p); + } + // flush them + testUtil.getAdmin().flush(tableName); + } catch (IOException e) { + table.close(); + throw e; + } + return table; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java index e0fca1fea7c5..6a0a8baa9ded 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java @@ -128,13 +128,13 @@ public MyCompactor(Configuration conf, HStore store) { } @Override - protected List commitWriter(StoreFileWriter writer, FileDetails fd, + protected List commitWriter(FileDetails fd, CompactionRequestImpl request) throws IOException { HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; Cell cell = writerImpl.getLastCell(); // The cell should be backend with an KeyOnlyKeyValue. IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); - return super.commitWriter(writer, fd, request); + return super.commitWriter(fd, request); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java index b30ca47772cb..98189729ac75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java @@ -47,7 +47,6 @@ public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreC } else { LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null."); } - } @Override
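For completeness, the chore ships disabled and only inspects stores whose store file tracker reports requireWritingToTmpDirFirst() == false (such as the file-based tracker used in the test above). Below is a minimal sketch of enabling and tuning it through the Configuration keys introduced in this patch; the helper class and the literal values are illustrative assumptions, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.BrokenStoreFileCleaner;

public class BrokenStoreFileCleanerConfigSketch {
  // Applies the cleaner-related settings to a region server Configuration.
  public static Configuration enableCleaner(Configuration conf) {
    // The chore is off by default (DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false).
    conf.setBoolean(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, true);
    // Only files older than this TTL are considered broken; the default is 12h.
    conf.setLong(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, 1000L * 60 * 60 * 12);
    // Initial delay before the first run (default 2h, jittered) and the run period (default 6h).
    conf.setInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, 1000 * 60 * 60 * 2);
    conf.setInt(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, 1000 * 60 * 60 * 6);
    return conf;
  }
}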