Skip to content

Commit

Permalink
HBASE-27988 NPE in AddPeerProcedure recovery (apache#5331)
Browse files Browse the repository at this point in the history
Co-authored-by: huiruan <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
frostruan and huiruan authored Jul 28, 2023
1 parent cf81fd3 commit 67b20fd
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -368,6 +369,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

// manager of replication
private ReplicationPeerManager replicationPeerManager;

Expand Down Expand Up @@ -4115,6 +4119,11 @@ public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}

@Override
public Semaphore getSyncReplicationPeerLock() {
return syncReplicationPeerLock;
}

public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
getReplicationLoad(ServerName[] serverNames) {
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
Expand Down Expand Up @@ -368,6 +369,11 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId)
*/
ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier();

/**
* Returns the SyncReplicationPeerLock.
*/
Semaphore getSyncReplicationPeerLock();

/**
* Returns the {@link SyncReplicationReplayWALManager}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected void releaseLatch(MasterProcedureEnv env) {
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
env.getMasterServices().getSyncReplicationPeerLock().release();
}
super.releaseLatch(env);
}
Expand All @@ -108,7 +108,7 @@ protected void prePeerModification(MasterProcedureEnv env)
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn(
"Can not acquire sync replication peer lock for peer {}, sleep {} secs", peerId,
Expand Down Expand Up @@ -147,7 +147,7 @@ protected void afterReplay(MasterProcedureEnv env) {
}
cleanerDisabled = true;
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) {
throw new IllegalStateException(
"Can not acquire sync replication peer lock for peer " + peerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer,
LOG.warn("Failed to claim replication queues for {}, suspend {} secs", crashedServer,
backoff / 1000, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,9 +110,6 @@ public class ReplicationPeerManager implements ConfigurationObserver {
SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final String clusterId;

private volatile Configuration conf;
Expand Down Expand Up @@ -713,14 +709,6 @@ private boolean isStringEquals(String s1, String s2) {
return s1.equals(s2);
}

public boolean tryAcquireSyncReplicationPeerLock() {
return syncReplicationPeerLock.tryAcquire();
}

public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
Expand Down Expand Up @@ -530,4 +531,9 @@ public boolean isReplicationPeerModificationEnabled() {
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return null;
}

@Override
public Semaphore getSyncReplicationPeerLock() {
return null;
}
}

0 comments on commit 67b20fd

Please sign in to comment.