HBASE-26867 Introduce a FlushProcedure (#5256)
Co-authored-by: huiruan <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 20c9e4b)
frostruan authored and Apache9 committed Aug 14, 2023
1 parent ca0ef68 commit 13d2405
Showing 31 changed files with 1,111 additions and 28 deletions.
File: Admin.java
@@ -550,6 +550,15 @@ Future<Void> modifyColumnFamilyStoreFileTrackerAsync(TableName tableName, byte[]
*/
void flush(TableName tableName, byte[] columnFamily) throws IOException;

/**
* Flush the specified column family stores on all regions of the passed table. This runs as a
* synchronous operation.
* @param tableName table to flush
* @param columnFamilies the column families to flush
* @throws IOException if a remote or network exception occurs
*/
void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException;

/**
* Flush an individual region. Synchronous operation.
* @param regionName region to flush
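For illustration, here is a minimal, hedged usage sketch of the new synchronous overload; the table and family names are made up, and connection setup follows the usual ConnectionFactory pattern:

import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushColumnFamiliesExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
      Admin admin = conn.getAdmin()) {
      // Flush only the cf1 and cf2 stores on every region of the table;
      // the call blocks until the flush completes.
      admin.flush(TableName.valueOf("test_table"),
        Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")));
    }
  }
}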
File: AdminOverAsyncAdmin.java
@@ -264,6 +264,11 @@ public void flush(TableName tableName, byte[] columnFamily) throws IOException {
get(admin.flush(tableName, columnFamily));
}

@Override
public void flush(TableName tableName, List<byte[]> columnFamilies) throws IOException {
get(admin.flush(tableName, columnFamilies));
}

@Override
public void flushRegion(byte[] regionName) throws IOException {
get(admin.flushRegion(regionName));
File: AsyncAdmin.java
@@ -347,6 +347,14 @@ CompletableFuture<Void> modifyColumnFamilyStoreFileTracker(TableName tableName,
*/
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);

/**
* Flush the specified column family stores on all regions of the passed table.
* @param tableName table to flush
* @param columnFamilies the column families to flush
*/
CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies);

/**
* Flush an individual region.
* @param regionName region to flush
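And a matching hedged sketch for the asynchronous variant (again with made-up names); the returned CompletableFuture completes when the flush finishes:

import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncFlushExample {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      conn.getAdmin()
        .flush(TableName.valueOf("test_table"),
          Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2")))
        .join(); // block only so the example does not exit early
    }
  }
}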
File: AsyncHBaseAdmin.java
@@ -269,6 +269,11 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return wrap(rawAdmin.flush(tableName, columnFamily));
}

@Override
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilies) {
return wrap(rawAdmin.flush(tableName, columnFamilies));
}

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName));
File: RawAsyncHBaseAdmin.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -96,6 +97,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +182,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -950,12 +954,50 @@ public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {

@Override
public CompletableFuture<Void> flush(TableName tableName) {
- return flush(tableName, null);
+ return flush(tableName, Collections.emptyList());
}

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return flush(tableName, Collections.singletonList(columnFamily));
}

@Override
public CompletableFuture<Void> flush(TableName tableName, List<byte[]> columnFamilyList) {
// This is for keeping compatibility with the old implementation.
// If the server version is lower than the client version, the new flushTable
// method may not be present on the server side; if so, we need to fall back
// to the old implementation.
List<byte[]> columnFamilies = columnFamilyList.stream()
.filter(cf -> cf != null && cf.length > 0).distinct().collect(Collectors.toList());
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamilies,
ng.getNonceGroup(), ng.newNonce());
CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
(resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(procFuture, (ret, error) -> {
if (error != null) {
if (error instanceof TableNotFoundException || error instanceof TableNotEnabledException) {
future.completeExceptionally(error);
} else if (error instanceof DoNotRetryIOException) {
// Usually this is caused by the flushTable method not being present on the
// server, or by a version mismatch between client and server. If that
// happens, we need to fall back to the old flush implementation.
LOG.info("Unrecoverable error on the master side. Falling back to the legacy flush procedure (V1)",
error);
legacyFlush(future, tableName, columnFamilies);
} else {
future.completeExceptionally(error);
}
} else {
future.complete(ret);
}
});
return future;
}

private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
List<byte[]> columnFamilies) {
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
@@ -969,8 +1011,9 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
future.completeExceptionally(new TableNotEnabledException(tableName));
} else {
Map<String, String> props = new HashMap<>();
- if (columnFamily != null) {
- props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
+ if (columnFamilies != null && !columnFamilies.isEmpty()) {
+ props.put(HConstants.FAMILY_KEY_STR, Strings.JOINER
+ .join(columnFamilies.stream().map(Bytes::toString).collect(Collectors.toList())));
}
addListener(
execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), props),
@@ -985,7 +1028,6 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
});
}
});
- return future;
}

@Override
@@ -2768,6 +2810,18 @@ String getOperationType() {
}
}

private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {

FlushTableProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "FLUSH";
}
}

private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

CreateNamespaceProcedureBiConsumer(String namespaceName) {
File: RequestConverter.java
@@ -116,6 +116,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -1714,4 +1715,16 @@ public static RemoveServersRequest buildRemoveServersRequest(Set<Address> server
}
return RemoveServersRequest.newBuilder().addAllServers(hostPorts).build();
}

public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
final List<byte[]> columnFamilies, final long nonceGroup, final long nonce) {
FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (!columnFamilies.isEmpty()) {
for (byte[] columnFamily : columnFamilies) {
builder.addColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
}
return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
}
}
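As a hedged sketch, this is how the new helper might be exercised; the fixed nonce values are for illustration only (real callers draw them from the connection's nonce generator):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.util.Bytes;

public class BuildFlushTableRequestExample {
  public static void main(String[] args) {
    List<byte[]> families = Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2"));
    // The request carries the proto table name, each family as bytes, and the
    // nonce pair that makes procedure submission idempotent on retries.
    FlushTableRequest req = RequestConverter.buildFlushTableRequest(
      TableName.valueOf("test_table"), families, 1L, 42L);
    System.out.println(req.getColumnFamilyCount()); // prints 2
  }
}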
File: Strings.java
@@ -20,6 +20,9 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
* Utility for Strings.
*/
@@ -28,6 +31,9 @@ public final class Strings {
public static final String DEFAULT_SEPARATOR = "=";
public static final String DEFAULT_KEYVALUE_SEPARATOR = ", ";

public static final Joiner JOINER = Joiner.on(",");
public static final Splitter SPLITTER = Splitter.on(",");

private Strings() {
}

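These constants matter for the legacy fallback path shown earlier, where the family list is collapsed into one comma-separated property value. A small hedged round-trip sketch:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;

public class JoinSplitFamiliesExample {
  public static void main(String[] args) {
    List<byte[]> families = Arrays.asList(Bytes.toBytes("cf1"), Bytes.toBytes("cf2"));
    // Encode, as legacyFlush() does when filling the procedure props.
    String joined = Strings.JOINER
      .join(families.stream().map(Bytes::toString).collect(Collectors.toList()));
    // joined == "cf1,cf2"; the receiving side can split it back apart.
    for (String family : Strings.SPLITTER.split(joined)) {
      System.out.println(family);
    }
  }
}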
14 changes: 14 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -200,6 +200,17 @@ message ModifyTableResponse {
optional uint64 proc_id = 1;
}

message FlushTableRequest {
required TableName table_name = 1;
repeated bytes column_family = 2;
optional uint64 nonce_group = 3 [default = 0];
optional uint64 nonce = 4 [default = 0];
}

message FlushTableResponse {
optional uint64 proc_id = 1;
}

/* Namespace-level protobufs */

message CreateNamespaceRequest {
@@ -1239,6 +1250,9 @@ service MasterService {

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);

rpc FlushTable(FlushTableRequest)
returns(FlushTableResponse);
}

// HBCK Service definitions.
File: MasterProcedure.proto
@@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair {
required string child2_region_name = 3;
}

enum FlushTableState {
FLUSH_TABLE_PREPARE = 1;
FLUSH_TABLE_FLUSH_REGIONS = 2;
}

message FlushTableProcedureStateData {
required TableName table_name = 1;
repeated bytes column_family = 2;
}

message FlushRegionProcedureStateData {
required RegionInfo region = 1;
repeated bytes column_family = 2;
}

message FlushRegionParameter {
required RegionInfo region = 1;
repeated bytes column_family = 2;
}

enum SnapshotState {
SNAPSHOT_PREPARE = 1;
SNAPSHOT_PRE_OPERATION = 2;
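The two FlushTableState values imply the usual two-phase StateMachineProcedure shape: prepare (validate the table), then fan out per-region flushes. The procedure classes themselves are among the files not shown on this page, so the following is only an editor's hypothetical sketch of that loop, not the commit's code:

// Hypothetical sketch -- FlushTableProcedure itself is not shown in this diff.
// Assumes the standard HBase StateMachineProcedure contract.
@Override
protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state) {
  switch (state) {
    case FLUSH_TABLE_PREPARE:
      // Check that the table exists and is enabled before doing any work.
      setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS);
      return Flow.HAS_MORE_STATE;
    case FLUSH_TABLE_FLUSH_REGIONS:
      // Submit one child FlushRegionProcedure per region, passing along the
      // column_family list from FlushTableProcedureStateData.
      return Flow.NO_MORE_STATE;
    default:
      throw new UnsupportedOperationException("unhandled state=" + state);
  }
}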
File: EventType.java
@@ -291,7 +291,13 @@ public enum EventType {
* RS verify snapshot.<br>
* RS_VERIFY_SNAPSHOT
*/
- RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
+ RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),

/**
* RS flush regions.<br>
* RS_FLUSH_OPERATIONS
*/
RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS);

private final int code;
private final ExecutorType executor;
File: ExecutorType.java
@@ -54,7 +54,9 @@ public enum ExecutorType {
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35),
- RS_SNAPSHOT_OPERATIONS(36);
+ RS_SNAPSHOT_OPERATIONS(36),

RS_FLUSH_OPERATIONS(37);

ExecutorType(int value) {
}
File: HMaster.java
@@ -157,6 +157,7 @@
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -4381,4 +4382,34 @@ private void initializeCoprocessorHost(Configuration conf) {
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}

@Override
public long flushTable(TableName tableName, List<byte[]> columnFamilies, long nonceGroup,
long nonce) throws IOException {
checkInitialized();

if (
!getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
) {
throw new DoNotRetryIOException("FlushTableProcedureV2 is DISABLED");
}

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
submitProcedure(
new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies));
getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
}

@Override
protected String getDescription() {
return "FlushTableProcedure";
}
});
}
}
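The guard above gives operators a kill switch: with the flag off, the master rejects the new RPC with a DoNotRetryIOException and clients quietly fall back to the legacy flush. A hedged sketch of flipping it in code (the concrete property key is an assumption; the constant's value is not shown in this diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DisableFlushProcedureExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Assumed key for MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED;
    // set it to false (here, or in hbase-site.xml on the master) to force the
    // legacy flush path.
    conf.setBoolean("hbase.flush.procedure.enabled", false);
  }
}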
File: MasterRpcServices.java
@@ -235,6 +235,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -3590,4 +3592,21 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req)
throws ServiceException {
TableName tableName = ProtobufUtil.toTableName(req.getTableName());
List<byte[]> columnFamilies = req.getColumnFamilyCount() > 0
? req.getColumnFamilyList().stream().filter(cf -> !cf.isEmpty()).map(ByteString::toByteArray)
.collect(Collectors.toList())
: null;
try {
long procId =
server.flushTable(tableName, columnFamilies, req.getNonceGroup(), req.getNonce());
return FlushTableResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
}
File: MasterServices.java
@@ -477,4 +477,15 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Flush an existing table
* @param tableName The table name
* @param columnFamilies The column families to flush
* @param nonceGroup the nonce group
* @param nonce the nonce
* @return the flush procedure id
*/
long flushTable(final TableName tableName, final List<byte[]> columnFamilies,
final long nonceGroup, final long nonce) throws IOException;
}
(Diff truncated: the remaining changed files are not shown here.)
