Skip to content

Commit

Permalink
HBASE-26224 Introduce a MigrationStoreFileTracker to support migratin…
Browse files Browse the repository at this point in the history
…g from different store file tracker implementations (#3656)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
Apache9 committed Oct 14, 2021
1 parent 708b7c1 commit 41ed7e2
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -39,7 +40,9 @@ public DefaultStoreFileTracker(Configuration conf, boolean isPrimaryReplica, Sto

@Override
public List<StoreFileInfo> load() throws IOException {
return ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
List<StoreFileInfo> files =
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
return files != null ? files : Collections.emptyList();
}

@Override
Expand All @@ -57,4 +60,9 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
// NOOP
}

@Override
void set(List<StoreFileInfo> files) {
// NOOP
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* storages.
*/
@InterfaceAudience.Private
public class FileBasedStoreFileTracker extends StoreFileTrackerBase {
class FileBasedStoreFileTracker extends StoreFileTrackerBase {

private final StoreFileListFile backedFile;

Expand Down Expand Up @@ -139,4 +139,17 @@ protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
}
}
}

@Override
void set(List<StoreFileInfo> files) throws IOException {
synchronized (storefiles) {
storefiles.clear();
StoreFileList.Builder builder = StoreFileList.newBuilder();
for (StoreFileInfo info : files) {
storefiles.put(info.getPath().getName(), info);
builder.addStoreFile(toStoreFileEntry(info));
}
backedFile.update(builder);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* A store file tracker used for migrating between store file tracker implementations.
*/
@InterfaceAudience.Private
class MigrationStoreFileTracker extends StoreFileTrackerBase {

public static final String SRC_IMPL = "hbase.store.file-tracker.migration.src.impl";

public static final String DST_IMPL = "hbase.store.file-tracker.migration.dst.impl";

private final StoreFileTrackerBase src;

private final StoreFileTrackerBase dst;

public MigrationStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
super(conf, isPrimaryReplica, ctx);
this.src = StoreFileTrackerFactory.create(conf, SRC_IMPL, isPrimaryReplica, ctx);
this.dst = StoreFileTrackerFactory.create(conf, DST_IMPL, isPrimaryReplica, ctx);
Preconditions.checkArgument(!src.getClass().equals(dst.getClass()),
"src and dst is the same: %s", src.getClass());
}

@Override
public List<StoreFileInfo> load() throws IOException {
List<StoreFileInfo> files = src.load();
dst.set(files);
return files;
}

@Override
protected boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true.
// For example, if we want to migrate from a tracker implementation which can ignore the broken
// files under data directory to a tracker implementation which can not, if we still allow
// writing in tmp directory directly, we may have some broken files under the data directory and
// then after we finally change the implementation which can not ignore the broken files, we
// will be in trouble.
return src.requireWritingToTmpDirFirst() || dst.requireWritingToTmpDirFirst();
}

@Override
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddNewStoreFiles(newFiles);
dst.doAddNewStoreFiles(newFiles);
}

@Override
protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException {
src.doAddCompactionResults(compactedFiles, newFiles);
dst.doAddCompactionResults(compactedFiles, newFiles);
}

@Override
void set(List<StoreFileInfo> files) {
throw new UnsupportedOperationException(
"Should not call this method on " + getClass().getSimpleName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

Expand Down Expand Up @@ -121,7 +120,10 @@ StoreFileList load() throws IOException {
* We will set the timestamp in this method so just pass the builder in
*/
void update(StoreFileList.Builder builder) throws IOException {
Preconditions.checkState(nextTrackFile >= 0, "should call load first before calling update");
if (nextTrackFile < 0) {
// we need to call load first to load the prevTimestamp and also the next file
load();
}
FileSystem fs = ctx.getRegionFileSystem().getFileSystem();
long timestamp = Math.max(prevTimestamp + 1, EnvironmentEdgeManager.currentTime());
try (FSDataOutputStream out = fs.create(trackFiles[nextTrackFile], true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -95,8 +96,7 @@ private HFileContext createFileContext(Compression.Algorithm compression,
}

@Override
public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
throws IOException {
public final StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException {
if (!isPrimaryReplica) {
throw new IllegalStateException("Should not call create writer on secondary replicas");
}
Expand Down Expand Up @@ -170,4 +170,12 @@ public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)

protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
Collection<StoreFileInfo> newFiles) throws IOException;

/**
* used to mirror the store file list after loading when migration.
* <p/>
* Do not add this method to the {@link StoreFileTracker} interface since we do not need this
* method in upper layer.
*/
abstract void set(List<StoreFileInfo> files) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
package org.apache.hadoop.hbase.regionserver.storefiletracker;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreContext;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Factory method for creating store file tracker.
*/
Expand All @@ -39,30 +41,38 @@ public final class StoreFileTrackerFactory {
private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerFactory.class);

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica,
StoreContext ctx) {
StoreContext ctx) {
Class<? extends StoreFileTracker> tracker =
conf.getClass(TRACK_IMPL, DefaultStoreFileTracker.class, StoreFileTracker.class);
LOG.info("instantiating StoreFileTracker impl {}", tracker.getName());
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}

public static StoreFileTracker create(Configuration conf, boolean isPrimaryReplica, String family,
HRegionFileSystem regionFs) {
HRegionFileSystem regionFs) {
ColumnFamilyDescriptorBuilder fDescBuilder =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family));
StoreContext ctx = StoreContext.getBuilder().
withColumnFamilyDescriptor(fDescBuilder.build()).
withRegionFileSystem(regionFs).
build();
return StoreFileTrackerFactory.create(conf, isPrimaryReplica, ctx);
StoreContext ctx = StoreContext.getBuilder().withColumnFamilyDescriptor(fDescBuilder.build())
.withRegionFileSystem(regionFs).build();
return StoreFileTrackerFactory.create(conf, TRACK_IMPL, isPrimaryReplica, ctx);
}

public static Configuration mergeConfigurations(Configuration global, TableDescriptor table,
ColumnFamilyDescriptor family) {
return StoreUtils.createStoreConfiguration(global, table, family);
}

public static Configuration mergeConfigurations(Configuration global,
TableDescriptor table, ColumnFamilyDescriptor family) {
return new CompoundConfiguration()
.add(global)
.addBytesMap(table.getValues())
.addStringMap(family.getConfiguration())
.addBytesMap(family.getValues());
static StoreFileTrackerBase create(Configuration conf, String configName,
boolean isPrimaryReplica, StoreContext ctx) {
String className =
Preconditions.checkNotNull(conf.get(configName), "config %s is not set", configName);
Class<? extends StoreFileTrackerBase> tracker;
try {
tracker = Class.forName(className).asSubclass(StoreFileTrackerBase.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
LOG.info("instantiating StoreFileTracker impl {} as {}", tracker.getName(), configName);
return ReflectionUtils.newInstance(tracker, conf, isPrimaryReplica, ctx);
}
}
Loading

0 comments on commit 41ed7e2

Please sign in to comment.