Skip to content

Commit

Permalink
HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)
Browse files Browse the repository at this point in the history
Signed-off-by: Liangjun He <[email protected]>
  • Loading branch information
Apache9 committed Apr 21, 2023
1 parent d7775aa commit 3f8baa6
Show file tree
Hide file tree
Showing 22 changed files with 1,023 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public static String writeMapAsString(Map<String, Object> map) throws IOExceptio
public static String writeObjectAsString(Object object) throws IOException {
return GSON.toJson(object);
}

public static <T> T fromJson(String json, Class<T> clazz) {
return GSON.fromJson(json, clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
enum AssignReplicationQueuesState {
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
}

message AssignReplicationQueuesStateData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,25 @@ void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
* Add the given hfile refs to the given peer.
*/
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;

// the below method is for clean up stale data after running ReplicatoinSyncUp
/**
* Remove all the last sequence ids and hfile references data which are written before the given
* timestamp.
* <p/>
* The data of these two types are not used by replication directly.
* <p/>
* For last sequence ids, we will check it in serial replication, to make sure that we will
* replicate all edits in order, so if there are stale data, the worst case is that we will stop
* replicating as we think we still need to finish previous ranges first, although actually we
* have already replicated them out.
* <p/>
* For hfile references, it is just used by hfile cleaner to not remove these hfiles before we
* replicate them out, so if there are stale data, the worst case is that we can not remove these
* hfiles, although actually they have already been replicated out.
* <p/>
* So it is OK for us to just bring up the cluster first, and then use this method to delete the
* stale data, i.e, the data which are written before a specific timestamp.
*/
void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to create replication storage(peer, queue) classes.
*/
@InterfaceAudience.Private
public final class ReplicationStorageFactory {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);

public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";

// must use zookeeper here, otherwise when user upgrading from an old version without changing the
Expand All @@ -51,6 +55,8 @@ public final class ReplicationStorageFactory {
public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");

public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.storage.impl";

public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
throws IOException {
return TableDescriptorBuilder.newBuilder(tableName)
Expand Down Expand Up @@ -108,15 +114,26 @@ public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem fs, ZK
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
Configuration conf) {
return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
.get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
}

/**
* Create a new {@link ReplicationQueueStorage}.
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
TableName tableName) {
return new TableReplicationQueueStorage(conn, tableName);
Configuration conf, TableName tableName) {
Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
try {
Constructor<? extends ReplicationQueueStorage> c =
clazz.getConstructor(Connection.class, TableName.class);
return c.newInstance(conn, tableName);
} catch (Exception e) {
LOG.debug(
"failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
e);
return ReflectionUtils.newInstance(clazz, conf, tableName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -594,4 +594,24 @@ public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
throw new ReplicationException("failed to batch update hfile references", e);
}
}

@Override
public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
try (Table table = conn.getTable(tableName);
ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
.addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
for (;;) {
Result r = scanner.next();
if (r == null) {
break;
}
Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
.addFamily(HFILE_REF_FAMILY, ts);
table.delete(delete);
}
} catch (IOException e) {
throw new ReplicationException(
"failed to remove last sequence ids and hfile references before timestamp " + ts, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -59,6 +62,7 @@
import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
Expand Down Expand Up @@ -227,6 +231,8 @@
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
Expand All @@ -247,6 +253,7 @@
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
Expand All @@ -268,7 +275,9 @@
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -1286,6 +1295,38 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
status.setStatus("Initializing MOB Cleaner");
initMobCleaner();

// delete the stale data for replication sync up tool if necessary
status.setStatus("Cleanup ReplicationSyncUp status if necessary");
Path replicationSyncUpInfoFile =
new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
if (dataFs.exists(replicationSyncUpInfoFile)) {
// info file is available, load the timestamp and use it to clean up stale data in replication
// queue storage.
byte[] data;
try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
data = ByteStreams.toByteArray(in);
}
ReplicationSyncUpToolInfo info = null;
try {
info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
} catch (JsonParseException e) {
// usually this should be a partial file, which means the ReplicationSyncUp tool did not
// finish properly, so not a problem. Here we do not clean up the status as we do not know
// the reason why the tool did not finish properly, so let users clean the status up
// manually
LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
}
if (info != null) {
LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
.format(Instant.ofEpochMilli(info.getStartTimeMs())));
replicationPeerManager.getQueueStorage()
.removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
// delete the file after removing the stale data, so next time we do not need to do this
// again.
dataFs.delete(replicationSyncUpInfoFile, false);
}
}
status.setStatus("Calling postStartMaster coprocessors");
if (this.cpHost != null) {
// don't let cp initialization errors kill the master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
Expand All @@ -37,6 +39,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -102,7 +105,7 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio
}
}

private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
Expand Down Expand Up @@ -130,18 +133,51 @@ private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
return Flow.HAS_MORE_STATE;
}

// check whether ReplicationSyncUp has already done the work for us, if so, we should skip
// claiming the replication queues and deleting them instead.
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
}

private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
storage.removeQueue(queueId);
}
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
// remove the region server record file
mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
}

@Override
protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
addMissingQueues(env);
retryCounter = null;
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
return Flow.HAS_MORE_STATE;
if (shouldSkip(env)) {
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
return Flow.HAS_MORE_STATE;
} else {
addMissingQueues(env);
retryCounter = null;
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
return Flow.HAS_MORE_STATE;
}
case ASSIGN_REPLICATION_QUEUES_CLAIM:
return claimQueues(env);
if (shouldSkip(env)) {
retryCounter = null;
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
return Flow.HAS_MORE_STATE;
} else {
return claimQueues(env);
}
case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
removeQueues(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +60,32 @@ public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerNa
this.targetServer = targetServer;
}

// check whether ReplicationSyncUp has already done the work for us, if so, we should skip
// claiming the replication queues and deleting them instead.
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
}

@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
if (shouldSkip(env)) {
LOG.info("Skip claiming {} because replication sync up has already done it for us",
getServerName());
return null;
}
} catch (IOException e) {
LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
getServerName(), e);
// just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
return null;
}
return super.execute(env);
}

@Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
assert targetServer.equals(remote);
Expand Down
Loading

0 comments on commit 3f8baa6

Please sign in to comment.