Skip to content

Commit

Permalink
HBASE-25988 Store the store file list by a file (apache#3578)
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
Apache9 authored and apurtell committed Mar 18, 2022
1 parent ae15a1d commit e276da2
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 19 deletions.
36 changes: 36 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/StoreFileTracker.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// This file contains protocol buffers that are used for store file tracker.
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "StoreFileTrackerProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message StoreFileEntry {
required string name = 1;
required uint64 size = 2;
}

message StoreFileList {
required uint64 timestamp = 1;
repeated StoreFileEntry store_file = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.HeapSize;
Expand Down Expand Up @@ -109,6 +110,10 @@ public RegionCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}

public TableName getTableName() {
return getRegionInfo().getTable();
}

public RegionInfo getRegionInfo() {
return regionFileSystem.getRegionInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public StoreFlusher getStoreFlusher() {
return this.storeFlusher;
}

private StoreFileTracker createStoreFileTracker(HStore store) {
return StoreFileTrackerFactory.create(store.conf, store.getRegionInfo().getTable(),
store.isPrimaryReplicaStore(), store.getStoreContext());
private StoreFileTracker createStoreFileTracker(Configuration conf, HStore store) {
return StoreFileTrackerFactory.create(conf, store.isPrimaryReplicaStore(),
store.getStoreContext());
}

/**
Expand Down Expand Up @@ -206,7 +206,7 @@ protected final void createComponentsOnce(Configuration conf, HStore store,
this.ctx = store.getStoreContext();
this.coprocessorHost = store.getHRegion().getCoprocessorHost();
this.openStoreFileThreadPoolCreator = store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
this.storeFileTracker = createStoreFileTracker(store);
this.storeFileTracker = createStoreFileTracker(conf, store);
assert compactor != null && compactionPolicy != null && storeFileManager != null &&
storeFlusher != null && storeFileTracker != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -33,9 +32,9 @@
@InterfaceAudience.Private
class DefaultStoreFileTracker extends StoreFileTrackerBase {

public DefaultStoreFileTracker(Configuration conf, TableName tableName, boolean isPrimaryReplica,
public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
super(conf, tableName, isPrimaryReplica, ctx);
super(conf, isPrimaryReplica, ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
* A file based store file tracker.
* <p/>
* For this tracking way, the store file list will be persistent into a file, so we can write the
* new store files directly to the final data directory, as we will not load the broken files. This
* will greatly reduce the time for flush and compaction on some object storages as a rename is
* actual a copy on them. And it also avoid listing when loading store file list, which could also
* speed up the loading of store files as listing is also not a fast operation on most object
* storages.
*/
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {

private final StoreFileListFile backedFile;

private final Map<String, StoreFileInfo> storefiles = new HashMap<>();

public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
backedFile = new StoreFileListFile(ctx);
}

@Override
public List<StoreFileInfo> load() throws IOException {
StoreFileList list = backedFile.load();
if (list == null) {
return Collections.emptyList();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
List<StoreFileInfo> infos = new ArrayList<>();
for (StoreFileEntry entry : list.getStoreFileList()) {
infos.add(ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, ctx.getRegionInfo(),
ctx.getRegionFileSystem().getRegionInfoForFS(), ctx.getFamily().getNameAsString(),
new Path(ctx.getFamilyStoreDirectoryPath(), entry.getName())));
}
// In general, for primary replica, the load method should only be called once when
// initialization, so we do not need synchronized here. And for secondary replicas, though the
// load method could be called multiple times, we will never call other methods so no
// synchronized is also fine.
// But we have a refreshStoreFiles method in the Region interface, which can be called by CPs,
// and we have a RefreshHFilesEndpoint example to expose the refreshStoreFiles method as RPC, so
// for safety, let's still keep the synchronized here.
synchronized (storefiles) {
for (StoreFileInfo info : infos) {
storefiles.put(info.getPath().getName(), info);
}
}
return infos;
}

@Override
protected boolean requireWritingToTmpDirFirst() {
return false;
}

private StoreFileEntry toStoreFileEntry(StoreFileInfo info) {
return StoreFileEntry.newBuilder().setName(info.getPath().getName()).setSize(info.getSize())
.build();
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
for (StoreFileInfo info : storefiles.values()) {
builder.addStoreFile(toStoreFileEntry(info));
}
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}

@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
Set<String> compactedFileNames =
compactedFiles.stream().map(info -> info.getPath().getName()).collect(Collectors.toSet());
synchronized (storefiles) {
StoreFileList.Builder builder = StoreFileList.newBuilder();
storefiles.forEach((name, info) -> {
if (compactedFileNames.contains(name)) {
return;
}
builder.addStoreFile(toStoreFileEntry(info));
});
for (StoreFileInfo info : newFiles) {
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
for (String name : compactedFileNames) {
storefiles.remove(name);
}
for (StoreFileInfo info : newFiles) {
storefiles.put(info.getPath().getName(), info);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
* To fully avoid listing, here we use two files for tracking. When loading, we will try to read
* both the two files, if only one exists, we will trust this one, if both exist, we will compare
* the timestamp to see which one is newer and trust that one. And we will record in memory that
* which one is trusted by us, and when we need to update the store file list, we will write to the
* other file.
* <p/>
* So in this way, we could avoid listing when we want to load the store file list file.
*/
@InterfaceAudience.Private
class StoreFileListFile {

private static final Logger LOG = LoggerFactory.getLogger(StoreFileListFile.class);

private static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";

private static final String TRACK_FILE_ROTATE = "f2";

private final StoreContext ctx;

private final Path trackFileDir;

private final Path[] trackFiles = new Path[2];

// this is used to make sure that we do not go backwards
private long prevTimestamp = -1;

private int nextTrackFile = -1;

StoreFileListFile(StoreContext ctx) {
this.ctx = ctx;
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
}

private StoreFileList load(Path path) throws IOException {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
byte[] bytes;
try (FSDataInputStream in = fs.open(path)) {
bytes = ByteStreams.toByteArray(in);
}
// Read all the bytes and then parse it, so we will only throw InvalidProtocolBufferException
// here. This is very important for upper layer to determine whether this is the normal case,
// where the file does not exist or is incomplete. If there is another type of exception, the
// upper layer should throw it out instead of just ignoring it, otherwise it will lead to data
// loss.
return StoreFileList.parseFrom(bytes);
}

private int select(StoreFileList[] lists) {
if (lists[0] == null) {
return 1;
}
if (lists[1] == null) {
return 0;
}
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
}

StoreFileList load() throws IOException {
StoreFileList[] lists = new StoreFileList[2];
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | InvalidProtocolBufferException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e);
}
}
int winnerIndex = select(lists);
if (lists[winnerIndex] != null) {
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
} else {
nextTrackFile = 0;
}
return lists[winnerIndex];
}

/**
* We will set the timestamp in this method so just pass the builder in
*/
void update(StoreFileList.Builder builder) throws IOException {
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
builder.setTimestamp(timestamp).build().writeTo(out);
}
// record timestamp
prevTimestamp = timestamp;
// rotate the file
nextTrackFile = 1 - nextTrackFile;
try {
fs.delete(trackFiles[nextTrackFile], false);
} catch (IOException e) {
// we will create new file with overwrite = true, so not a big deal here, only for speed up
// loading as we do not need to read this file when loading(we will hit FileNotFoundException)
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e);
}
}
}
Loading

0 comments on commit e276da2

Please sign in to comment.