HBASE-26791 Memstore flush fencing issue for SFT
Apache9 committed Mar 10, 2022
1 parent 9293d6a commit 17aaa26
Showing 7 changed files with 196 additions and 48 deletions.
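The core of the fix: instead of fixed track file names, each track file name is suffixed with a monotonically increasing sequence id, so a region opened on a new server writes to track files that a fenced-out, still-running old server never touches. A minimal sketch of the naming scheme, assuming the f1/f2 prefixes and "." separator introduced below (the class name and the example value are hypothetical):

public class TrackFileNamingSketch {
  public static void main(String[] args) {
    // A track file is named "<prefix>.<seqId>", e.g. "f1.1646900000000".
    long maxSeenSeqId = 1_646_900_000_000L; // highest sequence id found under .filelist (example)
    // The new region holder picks a sequence id strictly greater than anything on disk, so an
    // old server still writing "f1.<oldSeqId>" can never clobber the files the new holder uses.
    long nextSeqId = Math.max(System.currentTimeMillis(), maxSeenSeqId + 1);
    System.out.println("f1" + '.' + nextSeqId); // primary slot
    System.out.println("f2" + '.' + nextSeqId); // rotation slot
  }
}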
DefaultStoreFileTracker.java
@@ -38,13 +38,6 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto
super(conf, isPrimaryReplica, ctx);
}

@Override
public List<StoreFileInfo> load() throws IOException {
List<StoreFileInfo> files =
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
return files != null ? files : Collections.emptyList();
}

@Override
public boolean requireWritingToTmpDirFirst() {
return true;
@@ -62,7 +55,15 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) {
// NOOP
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
List<StoreFileInfo> files =
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
return files != null ? files : Collections.emptyList();
}

@Override
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
// NOOP, the default tracker derives its file list by listing the file system and has
// nothing to persist, just like the removed set method above
}
}
FileBasedStoreFileTracker.java
@@ -65,10 +65,11 @@ public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S
backedFile = null;
}
}


@Override
public List<StoreFileInfo> load() throws IOException {
StoreFileList list = backedFile.load();
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
StoreFileList list = backedFile.load(readOnly);
if (list == null) {
return Collections.emptyList();
}
@@ -148,7 +149,7 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) throws IOException {
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
synchronized (storefiles) {
storefiles.clear();
StoreFileList.Builder builder = StoreFileList.newBuilder();
MigrationStoreFileTracker.java
@@ -49,13 +49,6 @@ public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S
"src and dst is the same: %s", src.getClass());
}

@Override
public List<StoreFileInfo> load() throws IOException {
List<StoreFileInfo> files = src.load();
dst.set(files);
return files;
}

@Override
public boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTrackers returns true.
@@ -67,6 +60,15 @@ public boolean requireWritingToTmpDirFirst() {
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
}

@Override
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
List<StoreFileInfo> files = src.doLoadStoreFiles(readOnly);
if (!readOnly) {
dst.doSetStoreFiles(files);
}
return files;
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddNewStoreFiles(newFiles);
@@ -81,7 +83,7 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) {
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
throw new UnsupportedOperationException(
"Should not call this method on " + getClass().getSimpleName());
}
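The readOnly flag matters most in the migration path above: loading must stay side-effect free on secondary replicas, so the copy into the destination tracker only happens on a writable load. A toy sketch of that flow (simplified types and hypothetical names, not HBase code):

import java.io.IOException;
import java.util.List;

// Hypothetical sketch of the doLoadStoreFiles logic in MigrationStoreFileTracker above.
class MigrationLoadSketch {
  interface Source { List<String> load(boolean readOnly) throws IOException; }
  interface Destination { void set(List<String> files) throws IOException; }

  static List<String> load(Source src, Destination dst, boolean readOnly) throws IOException {
    List<String> files = src.load(readOnly);
    if (!readOnly) {
      dst.set(files); // migrate the authoritative list only on a writable (primary) load
    }
    return files;
  }
}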
StoreFileListFile.java
@@ -20,9 +20,18 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.StoreContext;
@@ -31,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
@@ -55,9 +66,13 @@ class StoreFileListFile {

static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";
private static final String TRACK_FILE_PREFIX = "f1";

private static final String TRACK_FILE_ROTATE_PREFIX = "f2";

private static final char TRACK_FILE_SEPARATOR = '.';

private static final String TRACK_FILE_ROTATE = "f2";
private static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)\\.\\d+$");

// 16 MB, which is big enough for a tracker file
private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
@@ -76,8 +91,6 @@ class StoreFileListFile {
StoreFileListFile(StoreContext ctx) {
this.ctx = ctx;
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
}

private StoreFileList load(Path path) throws IOException {
@@ -114,23 +127,96 @@ private int select(StoreFileList[] lists) {
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
}

StoreFileList load() throws IOException {
// map from file sequence id to track file paths, ordered with the newest sequence id first
private NavigableMap<Long, List<Path>> listFiles() throws IOException {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
FileStatus[] statuses;
try {
statuses = fs.listStatus(trackFileDir);
} catch (FileNotFoundException e) {
LOG.debug("Track file directory {} does not exist", trackFileDir, e);
return Collections.emptyNavigableMap();
}
if (statuses == null || statuses.length == 0) {
return Collections.emptyNavigableMap();
}
// sort sequence ids in descending order, so the newest track files come first
TreeMap<Long, List<Path>> map = new TreeMap<>((l1, l2) -> l2.compareTo(l1));
for (FileStatus status : statuses) {
Path file = status.getPath();
if (!status.isFile()) {
LOG.warn("Found invalid track file {}, which is not a file", file);
continue;
}
if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
LOG.warn("Found invalid track file {}, skip", file);
continue;
}
List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
map.computeIfAbsent(Long.parseLong(parts.get(1)), k -> new ArrayList<>()).add(file);
}
return map;
}

private void initializeTrackFiles(long seqId) {
trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
}

private void cleanUpTrackFiles(long loadedSeqId,
NavigableMap<Long, List<Path>> seqId2TrackFiles) {
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
NavigableMap<Long, List<Path>> toDelete =
loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> {
ForkJoinPool.commonPool().execute(() -> {
try {
fs.delete(file, false);
} catch (IOException e) {
LOG.warn("failed to delete unused track file {}", file, e);
}
});
});
}

StoreFileList load(boolean readOnly) throws IOException {
NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
long seqId = -1L;
StoreFileList[] lists = new StoreFileList[2];
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | EOFException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
List<Path> files = entry.getValue();
// should not have more than 2 files, if not, it means that the track files are broken, just
// throw exception out and fail the region open.
if (files.size() > 2) {
throw new IOException("Should have at most 2 track files for sequence id " + entry.getKey()
+ ", but got " + files);
}
boolean loaded = false;
for (int i = 0; i < files.size(); i++) {
try {
lists[i] = load(files.get(i));
loaded = true;
} catch (EOFException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", files.get(i), e.toString());
}
}
if (loaded) {
seqId = entry.getKey();
break;
}
}
int winnerIndex = select(lists);
if (lists[winnerIndex] != null) {
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
} else {
if (readOnly) {
return lists[select(lists)];
}

cleanUpTrackFiles(seqId, seqId2TrackFiles);

if (seqId < 0) {
initializeTrackFiles(System.currentTimeMillis());
nextTrackFile = 0;
return null;
}

initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1));
int winnerIndex = select(lists);
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
return lists[winnerIndex];
}

@@ -140,7 +226,8 @@ StoreFileList load() throws IOException {
void update(StoreFileList.Builder builder) throws IOException {
if (nextTrackFile < 0) {
// we need to call load first to load the prevTimestamp and also the next file
load();
// we are already in the update method, which is not read only, so pass false
load(false);
}
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
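A standalone sketch of the listFiles() and selection flow above: group candidate names by sequence id, newest first, skipping anything that does not match the track file pattern. The directory contents here are hypothetical:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class TrackFileSelectionSketch {
  // Same shape as TRACK_FILE_PATTERN above: "f1.<seqId>" or "f2.<seqId>".
  private static final Pattern TRACK_FILE = Pattern.compile("^f(1|2)\\.\\d+$");

  public static void main(String[] args) {
    // Hypothetical listing of a <family>/.filelist directory.
    List<String> names =
      List.of("f1.1646900000000", "f2.1646900000000", "f1.1646900001234", "junk.txt");
    // Descending map, mirroring listFiles(): the newest sequence id comes first.
    NavigableMap<Long, List<String>> bySeqId = new TreeMap<>((s1, s2) -> s2.compareTo(s1));
    for (String name : names) {
      if (!TRACK_FILE.matcher(name).matches()) {
        continue; // not a well-formed track file, ignore it
      }
      long seqId = Long.parseLong(name.substring(name.indexOf('.') + 1));
      bySeqId.computeIfAbsent(seqId, k -> new ArrayList<>()).add(name);
    }
    // The first entry that loads wins; files under smaller sequence ids are stale
    // leftovers from previous region holders and are deleted asynchronously.
    System.out.println(bySeqId.firstEntry()); // 1646900001234=[f1.1646900001234]
  }
}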
StoreFileTrackerBase.java
@@ -21,7 +21,7 @@

import java.io.IOException;
import java.util.Collection;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -66,6 +71,11 @@ protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, Sto
this.ctx = ctx;
}

@Override
public final List<StoreFileInfo> load() throws IOException {
return doLoadStoreFiles(!isPrimaryReplica);
}

@Override
public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
if (isPrimaryReplica) {
@@ -81,6 +86,13 @@ public final void replace(Collection<StoreFileInfo> compactedFiles,
}
}

@Override
public final void set(List<StoreFileInfo> files) throws IOException {
if (isPrimaryReplica) {
doSetStoreFiles(files);
}
}

@Override
public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
builder.setValue(TRACKER_IMPL, getTrackerName());
@@ -173,8 +185,19 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th
return builder.build();
}

/**
* For a primary replica, we will call load once when opening a region, and the implementation
* may choose to do some cleanup work. So here we use {@code readOnly} to indicate whether the
* implementation is allowed to do the cleanup. For secondary replicas, we will set
* {@code readOnly} to {@code true}.
*/
protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException;

protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}
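A compact illustration of the template-method contract above, with the HBase types replaced by plain strings (this stand-in class is hypothetical, not HBase code): the final public methods decide the replica gating once, and concrete trackers only implement the do* hooks.

import java.io.IOException;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for the StoreFileTrackerBase contract shown above.
abstract class TrackerContractSketch {
  protected final boolean isPrimaryReplica;

  TrackerContractSketch(boolean isPrimaryReplica) {
    this.isPrimaryReplica = isPrimaryReplica;
  }

  public final List<String> load() throws IOException {
    // Secondary replicas load read-only: no cleanup, no new track files.
    return doLoadStoreFiles(!isPrimaryReplica);
  }

  public final void set(List<String> files) throws IOException {
    if (isPrimaryReplica) { // only the primary is allowed to rewrite the tracked set
      doSetStoreFiles(files);
    }
  }

  protected abstract List<String> doLoadStoreFiles(boolean readOnly) throws IOException;

  protected abstract void doSetStoreFiles(Collection<String> files) throws IOException;
}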
TestStoreFileTracker.java
@@ -57,7 +57,7 @@ protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOE
}

@Override
public List<StoreFileInfo> load() throws IOException {
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
return new ArrayList<>(trackedFiles.get(storeId));
}

(diff for the seventh changed file did not load and is not shown)
