diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 332017386d43..12351569e059 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -4279,5 +4279,4 @@ private void initializeCoprocessorHost(Configuration conf) {
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index d37bb6202730..e08f53294336 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -50,7 +50,7 @@ default void preClean() {
}
/**
- * Used to do some cleanup work
+ * Will be called after cleaner run.
*/
default void postClean() {
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 5ed0df0aa580..338abf38c6b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -383,7 +383,7 @@ public static MasterRegion create(MasterRegionParams params) throws IOException
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
walRoller.start();
- WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
+ WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 6d0acee76caa..25a4cd4b08e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -84,15 +83,21 @@ protected ReplicationPeerConfig getNewPeerConfig() {
@Override
protected void releaseLatch(MasterProcedureEnv env) {
+ env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
}
- ProcedurePrepareLatch.releaseLatch(latch, this);
+ super.releaseLatch(env);
}
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, ProcedureSuspendedException {
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
+ peerId, backoff / 1000));
+ }
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
@@ -128,9 +133,13 @@ protected void postPeerModification(MasterProcedureEnv env)
@Override
protected void afterReplay(MasterProcedureEnv env) {
if (getCurrentState() == getInitialState()) {
- // will try to acquire the lock when executing the procedure, no need to acquire it here
+ // do not need to disable log cleaner or acquire lock if we are in the initial state, later
+ // when executing the procedure we will try to disable and acquire.
return;
}
+ if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+ throw new IllegalStateException("can not disable log cleaner, this should not happen");
+ }
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
throw new IllegalStateException(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index e72523f83b23..4a78b7f971ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -97,6 +98,9 @@ public class ReplicationPeerManager {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
+ private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
+ new ReplicationLogCleanerBarrier();
+
private final String clusterId;
private final Configuration conf;
@@ -682,4 +686,8 @@ public boolean tryAcquireSyncReplicationPeerLock() {
public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}
+
+ public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
+ return replicationLogCleanerBarrier;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c18b7e73cdf6..a5981dd56a37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1700,7 +1700,7 @@ public boolean isOnline() {
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
- WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
+ WALFactory factory = new WALFactory(conf, serverName, this, true);
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
new file mode 100644
index 000000000000..052c5542d47a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ReplicationOffsetUtil {
+
+ private ReplicationOffsetUtil() {
+ }
+
+ public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
+ // if no offset or the offset is just a place marker, replicate
+ if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
+ return true;
+ }
+ // otherwise, compare the timestamp
+ long walTs = AbstractFSWALProvider.getTimestamp(wal);
+ long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
+ if (walTs < startWalTs) {
+ return false;
+ } else if (walTs > startWalTs) {
+ return true;
+ }
+ // if the timestamp equals, usually it means we should include this wal but there is a special
+ // case, a negative offset means the wal has already been fully replicated, so here we should
+ // check the offset.
+ return offset.getOffset() >= 0;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7135ca9a9b20..f1fd8f8d6b3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -17,18 +17,29 @@
*/
package org.apache.hadoop.hbase.replication.master;
-import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,35 +51,129 @@
/**
* Implementation of a log cleaner that checks if a log is still scheduled for replication before
* deleting it when its TTL is over.
+ *
+ * The logic is a bit complicated after we switch to use table based replication queue storage, see
+ * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
- private ZKWatcher zkw = null;
- private boolean shareZK = false;
- private ReplicationQueueStorage queueStorage;
+ private Set notFullyDeadServers;
+ private Set peerIds;
+ // ServerName -> PeerId -> WalGroup -> Offset
+ // Here the server name is the source server name, so we can make sure that there is only one
+ // queue for a given peer, that why we can use a String peerId as key instead of
+ // ReplicationQueueId.
+ private Map>> replicationOffsets;
+ private ReplicationPeerManager rpm;
+ private Supplier> getNotFullyDeadServers;
+
+ private boolean canFilter;
private boolean stopped = false;
- private Set wals;
- private long readZKTimestamp = 0;
@Override
public void preClean() {
- readZKTimestamp = EnvironmentEdgeManager.currentTime();
- // TODO: revisit the implementation
- // try {
- // // The concurrently created new WALs may not be included in the return list,
- // // but they won't be deleted because they're not in the checking set.
- // wals = queueStorage.getAllWALs();
- // } catch (ReplicationException e) {
- // LOG.warn("Failed to read zookeeper, skipping checking deletable files");
- // wals = null;
- // }
+ if (this.getConf() == null) {
+ return;
+ }
+ canFilter = rpm.getReplicationLogCleanerBarrier().start();
+ if (canFilter) {
+ notFullyDeadServers = getNotFullyDeadServers.get();
+ peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
+ .collect(Collectors.toSet());
+ // must get the not fully dead servers first and then get the replication queue data, in this
+ // way we can make sure that, we should have added the missing replication queues for the dead
+ // region servers recorded in the above set, otherwise the logic in the
+ // filterForDeadRegionServer method may lead us delete wal still in use.
+ List allQueueData;
+ try {
+ allQueueData = rpm.getQueueStorage().listAllQueues();
+ } catch (ReplicationException e) {
+ LOG.error("Can not list all replication queues, give up cleaning", e);
+ rpm.getReplicationLogCleanerBarrier().stop();
+ canFilter = false;
+ notFullyDeadServers = null;
+ peerIds = null;
+ return;
+ }
+ replicationOffsets = new HashMap<>();
+ for (ReplicationQueueData queueData : allQueueData) {
+ ReplicationQueueId queueId = queueData.getId();
+ ServerName serverName = queueId.getServerWALsBelongTo();
+ Map> peerId2Offsets =
+ replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
+ Map offsets =
+ peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
+ offsets.putAll(queueData.getOffsets());
+ }
+ } else {
+ LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
+ }
}
@Override
public void postClean() {
- // release memory
- wals = null;
+ if (canFilter) {
+ rpm.getReplicationLogCleanerBarrier().stop();
+ canFilter = false;
+ // release memory
+ notFullyDeadServers = null;
+ peerIds = null;
+ replicationOffsets = null;
+ }
+ }
+
+ private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
+ return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
+ }
+
+ private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
+ Map> peerId2Offsets =
+ replicationOffsets.get(serverName);
+ if (peerId2Offsets == null) {
+ // if there are replication queues missing, we can not delete the wal
+ return false;
+ }
+ for (String peerId : peerIds) {
+ Map offsets = peerId2Offsets.get(peerId);
+ // if no replication queue for a peer, we can not delete the wal
+ if (offsets == null) {
+ return false;
+ }
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+ ReplicationGroupOffset offset = offsets.get(walGroupId);
+ // if a replication queue still need to replicate this wal, we can not delete it
+ if (!shouldDelete(offset, file)) {
+ return false;
+ }
+ }
+ // if all replication queues have already finished replicating this wal, we can delete it.
+ return true;
+ }
+
+ private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
+ Map> peerId2Offsets =
+ replicationOffsets.get(serverName);
+ if (peerId2Offsets == null) {
+ // no replication queue for this dead rs, we can delete all wal files for it
+ return true;
+ }
+ for (String peerId : peerIds) {
+ Map offsets = peerId2Offsets.get(peerId);
+ if (offsets == null) {
+ // for dead server, we only care about existing replication queues, as we will delete a
+ // queue after we finish replicating it.
+ continue;
+ }
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+ ReplicationGroupOffset offset = offsets.get(walGroupId);
+ // if a replication queue still need to replicate this wal, we can not delete it
+ if (!shouldDelete(offset, file)) {
+ return false;
+ }
+ }
+ // if all replication queues have already finished replicating this wal, we can delete it.
+ return true;
}
@Override
@@ -78,10 +183,12 @@ public Iterable getDeletableFiles(Iterable files) {
if (this.getConf() == null) {
return files;
}
-
- if (wals == null) {
+ if (!canFilter) {
+ // We can not delete anything if there are AddPeerProcedure running at the same time
+ // See HBASE-27214 for more details.
return Collections.emptyList();
}
+
return Iterables.filter(files, new Predicate() {
@Override
public boolean apply(FileStatus file) {
@@ -90,65 +197,56 @@ public boolean apply(FileStatus file) {
if (file == null) {
return false;
}
- String wal = file.getPath().getName();
- boolean logInReplicationQueue = wals.contains(wal);
- if (logInReplicationQueue) {
- LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+ if (peerIds.isEmpty()) {
+ // no peer, can always delete
+ return true;
+ }
+ // not a valid wal file name, delete
+ if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
+ return true;
+ }
+ // meta wal is always deletable as we will never replicate it
+ if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
+ return true;
+ }
+ ServerName serverName =
+ AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
+ if (notFullyDeadServers.contains(serverName)) {
+ return filterForLiveRegionServer(serverName, file);
+ } else {
+ return filterForDeadRegionServer(serverName, file);
}
- return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
}
});
}
+ private Set getNotFullyDeadServers(MasterServices services) {
+ List onlineServers = services.getServerManager().getOnlineServersList();
+ return Stream.concat(onlineServers.stream(),
+ services.getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
+ .map(p -> ((ServerCrashProcedure) p).getServerName()))
+ .collect(Collectors.toSet());
+ }
+
@Override
public void init(Map params) {
super.init(params);
- try {
- if (MapUtils.isNotEmpty(params)) {
- Object master = params.get(HMaster.MASTER);
- if (master != null && master instanceof HMaster) {
- zkw = ((HMaster) master).getZooKeeper();
- shareZK = true;
- }
- }
- if (zkw == null) {
- zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
+ if (MapUtils.isNotEmpty(params)) {
+ Object master = params.get(HMaster.MASTER);
+ if (master != null && master instanceof MasterServices) {
+ MasterServices m = (MasterServices) master;
+ rpm = m.getReplicationPeerManager();
+ getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+ return;
}
- // TODO: revisit the implementation
- // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
- } catch (IOException e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
}
- }
-
- @InterfaceAudience.Private
- public void setConf(Configuration conf, ZKWatcher zk) {
- super.setConf(conf);
- try {
- this.zkw = zk;
- // TODO: revisit the implementation
- // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
- } catch (Exception e) {
- LOG.error("Error while configuring " + this.getClass().getName(), e);
- }
- }
-
- @InterfaceAudience.Private
- public void setConf(Configuration conf, ZKWatcher zk,
- ReplicationQueueStorage replicationQueueStorage) {
- super.setConf(conf);
- this.zkw = zk;
- this.queueStorage = replicationQueueStorage;
+ throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
}
@Override
public void stop(String why) {
- if (this.stopped) return;
this.stopped = true;
- if (!shareZK && this.zkw != null) {
- LOG.info("Stopping " + this.zkw);
- this.zkw.close();
- }
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
new file mode 100644
index 000000000000..d87565187280
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A barrier to guard the execution of {@link ReplicationLogCleaner}.
+ *
+ * The reason why we introduce this class is because there could be race between
+ * {@link org.apache.hadoop.hbase.master.replication.AddPeerProcedure} and
+ * {@link ReplicationLogCleaner}. See HBASE-27214 for more details.
+ */
+@InterfaceAudience.Private
+public class ReplicationLogCleanerBarrier {
+
+ private enum State {
+ // the cleaner is not running
+ NOT_RUNNING,
+ // the cleaner is running
+ RUNNING,
+ // the cleaner is disabled
+ DISABLED
+ }
+
+ private State state = State.NOT_RUNNING;
+
+ // we could have multiple AddPeerProcedure running at the same time, so here we need to do
+ // reference counting.
+ private int numberDisabled = 0;
+
+ public synchronized boolean start() {
+ if (state == State.NOT_RUNNING) {
+ state = State.RUNNING;
+ return true;
+ }
+ if (state == State.DISABLED) {
+ return false;
+ }
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+
+ public synchronized void stop() {
+ if (state != State.RUNNING) {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ state = State.NOT_RUNNING;
+ }
+
+ public synchronized boolean disable() {
+ if (state == State.RUNNING) {
+ return false;
+ }
+ if (state == State.NOT_RUNNING) {
+ state = State.DISABLED;
+ }
+ numberDisabled++;
+ return true;
+ }
+
+ public synchronized void enable() {
+ if (state != State.DISABLED) {
+ throw new IllegalStateException("Unexpected state " + state);
+ }
+ numberDisabled--;
+ if (numberDisabled == 0) {
+ state = State.NOT_RUNNING;
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7fab12e83118..e3745a7c2e37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
@@ -806,22 +807,7 @@ private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
if (AbstractFSWALProvider.isMetaFile(wal)) {
return false;
}
- // if no offset or the offset is just a place marker, replicate
- if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
- return true;
- }
- // otherwise, compare the timestamp
- long walTs = AbstractFSWALProvider.getTimestamp(wal);
- long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
- if (walTs < startWalTs) {
- return false;
- } else if (walTs > startWalTs) {
- return true;
- }
- // if the timestamp equals, usually it means we should include this wal but there is a special
- // case, a negative offset means the wal has already been fully replicated, so here we should
- // check the offset.
- return offset.getOffset() >= 0;
+ return ReplicationOffsetUtil.shouldReplicate(offset, wal);
}
void claimQueue(ReplicationQueueId queueId) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 50ffd6df1afd..b63ad473719c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -117,7 +117,10 @@ public boolean isAborted() {
System.out.println("Start Replication Server start");
Replication replication = new Replication();
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
- new WALFactory(conf, "test", null, false));
+ new WALFactory(conf,
+ ServerName
+ .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
+ null, false));
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init();
claimReplicationQueues(zkw, manager);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 9c2177ce8d81..1ad760818735 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -19,6 +19,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -39,6 +42,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -631,4 +635,29 @@ private static String getWALNameGroupFromWALName(String name, int group) {
public static String getWALPrefixFromWALName(String name) {
return getWALNameGroupFromWALName(name, 1);
}
+
+ private static final Pattern SERVER_NAME_PATTERN = Pattern.compile("^[^"
+ + ServerName.SERVERNAME_SEPARATOR + "]+" + ServerName.SERVERNAME_SEPARATOR
+ + Addressing.VALID_PORT_REGEX + ServerName.SERVERNAME_SEPARATOR + Addressing.VALID_PORT_REGEX);
+
+ /**
+ * Parse the server name from wal prefix. A wal's name is always started with a server name in non
+ * test code.
+ * @throws IllegalArgumentException if the name passed in is not started with a server name
+ * @return the server name
+ */
+ public static ServerName parseServerNameFromWALName(String name) {
+ String decoded;
+ try {
+ decoded = URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new AssertionError("should never happen", e);
+ }
+ Matcher matcher = SERVER_NAME_PATTERN.matcher(decoded);
+ if (matcher.find()) {
+ return ServerName.valueOf(matcher.group());
+ } else {
+ throw new IllegalArgumentException(name + " is not started with a server name");
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 9136099defdc..95565e319be1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -170,17 +171,35 @@ static WALProvider createProvider(Class extends WALProvider> clazz) throws IOE
}
/**
- * @param conf must not be null, will keep a reference to read params in later reader/writer
- * instances.
- * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
- * to make a directory
+ * Create a WALFactory.
*/
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java")
public WALFactory(Configuration conf, String factoryId) throws IOException {
// default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
// for HMaster or HRegionServer which take system table only. See HBASE-19999
this(conf, factoryId, null, true);
}
+ /**
+ * Create a WALFactory.
+ *
+ * This is the constructor you should use when creating a WALFactory in normal code, to make sure
+ * that the {@code factoryId} is the server name. We need this assumption in some places for
+ * parsing the server name out from the wal file name.
+ * @param conf must not be null, will keep a reference to read params
+ * in later reader/writer instances.
+ * @param serverName use to generate the factoryId, which will be append at
+ * the first of the final file name
+ * @param abortable the server associated with this WAL file
+ * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
+ * {@link SyncReplicationWALProvider} n
+ */
+ public WALFactory(Configuration conf, ServerName serverName, Abortable abortable,
+ boolean enableSyncReplicationWALProvider) throws IOException {
+ this(conf, serverName.toString(), abortable, enableSyncReplicationWALProvider);
+ }
+
/**
* @param conf must not be null, will keep a reference to read params
* in later reader/writer instances.
@@ -190,7 +209,7 @@ public WALFactory(Configuration conf, String factoryId) throws IOException {
* @param enableSyncReplicationWALProvider whether wrap the wal provider to a
* {@link SyncReplicationWALProvider}
*/
- public WALFactory(Configuration conf, String factoryId, Abortable abortable,
+ private WALFactory(Configuration conf, String factoryId, Abortable abortable,
boolean enableSyncReplicationWALProvider) throws IOException {
// until we've moved reader/writer construction down into providers, this initialization must
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 1a0537bcbafe..d7ba6c227c6d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -18,57 +18,60 @@
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MockServer;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// revisit later after we implement new replication log cleaner
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
@Category({ MasterTests.class, MediumTests.class })
public class TestLogsCleaner {
@@ -88,22 +91,29 @@ public class TestLogsCleaner {
private static DirScanPool POOL;
+ private static String peerId = "1";
+
+ private MasterServices masterServices;
+
+ private ReplicationQueueStorage queueStorage;
+
+ @Rule
+ public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniZKCluster();
- TEST_UTIL.startMiniDFSCluster(1);
+ TEST_UTIL.startMiniCluster();
POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniZKCluster();
- TEST_UTIL.shutdownMiniDFSCluster();
+ TEST_UTIL.shutdownMiniCluster();
POOL.shutdownNow();
}
@Before
- public void beforeTest() throws IOException {
+ public void beforeTest() throws Exception {
conf = TEST_UTIL.getConfiguration();
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -112,14 +122,51 @@ public void beforeTest() throws IOException {
// root directory
fs.mkdirs(OLD_WALS_DIR);
+
+ TableName tableName = tableNameRule.getTableName();
+ TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+ TEST_UTIL.getAdmin().createTable(td);
+ TEST_UTIL.waitTableAvailable(tableName);
+ queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+
+ masterServices = mock(MasterServices.class);
+ when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
+ ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+ when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
+ when(rpm.getQueueStorage()).thenReturn(queueStorage);
+ when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
+ when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
+ ServerManager sm = mock(ServerManager.class);
+ when(masterServices.getServerManager()).thenReturn(sm);
+ when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
+ @SuppressWarnings("unchecked")
+ ProcedureExecutor procExec = mock(ProcedureExecutor.class);
+ when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
+ when(procExec.getProcedures()).thenReturn(Collections.emptyList());
}
/**
* This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
- * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
- * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
- * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
- * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
+ * oldWALs directory.
+ *
+ * Created files:
+ *
+ *
2 invalid files
+ *
5 old Procedure WALs
+ *
30 old WALs from which 3 are in replication
+ *
5 recent Procedure WALs
+ *
1 recent WAL
+ *
1 very new WAL (timestamp in future)
+ *
masterProcedureWALs subdirectory
+ *
+ * Files which should stay:
+ *
+ *
3 replication WALs
+ *
2 new WALs
+ *
5 latest Procedure WALs
+ *
masterProcedureWALs subdirectory
+ *
*/
@Test
public void testLogCleaning() throws Exception {
@@ -131,9 +178,6 @@ public void testLogCleaning() throws Exception {
HMaster.decorateMasterConfiguration(conf);
Server server = new DummyServer();
- ReplicationQueueStorage queueStorage = ReplicationStorageFactory
- .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf);
-
String fakeMachineName =
URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
@@ -159,14 +203,12 @@ public void testLogCleaning() throws Exception {
for (int i = 1; i <= 30; i++) {
Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
fs.createNewFile(fileName);
- // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
- // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
- if (i % (30 / 3) == 0) {
- // queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
- LOG.info("Replication log file: " + fileName);
- }
}
-
+ // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
+ masterServices.getReplicationPeerManager().listPeers(null)
+ .add(new ReplicationPeerDescription(peerId, true, null, null));
+ queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
+ new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());
// Case 5: 5 Procedure WALs that are new, will stay
for (int i = 6; i <= 10; i++) {
Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
@@ -189,7 +231,8 @@ public void testLogCleaning() throws Exception {
// 10 procedure WALs
assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
- LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+ LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
+ ImmutableMap.of(HMaster.MASTER, masterServices));
cleaner.chore();
// In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@@ -208,98 +251,14 @@ public void testLogCleaning() throws Exception {
}
}
- @Test
- public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
- ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
-
- List dummyFiles = Arrays.asList(
- new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
- new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
-
- FaultyZooKeeperWatcher faultyZK =
- new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
- final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
-
- try {
- faultyZK.init(false);
- ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
- .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf));
- // doAnswer(new Answer