Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26791 Memstore flush fencing issue for SFT #4202

Merged
merged 2 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto
super(conf, isPrimaryReplica, ctx);
}

/**
 * Lists the store files of this family straight from the region file system.
 * @return the store files found, or an empty list when the file system reports none
 */
@Override
public List<StoreFileInfo> load() throws IOException {
  List<StoreFileInfo> storeFiles =
    ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
  if (storeFiles == null) {
    // callers always get a list back, never null
    return Collections.emptyList();
  }
  return storeFiles;
}

@Override
public boolean requireWritingToTmpDirFirst() {
return true;
Expand All @@ -62,7 +55,13 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) {
// NOOP
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
  // The store files are listed from the region file system; readOnly is not consulted here.
  List<StoreFileInfo> storeFiles =
    ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
  if (storeFiles == null) {
    // callers always get a list back, never null
    return Collections.emptyList();
  }
  return storeFiles;
}

/**
 * Intentionally does nothing.
 * @param files ignored by this implementation
 */
@Override
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
  // NOOP: doLoadStoreFiles of this tracker lists the store files from the region file system.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public FileBasedStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S
}

@Override
public List<StoreFileInfo> load() throws IOException {
StoreFileList list = backedFile.load();
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
StoreFileList list = backedFile.load(readOnly);
if (list == null) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -148,7 +148,7 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) throws IOException {
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
synchronized (storefiles) {
storefiles.clear();
StoreFileList.Builder builder = StoreFileList.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, S
"src and dst is the same: %s", src.getClass());
}

/**
 * Loads the store files from the source tracker and hands the same list to the destination
 * tracker before returning it.
 */
@Override
public List<StoreFileInfo> load() throws IOException {
  List<StoreFileInfo> loadedFromSrc = src.load();
  // mirror what the source tracker knows into the destination tracker
  dst.set(loadedFromSrc);
  return loadedFromSrc;
}

@Override
public boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true.
Expand All @@ -67,6 +60,15 @@ public boolean requireWritingToTmpDirFirst() {
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
}

/**
 * Loads the store files from the source tracker and, unless {@code readOnly} is set, writes the
 * loaded list to the destination tracker.
 * @param readOnly when {@code true} the destination tracker is left untouched
 * @return the store files reported by the source tracker
 */
@Override
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
  List<StoreFileInfo> loadedFromSrc = src.doLoadStoreFiles(readOnly);
  if (readOnly) {
    // not allowed to modify anything, so skip syncing to the destination tracker
    return loadedFromSrc;
  }
  dst.doSetStoreFiles(loadedFromSrc);
  return loadedFromSrc;
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddNewStoreFiles(newFiles);
Expand All @@ -81,7 +83,7 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}

