Skip to content

Commit

Permalink
HBASE-27028 Add a shell command for flushing master local region (#4539)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
2005hithlj authored Jun 21, 2022
1 parent bdaa448 commit 666aa06
Show file tree
Hide file tree
Showing 21 changed files with 238 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3212,4 +3212,9 @@ default List<OnlineLogRecord> getSlowLogResponses(final Set<ServerName> serverNa
*/
List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, ServerType serverType,
int limit, Map<String, Object> filterParams) throws IOException;

/**
* Flush master local region
*/
void flushMasterStore() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1629,4 +1629,9 @@ default CompletableFuture<List<Boolean>> hasUserPermissions(List<Permission> per
*/
CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, String logType,
ServerType serverType, int limit, Map<String, Object> filterParams);

/**
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();
}
Original file line number Diff line number Diff line change
Expand Up @@ -865,4 +865,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
String logType, ServerType serverType, int limit, Map<String, Object> filterParams) {
return wrap(rawAdmin.getLogEntries(serverNames, logType, serverType, limit, filterParams));
}

@Override
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
Expand Down Expand Up @@ -1922,6 +1924,12 @@ public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker(
throws ServiceException {
return stub.modifyColumnStoreFileTracker(controller, request);
}

@Override
public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
return stub.flushMasterStore(controller, request);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
Expand Down Expand Up @@ -4401,6 +4402,18 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
return Collections.emptyList();
}

@Override
public void flushMasterStore() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
FlushMasterStoreRequest request = FlushMasterStoreRequest.newBuilder().build();
master.flushMasterStore(getRpcController(), request);
return null;
}
});
}

private List<LogEntry> getBalancerDecisions(final int limit) throws IOException {
return executeCallable(
new MasterCallable<List<LogEntry>>(getConnection(), getRpcControllerFactory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
Expand Down Expand Up @@ -4023,4 +4025,14 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
return CompletableFuture.completedFuture(Collections.emptyList());
}
}

@Override
public CompletableFuture<Void> flushMasterStore() {
FlushMasterStoreRequest.Builder request = FlushMasterStoreRequest.newBuilder();
return this.<Void> newMasterCaller()
.action(((controller, stub) -> this.<FlushMasterStoreRequest, FlushMasterStoreResponse,
Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null)))
.call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
Expand Down Expand Up @@ -744,4 +746,10 @@ public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker(RpcCont
ModifyColumnStoreFileTrackerRequest request) throws ServiceException {
return stub.modifyColumnStoreFileTracker(controller, request);
}

@Override
public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
return stub.flushMasterStore(controller, request);
}
}
6 changes: 6 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,9 @@ message ModifyColumnStoreFileTrackerResponse {
optional uint64 proc_id = 1;
}

message FlushMasterStoreRequest {}
message FlushMasterStoreResponse {}

service MasterService {
/** Used by the client to get the number of regions that have received the updated schema */
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
Expand Down Expand Up @@ -1153,6 +1156,9 @@ service MasterService {

rpc ModifyColumnStoreFileTracker(ModifyColumnStoreFileTrackerRequest)
returns(ModifyColumnStoreFileTrackerResponse);

rpc FlushMasterStore(FlushMasterStoreRequest)
returns(FlushMasterStoreResponse);
}

// HBCK Service definitions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,22 @@ default void postTableFlush(final ObserverContext<MasterCoprocessorEnvironment>
final TableName tableName) throws IOException {
}

/**
* Called before the master local region memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
*/
default void preMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called after the master local region memstore is flushed to disk.
* @param ctx the environment to interact with the framework and master
*/
default void postMasterStoreFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* Called before the quota for the user is stored.
* @param ctx the environment to interact with the framework and master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4163,6 +4163,26 @@ public MetaLocationSyncer getMetaLocationSyncer() {
return metaLocationSyncer;
}

@Override
public void flushMasterStore() throws IOException {
LOG.info("Force flush master local region.");
if (this.cpHost != null) {
try {
cpHost.preMasterStoreFlush();
} catch (IOException ioe) {
LOG.error("Error invoking master coprocessor preMasterStoreFlush()", ioe);
}
}
masterRegion.flush(true);
if (this.cpHost != null) {
try {
cpHost.postMasterStoreFlush();
} catch (IOException ioe) {
LOG.error("Error invoking master coprocessor postMasterStoreFlush()", ioe);
}
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public MasterRegion getMasterRegion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,24 @@ public void call(MasterObserver observer) throws IOException {
});
}

public void preMasterStoreFlush() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preMasterStoreFlush(this);
}
});
}

public void postMasterStoreFlush() throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postMasterStoreFlush(this);
}
});
}

public void preSetUserQuota(final String user, final GlobalQuotaSettings quotas)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
Expand Down Expand Up @@ -1377,6 +1379,18 @@ public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker(RpcCont
}
}

