HBASE-26867 Introduce a FlushProcedure
huiruan committed Jun 3, 2023
1 parent 7b571ca commit 6919097
Showing 21 changed files with 895 additions and 4 deletions.
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@@ -180,6 +181,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -955,7 +958,38 @@ public CompletableFuture<Void> flush(TableName tableName) {

@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
// This is for keeping compatibility with the old implementation.
// If the server version is lower than the client version, it is possible that the
// flushTable method is not present on the server side; if so, we need to fall back
// to the old implementation.
FlushTableRequest request = RequestConverter.buildFlushTableRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
CompletableFuture<Void> procFuture = this.<FlushTableRequest, FlushTableResponse> procedureCall(
tableName, request, (s, c, req, done) -> s.flushTable(c, req, done),
(resp) -> resp.getProcId(), new FlushTableProcedureBiConsumer(tableName));
// Here we use a separate CompletableFuture because the
// procFuture is not fully under our control.
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(procFuture, (ret, error) -> {
if (error != null) {
if (error instanceof DoNotRetryIOException) {
// Usually this is caused by the flushTable method not being present on the
// server side, i.e. the server's HBase version does not support it yet.
// If that happens, we need to fall back to the old flush implementation.
LOG.info("Unrecoverable error on master side, falling back to the legacy flush implementation",
  error);
legacyFlush(future, tableName, columnFamily);
} else {
future.completeExceptionally(error);
}
} else {
future.complete(ret);
}
});
return future;
}
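
For context, a minimal client-side usage sketch (illustration only, not part of this diff; configuration, table, and family names are hypothetical). Callers go through the async admin API and never observe which path, the new FlushTableProcedure or the legacy per-region flush, served the request:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class FlushExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      // Completes when the flush finishes, whether the master ran the new
      // FlushTableProcedure or the client fell back to legacyFlush.
      conn.getAdmin().flush(TableName.valueOf("test_table"), Bytes.toBytes("cf")).get();
    }
  }
}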

private void legacyFlush(CompletableFuture<Void> future, TableName tableName,
byte[] columnFamily) {
addListener(tableExists(tableName), (exists, err) -> {
if (err != null) {
future.completeExceptionally(err);
Expand Down Expand Up @@ -985,7 +1019,6 @@ public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
});
}
});
return future;
}

@Override
@@ -2768,6 +2801,18 @@ String getOperationType() {
}
}

private static class FlushTableProcedureBiConsumer extends TableProcedureBiConsumer {

FlushTableProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "FLUSH";
}
}

private static class CreateNamespaceProcedureBiConsumer extends NamespaceProcedureBiConsumer {

CreateNamespaceProcedureBiConsumer(String namespaceName) {
@@ -116,6 +116,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@@ -1714,4 +1715,14 @@ public static RemoveServersRequest buildRemoveServersRequest(Set<Address> server
}
return RemoveServersRequest.newBuilder().addAllServers(hostPorts).build();
}

public static FlushTableRequest buildFlushTableRequest(final TableName tableName,
final byte[] columnFamily, final long nonceGroup, final long nonce) {
FlushTableRequest.Builder builder = FlushTableRequest.newBuilder();
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (columnFamily != null) {
builder.setColumnFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return builder.setNonceGroup(nonceGroup).setNonce(nonce).build();
}
}
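
A small illustration of the converter (table and family names hypothetical): passing a null columnFamily leaves the optional column_family field unset, which the server-side flushTable RPC handler below reads back as "flush all column families".

// Whole-table flush: column_family stays unset.
FlushTableRequest allFamilies =
  RequestConverter.buildFlushTableRequest(TableName.valueOf("test_table"), null, 0L, 0L);
assert !allFamilies.hasColumnFamily();

// Single-family flush: the family bytes are wrapped without copying.
FlushTableRequest oneFamily = RequestConverter.buildFlushTableRequest(
  TableName.valueOf("test_table"), Bytes.toBytes("cf"), 0L, 0L);
assert oneFamily.hasColumnFamily();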
14 changes: 14 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -200,6 +200,17 @@ message ModifyTableResponse {
optional uint64 proc_id = 1;
}

message FlushTableRequest {
required TableName table_name = 1;
optional bytes column_family = 2;
optional uint64 nonce_group = 3 [default = 0];
optional uint64 nonce = 4 [default = 0];
}

message FlushTableResponse {
optional uint64 proc_id = 1;
}

/* Namespace-level protobufs */

message CreateNamespaceRequest {
@@ -1239,6 +1250,9 @@ service MasterService {

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);

rpc FlushTable(FlushTableRequest)
returns(FlushTableResponse);
}

// HBCK Service definitions.
@@ -191,6 +191,26 @@ message RestoreParentToChildRegionsPair {
required string child2_region_name = 3;
}

enum FlushTableState {
FLUSH_TABLE_PREPARE = 1;
FLUSH_TABLE_FLUSH_REGIONS = 2;
}

message FlushTableProcedureStateData {
required TableName table_name = 1;
optional bytes column_family = 2;
}

message FlushRegionProcedureStateData {
required RegionInfo region = 1;
optional bytes column_family = 2;
}

message FlushRegionParameter {
required RegionInfo region = 1;
optional bytes column_family = 2;
}

enum SnapshotState {
SNAPSHOT_PREPARE = 1;
SNAPSHOT_PRE_OPERATION = 2;
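The two-state FlushTableState enum above drives the master-side state machine. The FlushTableProcedure class itself is among the 21 changed files but is not shown in the loaded hunks, so the following is only a hedged sketch of the usual StateMachineProcedure shape: prepare first, then fan out one child per region. The FlushRegionProcedure constructor signature is assumed, and tableName/columnFamily are the fields persisted via FlushTableProcedureStateData.

// Hedged sketch, not the committed FlushTableProcedure source.
@Override
protected Flow executeFromState(MasterProcedureEnv env, FlushTableState state)
  throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
  switch (state) {
    case FLUSH_TABLE_PREPARE:
      // table existence/enabled checks would go here
      setNextState(FlushTableState.FLUSH_TABLE_FLUSH_REGIONS);
      return Flow.HAS_MORE_STATE;
    case FLUSH_TABLE_FLUSH_REGIONS:
      // one FlushRegionProcedure child per region of the table
      addChildProcedure(env.getAssignmentManager().getRegionStates()
        .getRegionsOfTable(tableName).stream()
        .map(region -> new FlushRegionProcedure(region, columnFamily))
        .toArray(FlushRegionProcedure[]::new));
      return Flow.NO_MORE_STATE;
    default:
      throw new UnsupportedOperationException("unhandled state=" + state);
  }
}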
@@ -291,7 +291,13 @@ public enum EventType {
* RS verify snapshot.<br>
* RS_VERIFY_SNAPSHOT
*/
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS);
RS_VERIFY_SNAPSHOT(88, ExecutorType.RS_SNAPSHOT_OPERATIONS),

/**
* RS flush regions.<br>
* RS_FLUSH_REGIONS
*/
RS_FLUSH_REGIONS(89, ExecutorType.RS_FLUSH_OPERATIONS);

private final int code;
private final ExecutorType executor;
@@ -54,7 +54,9 @@ public enum ExecutorType {
RS_SWITCH_RPC_THROTTLE(33),
RS_IN_MEMORY_COMPACTION(34),
RS_CLAIM_REPLICATION_QUEUE(35),
RS_SNAPSHOT_OPERATIONS(36);
RS_SNAPSHOT_OPERATIONS(36),

RS_FLUSH_OPERATIONS(37);

ExecutorType(int value) {
}
@@ -156,6 +156,7 @@
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.FlushTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -4358,4 +4359,34 @@ private void initializeCoprocessorHost(Configuration conf) {
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}

@Override
public long flushTable(TableName tableName, byte[] columnFamily, long nonceGroup, long nonce)
throws IOException {
checkInitialized();

if (
!getConfiguration().getBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED,
MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED_DEFAULT)
) {
throw new DoNotRetryIOException("FlushProcedure is DISABLED");
}

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preTableFlush(tableName);
LOG.info(getClientIdAuditPrefix() + " flush " + tableName);
submitProcedure(
new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamily));
getMaster().getMasterCoprocessorHost().postTableFlush(tableName);
}

@Override
protected String getDescription() {
return "FlushTableProcedure";
}
});
}
}
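
The DoNotRetryIOException thrown above is exactly what triggers the client-side fallback to legacyFlush shown earlier. A minimal sketch of disabling the feature server-side (the key string itself lives in MasterFlushTableProcedureManager and is not shown in this hunk):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;

public class DisableFlushProcedure {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // With this set to false, flushTable rejects requests with
    // DoNotRetryIOException("FlushProcedure is DISABLED") and clients
    // fall back to the legacy per-region flush.
    conf.setBoolean(MasterFlushTableProcedureManager.FLUSH_PROCEDURE_ENABLED, false);
    // ... start the master / mini-cluster with this configuration ...
  }
}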
@@ -235,6 +235,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
@@ -3587,4 +3589,17 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public FlushTableResponse flushTable(RpcController controller, FlushTableRequest req)
throws ServiceException {
TableName tableName = ProtobufUtil.toTableName(req.getTableName());
byte[] columnFamily = req.hasColumnFamily() ? req.getColumnFamily().toByteArray() : null;
try {
long procId = server.flushTable(tableName, columnFamily, req.getNonceGroup(), req.getNonce());
return FlushTableResponse.newBuilder().setProcId(procId).build();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
}
@@ -464,4 +464,15 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Flush an existing table.
* @param tableName the table to flush
* @param columnFamily the column family to flush, or null to flush all column families
* @param nonceGroup the nonce group
* @param nonce the nonce
* @return the flush procedure id
*/
long flushTable(final TableName tableName, final byte[] columnFamily, final long nonceGroup,
final long nonce) throws IOException;
}