@Override
public void set(List<StoreFileInfo> files) {
protected void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException {
  // Always rejected: the message names the concrete tracker class the call was made on.
  String trackerName = getClass().getSimpleName();
  throw new UnsupportedOperationException("Should not call this method on " + trackerName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,29 @@
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

import org.apache.hadoop.hbase.shaded.protobuf.generated.StoreFileTrackerProtos.StoreFileList;

/**
Expand All @@ -55,9 +67,13 @@ class StoreFileListFile {

static final String TRACK_FILE_DIR = ".filelist";

private static final String TRACK_FILE = "f1";
private static final String TRACK_FILE_PREFIX = "f1";

private static final String TRACK_FILE_ROTATE_PREFIX = "f2";

private static final String TRACK_FILE_ROTATE = "f2";
private static final char TRACK_FILE_SEPARATOR = '.';

private static final Pattern TRACK_FILE_PATTERN = Pattern.compile("^f(1|2)\\.\\d+$");

// 16 MB, which is big enough for a tracker file
private static final int MAX_FILE_SIZE = 16 * 1024 * 1024;
Expand All @@ -76,8 +92,6 @@ class StoreFileListFile {
/**
 * Creates the track file helper for the given store.
 * @param ctx the store context, used here to resolve the family's store directory
 */
StoreFileListFile(StoreContext ctx) {
this.ctx = ctx;
// track files live in the TRACK_FILE_DIR sub directory of the family's store directory
trackFileDir = new Path(ctx.getFamilyStoreDirectoryPath(), TRACK_FILE_DIR);
trackFiles[0] = new Path(trackFileDir, TRACK_FILE);
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE);
}

private StoreFileList load(Path path) throws IOException {
Expand Down Expand Up @@ -114,23 +128,103 @@ private int select(StoreFileList[] lists) {
return lists[0].getTimestamp() >= lists[1].getTimestamp() ? 0 : 1;
}

StoreFileList load() throws IOException {
/**
 * Lists the track files under the track file directory, grouped by the sequence id which is
 * encoded in the file name after {@link #TRACK_FILE_SEPARATOR}.
 * <p/>
 * Entries which are not regular files, whose names do not match {@link #TRACK_FILE_PATTERN}, or
 * whose sequence id does not fit into a {@code long}, are logged and skipped.
 * @return file sequence id to path, ordered from the largest sequence id to the smallest; an
 *         empty map if the directory does not exist or contains nothing
 * @throws IOException if listing the directory fails for a reason other than it being missing
 */
private NavigableMap<Long, List<Path>> listFiles() throws IOException {
  FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
  FileStatus[] statuses;
  try {
    statuses = fs.listStatus(trackFileDir);
  } catch (FileNotFoundException e) {
    LOG.debug("Track file directory {} does not exist", trackFileDir, e);
    return Collections.emptyNavigableMap();
  }
  if (statuses == null || statuses.length == 0) {
    return Collections.emptyNavigableMap();
  }
  // reversed comparator, so iteration starts with the largest sequence id
  TreeMap<Long, List<Path>> map = new TreeMap<>((l1, l2) -> l2.compareTo(l1));
  for (FileStatus status : statuses) {
    Path file = status.getPath();
    if (!status.isFile()) {
      LOG.warn("Found invalid track file {}, which is not a file", file);
      continue;
    }
    if (!TRACK_FILE_PATTERN.matcher(file.getName()).matches()) {
      LOG.warn("Found invalid track file {}, skip", file);
      continue;
    }
    List<String> parts = Splitter.on(TRACK_FILE_SEPARATOR).splitToList(file.getName());
    long seqId;
    try {
      seqId = Long.parseLong(parts.get(1));
    } catch (NumberFormatException e) {
      // the pattern does not limit the number of digits, so the value may overflow a long; treat
      // such a file like any other invalid track file instead of failing with an unchecked
      // exception
      LOG.warn("Found invalid track file {}, sequence id is out of range, skip", file);
      continue;
    }
    map.computeIfAbsent(seqId, k -> new ArrayList<>()).add(file);
  }
  return map;
}

private void initializeTrackFiles(long seqId) {
trackFiles[0] = new Path(trackFileDir, TRACK_FILE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
joshelser marked this conversation as resolved.
Show resolved Hide resolved
trackFiles[1] = new Path(trackFileDir, TRACK_FILE_ROTATE_PREFIX + TRACK_FILE_SEPARATOR + seqId);
LOG.info("Initialized track files: {}, {}", trackFiles[0], trackFiles[1]);
}

private void cleanUpTrackFiles(long loadedSeqId,
NavigableMap<Long, List<Path>> seqId2TrackFiles) {
LOG.info("Cleanup track file with sequence id < {}", loadedSeqId);
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
NavigableMap<Long, List<Path>> toDelete =
loadedSeqId >= 0 ? seqId2TrackFiles.tailMap(loadedSeqId, false) : seqId2TrackFiles;
toDelete.values().stream().flatMap(l -> l.stream()).forEach(file -> {
ForkJoinPool.commonPool().execute(() -> {
LOG.info("Deleting track file {}", file);
try {
fs.delete(file, false);
} catch (IOException e) {
LOG.warn("failed to delete unused track file {}", file, e);
joshelser marked this conversation as resolved.
Show resolved Hide resolved
}
});
});
}

StoreFileList load(boolean readOnly) throws IOException {
NavigableMap<Long, List<Path>> seqId2TrackFiles = listFiles();
long seqId = -1L;
StoreFileList[] lists = new StoreFileList[2];
for (int i = 0; i < 2; i++) {
try {
lists[i] = load(trackFiles[i]);
} catch (FileNotFoundException | EOFException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
for (Map.Entry<Long, List<Path>> entry : seqId2TrackFiles.entrySet()) {
List<Path> files = entry.getValue();
// should not have more than 2 files, if not, it means that the track files are broken, just
// throw exception out and fail the region open.
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
if (files.size() > 2) {
throw new DoNotRetryIOException("Should only have at most 2 track files for sequence id " +
entry.getKey() + ", but got " + files.size() + " files: " + files);
}
boolean loaded = false;
for (int i = 0; i < files.size(); i++) {
try {
lists[i] = load(files.get(i));
loaded = true;
} catch (EOFException e) {
// this is normal case, so use info and do not log stacktrace
LOG.info("Failed to load track file {}: {}", trackFiles[i], e.toString());
}
}
if (loaded) {
seqId = entry.getKey();
break;
}
}
int winnerIndex = select(lists);
if (lists[winnerIndex] != null) {
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
} else {
if (readOnly) {
return lists[select(lists)];
}

cleanUpTrackFiles(seqId, seqId2TrackFiles);

if (seqId < 0) {
initializeTrackFiles(System.currentTimeMillis());
nextTrackFile = 0;
return null;
}

initializeTrackFiles(Math.max(System.currentTimeMillis(), seqId + 1));
int winnerIndex = select(lists);
nextTrackFile = 1 - winnerIndex;
prevTimestamp = lists[winnerIndex].getTimestamp();
return lists[winnerIndex];
}

Expand All @@ -140,7 +234,8 @@ StoreFileList load() throws IOException {
void update(StoreFileList.Builder builder) throws IOException {
if (nextTrackFile < 0) {
// we need to call load first to load the prevTimestamp and also the next file
load();
// we are already in the update method, which is not read only, so pass false
load(false);
}
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
byte[] actualData = builder.setTimestamp(timestamp).build().toByteArray();
Expand All @@ -162,7 +257,7 @@ void update(StoreFileList.Builder builder) throws IOException {
fs.delete(trackFiles[nextTrackFile], false);
} catch (IOException e) {
// we will create new file with overwrite = true, so not a big deal here, only for speed up
// loading as we do not need to read this file when loading(we will hit FileNotFoundException)
// loading as we do not need to read this file when loading
LOG.debug("failed to delete old track file {}, not a big deal, just ignore", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.io.IOException;
import java.util.Collection;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -66,6 +66,11 @@ protected StoreFileTrackerBase(Configuration conf, boolean isPrimaryReplica, Sto
this.ctx = ctx;
}

/**
 * Loads the store files through {@code doLoadStoreFiles}; the load is read only unless this is
 * the primary replica.
 */
@Override
public final List<StoreFileInfo> load() throws IOException {
  boolean readOnly = !isPrimaryReplica;
  return doLoadStoreFiles(readOnly);
}

@Override
public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
if (isPrimaryReplica) {
Expand All @@ -81,6 +86,13 @@ public final void replace(Collection<StoreFileInfo> compactedFiles,
}
}

/**
 * Sets the tracked store files through {@code doSetStoreFiles}; does nothing unless this is the
 * primary replica.
 */
@Override
public final void set(List<StoreFileInfo> files) throws IOException {
  if (!isPrimaryReplica) {
    // only the primary replica is allowed to modify the tracked files
    return;
  }
  doSetStoreFiles(files);
}

@Override
public TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder) {
builder.setValue(TRACKER_IMPL, getTrackerName());
Expand Down Expand Up @@ -173,8 +185,19 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) th
return builder.build();
}

/**
 * Loads the store files tracked by this implementation; invoked from {@link #load()}.
 * <p/>
 * For primary replica, we will call load once when opening a region, and the implementation could
 * choose to do some cleanup work. So here we use {@code readOnly} to indicate whether you are
 * allowed to do the cleanup work. For secondary replicas, we will set {@code readOnly} to
 * {@code true}.
 * @param readOnly {@code true} if the implementation must not do any cleanup work
 * @return the loaded store files
 */
protected abstract List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException;

/**
 * Implementation hook for recording newly added store files.
 * NOTE(review): presumably only reached from {@code add} on the primary replica; confirm
 * against the full body of {@code add}.
 * @param newFiles the store files to add
 */
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

/**
 * Implementation hook for recording the result of a compaction.
 * NOTE(review): presumably {@code compactedFiles} are replaced by {@code newFiles} and this is
 * only reached from {@code replace} on the primary replica; confirm against {@code replace}.
 * @param compactedFiles the store files which have been compacted
 * @param newFiles the store files produced by the compaction
 */
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException;

/**
 * Implementation hook behind {@code set(List)}, which only calls it on the primary replica.
 * @param files the store files to set
 */
protected abstract void doSetStoreFiles(Collection<StoreFileInfo> files) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOE
}

@Override
public List<StoreFileInfo> load() throws IOException {
protected List<StoreFileInfo> doLoadStoreFiles(boolean readOnly) throws IOException {
  // Return a fresh copy of what is tracked for this store; readOnly is not consulted here.
  List<StoreFileInfo> snapshot = new ArrayList<>(trackedFiles.get(storeId));
  return snapshot;
}

Expand Down
Loading