@Override
public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
rpcPreCheck("flushMasterStore");
try {
master.flushMasterStore();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
return FlushMasterStoreResponse.newBuilder().build();
}

@Override
public ModifyNamespaceResponse modifyNamespace(RpcController controller,
ModifyNamespaceRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,4 +480,9 @@ boolean normalizeRegions(final NormalizeTableFilterParams ntfp, final boolean is
* We need to get this in MTP to tell the syncer the new meta replica count.
*/
MetaLocationSyncer getMetaLocationSyncer();

/**
* Flush master local region
*/
void flushMasterStore() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,11 @@ public RegionScanner getRegionScanner(Scan scan) throws IOException {
return region.getScanner(scan);
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public FlushResult flush(boolean force) throws IOException {
return region.flush(force);
flusherAndCompactor.resetChangesAfterLastFlush();
FlushResult flushResult = region.flush(force);
flusherAndCompactor.recordLastFlushTime();
return flushResult;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private boolean needCompaction() {
}

private void flushLoop() {
lastFlushTime = EnvironmentEdgeManager.currentTime();
recordLastFlushTime();
while (!closed) {
flushLock.lock();
try {
Expand All @@ -202,10 +202,10 @@ private void flushLoop() {
flushLock.unlock();
}
assert flushRequest;
changesAfterLastFlush.set(0);
resetChangesAfterLastFlush();
try {
region.flush(true);
lastFlushTime = EnvironmentEdgeManager.currentTime();
recordLastFlushTime();
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush master local region, aborting...", e);
abortable.abort("Failed to flush master local region", e);
Expand Down Expand Up @@ -263,6 +263,14 @@ void requestFlush() {
}
}

void resetChangesAfterLastFlush() {
changesAfterLastFlush.set(0);
}

void recordLastFlushTime() {
lastFlushTime = EnvironmentEdgeManager.currentTime();
}

@Override
public void close() {
closed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public static class CPMasterObserver implements MasterCoprocessor, MasterObserve
private boolean postRequestLockCalled;
private boolean preLockHeartbeatCalled;
private boolean postLockHeartbeatCalled;
private boolean preMasterStoreFlushCalled;
private boolean postMasterStoreFlushCalled;

public void resetStates() {
preCreateTableRegionInfosCalled = false;
Expand Down Expand Up @@ -280,6 +282,8 @@ public void resetStates() {
postRequestLockCalled = false;
preLockHeartbeatCalled = false;
postLockHeartbeatCalled = false;
preMasterStoreFlushCalled = false;
postMasterStoreFlushCalled = false;
}

@Override
Expand Down Expand Up @@ -1042,6 +1046,18 @@ public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
}

@Override
public void preMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
preMasterStoreFlushCalled = true;
}

@Override
public void postMasterStoreFlush(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
postMasterStoreFlushCalled = true;
}

@Override
public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
final String userName, final GlobalQuotaSettings quotas) throws IOException {
Expand Down Expand Up @@ -1676,4 +1692,22 @@ public void testQueueLockAndLockHeartbeatOperations() throws Exception {
ProcedureTestingUtility.waitNoProcedureRunning(master.getMasterProcedureExecutor());
ProcedureTestingUtility.assertProcNotFailed(master.getMasterProcedureExecutor(), procId);
}

@Test
public void testMasterStoreOperations() throws Exception {
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
MasterCoprocessorHost host = master.getMasterCoprocessorHost();
CPMasterObserver cp = host.findCoprocessor(CPMasterObserver.class);
cp.resetStates();
assertFalse("No master store flush call", cp.preMasterStoreFlushCalled);
assertFalse("No master store flush call", cp.postMasterStoreFlushCalled);

try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
Admin admin = connection.getAdmin()) {
admin.flushMasterStore();

assertTrue("Master store flush called", cp.preMasterStoreFlushCalled);
assertTrue("Master store flush called", cp.postMasterStoreFlushCalled);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ public MetaLocationSyncer getMetaLocationSyncer() {
return null;
}

@Override
public void flushMasterStore() throws IOException {
}

@Override
public long modifyTableStoreFileTracker(TableName tableName, String dstSFT, long nonceGroup,
long nonce) throws IOException {
Expand Down
8 changes: 7 additions & 1 deletion hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1795,11 +1795,17 @@ def modify_table_sft(tableName, sft)
@admin.modifyTableStoreFileTracker(tableName, sft)
end

#----------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------
# Change table column family's sft
def modify_table_family_sft(tableName, family_bytes, sft)
@admin.modifyColumnFamilyStoreFileTracker(tableName, family_bytes, sft)
end

#----------------------------------------------------------------------------------------------
# Flush master local region
def flush_master_store()
@admin.flushMasterStore()
end
end
# rubocop:enable Metrics/ClassLength
end
Loading

0 comments on commit 666aa06

Please sign in to comment.