forked from apache/hbase
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
HBASE-25988 Store the store file list by a file (apache#3578)
Signed-off-by: Wellington Chevreuil <[email protected]>
- Loading branch information
Showing
9 changed files
with
451 additions
and
19 deletions.
There are no files selected for viewing
36 changes: 36 additions & 0 deletions
36
hbase-protocol-shaded/src/main/protobuf/StoreFileTracker.proto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
syntax = "proto2"; | ||
// This file contains protocol buffers that are used for store file tracker. | ||
package hbase.pb; | ||
|
||
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; | ||
option java_outer_classname = "StoreFileTrackerProtos"; | ||
option java_generic_services = true; | ||
option java_generate_equals_and_hash = true; | ||
option optimize_for = SPEED; | ||
|
||
message StoreFileEntry { | ||
required string name = 1; | ||
required uint64 size = 2; | ||
} | ||
|
||
message StoreFileList { | ||
required uint64 timestamp = 1; | ||
repeated StoreFileEntry store_file = 2; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
142 changes: 142 additions & 0 deletions
142
...java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.hadoop.hbase.regionserver.storefiletracker; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.hbase.regionserver.StoreContext; | ||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo; | ||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
|
||
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry; | ||
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; | ||
|
||
/** | ||
* A file based store file tracker. | ||
* <p/> | ||
* For this tracking way, the store file list will be persistent into a file, so we can write the | ||
* new store files directly to the final data directory, as we will not load the broken files. This | ||
* will greatly reduce the time for flush and compaction on some object storages as a rename is | ||
* actual a copy on them. And it also avoid listing when loading store file list, which could also | ||
* speed up the loading of store files as listing is also not a fast operation on most object | ||
* storages. | ||
*/ | ||
@InterfaceAudience.Private | ||
public class FileBasedStoreFileTracker extends StoreFileTrackerBase { | ||
|
||
private final StoreFileListFile backedFile; | ||
|
||
private final Map<String, StoreFileInfo> storefiles = new HashMap<>(); | ||
|
||
public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) { | ||
super(conf, isPrimaryReplica, ctx); | ||
backedFile = new StoreFileListFile(ctx); | ||
} | ||
|
||
@Override | ||
public List<StoreFileInfo> load() throws IOException { | ||
StoreFileList list = backedFile.load(); | ||
if (list == null) { | ||
return Collections.emptyList(); | ||
} | ||
FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
List<StoreFileInfo> infos = new ArrayList<>(); | ||
for (StoreFileEntry entry : list.getStoreFileList()) { | ||
infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(), | ||
ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(), | ||
new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName()))); | ||
} | ||
// In general, for primary replica, the load method should only be called once when | ||
// initialization, so we do not need synchronized here. And for secondary replicas, though the | ||
// load method could be called multiple times, we will never call other methods so no | ||
// synchronized is also fine. | ||
// But we have a refreshStoreFiles method in the Region interface, which can be called by CPs, | ||
// and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so | ||
// for safety, let's still keep the synchronized here. | ||
synchronized (storefiles) { | ||
for (StoreFileInfo info : infos) { | ||
storefiles.put(info.getPath().getName(), info); | ||
} | ||
} | ||
return infos; | ||
} | ||
|
||
@Override | ||
protected boolean requireWritingToTmpDirFirst() { | ||
return false; | ||
} | ||
|
||
private StoreFileEntry toStoreFileEntry(StoreFileInfo info) { | ||
return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize()) | ||
.build(); | ||
} | ||
|
||
@Override | ||
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException { | ||
synchronized (storefiles) { | ||
StoreFileList.Builder builder = StoreFileList.newBuilder(); | ||
for (StoreFileInfo info : storefiles.values()) { | ||
builder.addStoreFile(toStoreFileEntry(info)); | ||
} | ||
for (StoreFileInfo info : newFiles) { | ||
builder.addStoreFile(toStoreFileEntry(info)); | ||
} | ||
backedFile.update(builder); | ||
for (StoreFileInfo info : newFiles) { | ||
storefiles.put(info.getPath().getName(), info); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles, | ||
Collection<StoreFileInfo> newFiles) throws IOException { | ||
Set<String> compactedFileNames = | ||
compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet()); | ||
synchronized (storefiles) { | ||
StoreFileList.Builder builder = StoreFileList.newBuilder(); | ||
storefiles.forEach((name, info) -> { | ||
if (compactedFileNames.contains(name)) { | ||
return; | ||
} | ||
builder.addStoreFile(toStoreFileEntry(info)); | ||
}); | ||
for (StoreFileInfo info : newFiles) { | ||
builder.addStoreFile(toStoreFileEntry(info)); | ||
} | ||
backedFile.update(builder); | ||
for (String name : compactedFileNames) { | ||
storefiles.remove(name); | ||
} | ||
for (StoreFileInfo info : newFiles) { | ||
storefiles.put(info.getPath().getName(), info); | ||
} | ||
} | ||
} | ||
} |
142 changes: 142 additions & 0 deletions
142
...rc/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileListFile.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.hadoop.hbase.regionserver.storefiletracker; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import org.apache.hadoop.fs.FSDataInputStream; | ||
import org.apache.hadoop.fs.FSDataOutputStream; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.hbase.regionserver.StoreContext; | ||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; | ||
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; | ||
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; | ||
|
||
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList; | ||
|
||
/** | ||
* To fully avoid listing, here we use two files for tracking. When loading, we will try to read | ||
* both the two files, if only one exists, we will trust this one, if both exist, we will compare | ||
* the timestamp to see which one is newer and trust that one. And we will record in memory that | ||
* which one is trusted by us, and when we need to update the store file list, we will write to the | ||
* other file. | ||
* <p/> | ||
* So in this way, we could avoid listing when we want to load the store file list file. | ||
*/ | ||
@InterfaceAudience.Private | ||
class StoreFileListFile { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class); | ||
|
||
private static final String TRACK_FILE_DIR = ".filelist"; | ||
|
||
private static final String TRACK_FILE = "f1"; | ||
|
||
private static final String TRACK_FILE_ROTATE = "f2"; | ||
|
||
private final StoreContext ctx; | ||
|
||
private final Path trackFileDir; | ||
|
||
private final Path[] trackFiles = new Path[2]; | ||
|
||
// this is used to make sure that we do not go backwards | ||
private long prevTimestamp = -1; | ||
|
||
private int nextTrackFile = -1; | ||
|
||
StoreFileListFile(StoreContext ctx) { | ||
this.ctx = ctx; | ||
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR); | ||
trackFiles[0] = new Path(trackFileDir, TRACK_FILE); | ||
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE); | ||
} | ||
|
||
private StoreFileList load(Path path) throws IOException { | ||
FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
byte[] bytes; | ||
try (FSDataInputStream in = fs.open(path)) { | ||
bytes = ByteStreams.toByteArray(in); | ||
} | ||
// Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException | ||
// here. This is very important for upper layer to determine whether this is the normal case, | ||
// where the file does not exist or is incomplete. If there is another type of exception, the | ||
// upper layer should throw it out instead of just ignoring it, otherwise it will lead to data | ||
// loss. | ||
return StoreFileList.parseFrom(bytes); | ||
} | ||
|
||
private int select(StoreFileList[] lists) { | ||
if (lists[0] == null) { | ||
return 1; | ||
} | ||
if (lists[1] == null) { | ||
return 0; | ||
} | ||
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1; | ||
} | ||
|
||
StoreFileList load() throws IOException { | ||
StoreFileList[] lists = new StoreFileList[2]; | ||
for (int i = 0; i < 2; i++) { | ||
try { | ||
lists[i] = load(trackFiles[i]); | ||
} catch (FileNotFoundException | InvalidProtocolBufferException e) { | ||
// this is normal case, so use info and do not log stacktrace | ||
LOG.info("Failed to load track file {}: {}", trackFiles[i], e); | ||
} | ||
} | ||
int winnerIndex = select(lists); | ||
if (lists[winnerIndex] != null) { | ||
nextTrackFile = 1 - winnerIndex; | ||
prevTimestamp = lists[winnerIndex].getTimestamp(); | ||
} else { | ||
nextTrackFile = 0; | ||
} | ||
return lists[winnerIndex]; | ||
} | ||
|
||
/** | ||
* We will set the timestamp in this method so just pass the builder in | ||
*/ | ||
void update(StoreFileList.Builder builder) throws IOException { | ||
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update"); | ||
FileSystem fs = ctx.getRegionFileSystem().getFileSystem(); | ||
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime()); | ||
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) { | ||
builder.setTimestamp(timestamp).build().writeTo(out); | ||
} | ||
// record timestamp | ||
prevTimestamp = timestamp; | ||
// rotate the file | ||
nextTrackFile = 1 - nextTrackFile; | ||
try { | ||
fs.delete(trackFiles[nextTrackFile], false); | ||
} catch (IOException e) { | ||
// we will create new file with overwrite = true, so not a big deal here, only for speed up | ||
// loading as we do not need to read this file when loading(we will hit FileNotFoundException) | ||
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e); | ||
} | ||
} | ||
} |
Oops, something went wrong.