Skip to content

Commit

Permalink
HBASE-24289 Heterogeneous Storage for Date Tiered Compaction (#1730)
Browse files Browse the repository at this point in the history
Signed-off-by: Guanghao Zhang <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
pengmq1 authored Jun 30, 2020
1 parent 982bd5f commit be57e40
Show file tree
Hide file tree
Showing 18 changed files with 539 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Heterogeneous Storage for Date Tiered Compaction

## Objective

Support DateTiredCompaction([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181))
for cold and hot data separation, support different storage policies for different time periods
of data to get better performance, for example, we can configure the data of last 1 month in SSD,
and 1 month ago data was in HDD.

+ Date Tiered Compaction (DTCP) is based on date tiering (date-aware), we hope to support
the separation of cold and hot data, heterogeneous storage. Set different storage
policies (in HDFS) for data in different time windows.
+ DTCP designs different windows, and we can classify the windows according to
the timestamps of the windows. For example: HOT window, WARM window, COLD window.
+ DTCP divides storefiles into different windows, and performs minor Compaction within
a time window. The storefile generated by Compaction will use the storage strategy of
this window. For example, if a window is a HOT window, the storefile generated by compaction
can be stored on the SSD. There are already WAL and the entire CF support storage policy
(HBASE-12848, HBASE-14061), our goal is to achieve cold and hot separation in one CF or
a region, using different storage policies.

## Definition of hot and cold data

Usually the data of the last 3 days can be defined as `HOT data`, hot age = 3 days.
If the written timestamp of the data(Cell) is > (timestamp now - hot age), we think the data is hot data.
Warm age can be defined in the same way. Only one type of data is allowed.
If data timestamp < (now - warm age), we consider it is COLD.
```
if timestamp >= (now - hot age) , HOT data
else if timestamp >= (now - warm age), WARM data
else COLD data
```

## Time window
When given a time now, it is the time when the compaction occurs. Each window and the size of
the window are automatically calculated by DTCP, and the window boundary is rounded according
to the base size.
Assuming that the base window size is 1 hour, and each tier has 3 windows, the current time is
between 12:00 and 13:00. We have defined three types of winow (`HOT, WARM, COLD`). The type of
winodw is determined by the timestamp at the beginning of the window and the timestamp now.
As shown in the figure 1 below, the type of each window can be determined by the age range
(hot / warm / cold) where (now - window.startTimestamp) falls. Cold age can not need to be set,
the default Long.MAX, meaning that the window with a very early time stamp belongs to the
cold window.
![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1")

## Example configuration

| Configuration Key | value | Note |
|:---|:---:|:---|
|hbase.hstore.compaction.date.tiered.storage.policy.enable|true|if or not use storage policy for window. Default is false|
|hbase.hstore.compaction.date.tiered.hot.window.age.millis|3600000|hot data age
|hbase.hstore.compaction.date.tiered.hot.window.storage.policy|ALL_SSD|hot data storage policy, Corresponding HDFS storage policy
|hbase.hstore.compaction.date.tiered.warm.window.age.millis|20600000||
|hbase.hstore.compaction.date.tiered.warm.window.storage.policy|ONE_SSD||
|hbase.hstore.compaction.date.tiered.cold.window.storage.policy|HOT||

The original date tiered compaction related configuration has the same meaning and maintains
compatibility.
If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`. DTCP still follows the
original logic and has not changed.

## Storage strategy
HDFS provides the following storage policies, you can refer to
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html

|Policy ID | Policy Name | Block Placement (3 replicas)|
|:---|:---|:---|
|15|Lasy_Persist|RAM_DISK: 1, DISK: 2|
|12|All_SSD|SSD: 3|
|10|One_SSD|SSD: 1, DISK: 2|
|7|Hot (default)|DISK: 3|
|5|Warm|DISK: 1, ARCHIVE: 2|
|2|Cold|ARCHIVE: 3|

Date Tiered Compaction (DTCP) supports the output of multiple storefiles. We hope that these
storefiles can be set with different storage policies (in HDFS).
Therefore, through DateTieredMultiFileWriter to generate different StoreFileWriters with
storage policy to achieve the purpose.

## Why use different child tmp dir
Before StoreFileWriter writes a storefile, we can create different dirs in the tmp directory
of the region and set the corresponding storage policy for these dirs. This way
StoreFileWriter can write files to different dirs.

Since **HDFS** does not support the create file with the storage policy parameter
(See https://issues.apache.org/jira/browse/HDFS-13209 and now not support on hadoop 2.x),
and HDFS cannot set a storage policy for a file / dir path that does not yet exist.
When the compaction ends, the storefile path must exist at this time, and I set the
storage policy to Storefile.

But, in HDFS, when the file is written first, and then the storage policy is set.
The actual storage location of the data does not match the storage policy. For example,
write three copies of a file (1 block) in the HDD, then set storage policy is ALL_SSD,
but the data block will not be moved to the SSD immediately.
“HDFS wont move the file content across different block volumes on rename”. Data movement
requires the HDFS mover tool, or use HDFS SPS
(for details, see https://issues.apache.org/jira/browse/HDFS-10285), so in order to
avoid moving data blocks at the HDFS level, we can set the file parent directory to
the storage policy we need before writing data. The new file automatically inherits the
storage policy of the parent directory, and is written according to the correct disk
type when writing. So as to avoid later data movement.

Over time, the original HOT data will become WARM / COLD and no longer belong to the
HOT window. When the compaction occurs again, the data will be automatically downgraded,
such as from SSD to HDD. The compaction mechanism will generate a new file (write into HDD)
and delete it Old file (SSD).
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public final class HConstants {
*/
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";

/**
* Date Tiered Compaction tmp dir prefix name if use storage policy
*/
public static final String STORAGE_POLICY_PREFIX = "storage_policy_";

/**
* The first four bytes of Hadoop RPC connections
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen

public interface WriterFactory {
public StoreFileWriter createWriter() throws IOException;
default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createWriter();
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,33 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {

private final boolean needEmptyFile;

private final Map<Long, String> lowerBoundariesPolicies;

/**
* @param lowerBoundariesPolicies each window to storage policy map.
* @param needEmptyFile whether need to create an empty store file if we haven't written out
* anything.
*/
public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
this.lowerBoundariesPolicies = lowerBoundariesPolicies;
}

@Override
public void append(Cell cell) throws IOException {
Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
writer = writerFactory.createWriter();
String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
if (lowerBoundaryStoragePolicy != null) {
writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy);
} else {
writer = writerFactory.createWriter();
}
lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public void forceSelect(CompactionRequestImpl request) {
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
if (request instanceof DateTieredCompactionRequest) {
return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request;
return compactor.compact(request, compactionRequest.getBoundaries(),
compactionRequest.getBoundariesPolicies(),
throughputController, user);
} else {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand Down Expand Up @@ -1148,7 +1149,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
includesTag, shouldDropBehind, -1);
includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
}

/**
Expand All @@ -1162,7 +1163,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
// compaction
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
Expand Down Expand Up @@ -1219,7 +1221,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind)
.withCompactedFilesSupplier(this::getCompactedFiles);
.withCompactedFilesSupplier(this::getCompactedFiles)
.withFileStoragePolicy(fileStoragePolicy);
return builder.build();
}

Expand Down Expand Up @@ -1540,6 +1543,7 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
Expand Down Expand Up @@ -1569,6 +1573,18 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
return sfs;
}

// Set correct storage policy from the file name of DTCP.
// Rename file will not change the storage policy.
private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
String prefix = HConstants.STORAGE_POLICY_PREFIX;
for (Path newFile : newFiles) {
if (newFile.getParent().getName().startsWith(prefix)) {
CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
newFile.getParent().getName().substring(prefix.length()));
}
}
}

private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -437,6 +438,7 @@ public static class Builder {
private HFileContext fileContext;
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy;

public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
Expand Down Expand Up @@ -518,6 +520,11 @@ public Builder withCompactedFilesSupplier(
return this;
}

public Builder withFileStoragePolicy(String fileStoragePolicy) {
this.fileStoragePolicy = fileStoragePolicy;
return this;
}

/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
Expand Down Expand Up @@ -547,6 +554,20 @@ public StoreFileWriter build() throws IOException {
CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

if (filePath == null) {
// The stored file and related blocks will used the directory based StoragePolicy.
// Because HDFS DistributedFileSystem does not support create files with storage policy
// before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files
// satisfy the specific storage policy when writing. So as to avoid later data movement.
// We don't want to change whole temp dir to 'fileStoragePolicy'.
if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
if (!fs.exists(dir)) {
HRegionFileSystem.mkdirs(fs, conf, dir);
LOG.info(
"Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
}
CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
}
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner s
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}

@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ public class CompactionConfiguration {
private static final Class<? extends CompactionWindowFactory>
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;

public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY =
"hbase.hstore.compaction.date.tiered.storage.policy.enable";
public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.age.millis";
public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.storage.policy";
public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.age.millis";
public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.storage.policy";
/** Windows older than warm age belong to COLD_WINDOW **/
public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.cold.window.storage.policy";

Configuration conf;
StoreConfigInformation storeConfigInfo;

Expand All @@ -113,6 +127,12 @@ public class CompactionConfiguration {
private final String compactionPolicyForDateTieredWindow;
private final boolean dateTieredSingleOutputForMinorCompaction;
private final String dateTieredCompactionWindowFactory;
private final boolean dateTieredStoragePolicyEnable;
private long hotWindowAgeMillis;
private long warmWindowAgeMillis;
private String hotWindowStoragePolicy;
private String warmWindowStoragePolicy;
private String coldWindowStoragePolicy;

CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
Expand Down Expand Up @@ -147,6 +167,13 @@ public class CompactionConfiguration {
this.dateTieredCompactionWindowFactory = conf.get(
DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
// for Heterogeneous Storage
dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false);
hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L);
hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD");
warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L);
warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD");
coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT");
LOG.info(toString());
}

Expand Down Expand Up @@ -293,4 +320,28 @@ public boolean useDateTieredSingleOutputForMinorCompaction() {
public String getDateTieredCompactionWindowFactory() {
return dateTieredCompactionWindowFactory;
}

public boolean isDateTieredStoragePolicyEnable() {
return dateTieredStoragePolicyEnable;
}

public long getHotWindowAgeMillis() {
return hotWindowAgeMillis;
}

public long getWarmWindowAgeMillis() {
return warmWindowAgeMillis;
}

public String getHotWindowStoragePolicy() {
return hotWindowStoragePolicy.trim().toUpperCase();
}

public String getWarmWindowStoragePolicy() {
return warmWindowStoragePolicy.trim().toUpperCase();
}

public String getColdWindowStoragePolicy() {
return coldWindowStoragePolicy.trim().toUpperCase();
}
}
Loading

0 comments on commit be57e40

Please sign in to comment.