Skip to content

Commit

Permalink
HBASE-27214 Implement the new replication hfile/log cleaner (#4722)
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Sun <[email protected]>
  • Loading branch information
Apache9 committed Apr 21, 2023
1 parent 7bff87b commit 68bcae9
Show file tree
Hide file tree
Showing 20 changed files with 1,008 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ default void preClean() {
}

/**
* Used to do some cleanup work
* Will be called after cleaner run.
*/
default void postClean() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public static MasterRegion create(MasterRegionParams params) throws IOException
params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
walRoller.start();

WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
Expand Down Expand Up @@ -84,15 +83,21 @@ protected ReplicationPeerConfig getNewPeerConfig() {

@Override
protected void releaseLatch(MasterProcedureEnv env) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
}
ProcedurePrepareLatch.releaseLatch(latch, this);
super.releaseLatch(env);
}

@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
peerId, backoff / 1000));
}
MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
Expand Down Expand Up @@ -128,9 +133,13 @@ protected void postPeerModification(MasterProcedureEnv env)
@Override
protected void afterReplay(MasterProcedureEnv env) {
if (getCurrentState() == getInitialState()) {
// will try to acquire the lock when executing the procedure, no need to acquire it here
// do not need to disable log cleaner or acquire lock if we are in the initial state, later
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
Expand Down Expand Up @@ -98,6 +99,9 @@ public class ReplicationPeerManager {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

private final String clusterId;

private final Configuration conf;
Expand Down Expand Up @@ -693,4 +697,8 @@ public boolean tryAcquireSyncReplicationPeerLock() {
public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ public boolean isOnline() {
* be hooked up to WAL.
*/
private void setupWALAndReplication() throws IOException {
WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
WALFactory factory = new WALFactory(conf, serverName, this, true);
// TODO Replication make assumptions here based on the default filesystem impl
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public final class ReplicationOffsetUtil {

private ReplicationOffsetUtil() {
}

public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
// if no offset or the offset is just a place marker, replicate
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
return true;
}
// otherwise, compare the timestamp
long walTs = AbstractFSWALProvider.getTimestamp(wal);
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
if (walTs < startWalTs) {
return false;
} else if (walTs > startWalTs) {
return true;
}
// if the timestamp equals, usually it means we should include this wal but there is a special
// case, a negative offset means the wal has already been fully replicated, so here we should
// check the offset.
return offset.getOffset() >= 0;
}
}
Loading

0 comments on commit 68bcae9

Please sign in to comment.