Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26224 Introduce a MigrationStoreFileTracker to support migratin… #3656

Merged
merged 1 commit into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -39,7 +40,9 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto

@Override
public List<StoreFileInfo> load() throws IOException {
return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
List<StoreFileInfo> files =
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
return files != null ? files : Collections.emptyList();
}

@Override
Expand All @@ -57,4 +60,9 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
// NOOP
}

@Override
void set(List<StoreFileInfo> files) {
// NOOP
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* storages.
*/
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
class FileBasedStoreFileTracker extends StoreFileTrackerBase {

private final StoreFileListFile backedFile;

Expand Down Expand Up @@ -139,4 +139,17 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}
}
}

@Override
void set(List<StoreFileInfo> files) throws IOException {
synchronized (storefiles) {
storefiles.clear();
StoreFileList.Builder builder = StoreFileList.newBuilder();
for (StoreFileInfo info : files) {
storefiles.put(info.getPath().getName(), info);
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* A store file tracker used for migrating between store file tracker implementations.
*/
@InterfaceAudience.Private
class MigrationStoreFileTracker extends StoreFileTrackerBase {

public static final String SRC_IMPL = "hbase.store.file-tracker.migration.src.impl";

public static final String DST_IMPL = "hbase.store.file-tracker.migration.dst.impl";

private final StoreFileTrackerBase src;

private final StoreFileTrackerBase dst;

public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
this.src = StoreFileTrackerFactory.create(conf, SRC_IMPL, isPrimaryReplica, ctx);
this.dst = StoreFileTrackerFactory.create(conf, DST_IMPL, isPrimaryReplica, ctx);
Preconditions.checkArgument(!src.getClass().equals(dst.getClass()),
"src and dst is the same: %s", src.getClass());
}

@Override
public List<StoreFileInfo> load() throws IOException {
List<StoreFileInfo> files = src.load();
dst.set(files);
return files;
}

@Override
protected boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true.
// For example, if we want to migrate from a tracker implementation which can ignore the broken
// files under data directory to a tracker implementation which can not, if we still allow
// writing in tmp directory directly, we may have some broken files under the data directory and
// then after we finally change the implementation which can not ignore the broken files, we
// will be in trouble.
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddNewStoreFiles(newFiles);
dst.doAddNewStoreFiles(newFiles);
}

@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddCompactionResults(compactedFiles, newFiles);
dst.doAddCompactionResults(compactedFiles, newFiles);
}

@Override
void set(List<StoreFileInfo> files) {
throw new UnsupportedOperationException(
"Should not call this method on " + getClass().getSimpleName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

Expand Down Expand Up @@ -121,7 +120,10 @@ StoreFileList load() throws IOException {
* We will set the timestamp in this method so just pass the builder in
*/
void update(StoreFileList.Builder builder) throws IOException {
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
if (nextTrackFile < 0) {
// we need to call load first to load the prevTimestamp and also the next file
load();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -95,8 +96,7 @@ private HFileContext createFileContext(Compression.Algorithm compression,
}

@Override
public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
throws IOException {
public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
if (!isPrimaryReplica) {
throw new IllegalStateException("Should not call create writer on secondary replicas");
}
Expand Down Expand Up @@ -170,4 +170,12 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)

protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException;

/**
* used to mirror the store file list after loading when migration.
* <p/>
* Do not add this method to the {@link StoreFileTracker} interface since we do not need this
* method in upper layer.
*/
abstract void set(List<StoreFileInfo> files) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Factory method for creating store file tracker.
*/
Expand All @@ -39,30 +41,39 @@ public final class StoreFileTrackerFactory {
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
HRegionFileSystem regionFs) {
HRegionFileSystem regionFs) {
ColumnFamilyDescriptorBuilder fDescBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
StoreContext ctx = StoreContext.getBuilder().
withColumnFamilyDescriptor(fDescBuilder.build()).
withRegionFileSystem(regionFs).
build();
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
StoreContext ctx = StoreContext.getBuilder().withColumnFamilyDescriptor(fDescBuilder.build())
.withRegionFileSystem(regionFs).build();
return StoreFileTrackerFactory.create(conf, TRACK_IMPL, isPrimaryReplica, ctx);
}

public static Configuration mergeConfigurations(Configuration global, TableDescriptor table,
ColumnFamilyDescriptor family) {
return new CompoundConfiguration().add(global).addBytesMap(table.getValues())
.addStringMap(family.getConfiguration()).addBytesMap(family.getValues());
}

public static Configuration mergeConfigurations(Configuration global,
TableDescriptor table, ColumnFamilyDescriptor family) {
return new CompoundConfiguration()
.add(global)
.addBytesMap(table.getValues())
.addStringMap(family.getConfiguration())
.addBytesMap(family.getValues());
static StoreFileTrackerBase create(Configuration conf, String configName,
boolean isPrimaryReplica, StoreContext ctx) {
String className =
Preconditions.checkNotNull(conf.get(configName), "config %s is not set", configName);
Class<? extends StoreFileTrackerBase> tracker;
try {
tracker = Class.forName(className).asSubclass(StoreFileTrackerBase.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
LOG.info("instantiating StoreFileTracker impl {} as {}", tracker.getName(), configName);
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}
}
Loading