Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27988 NPE in AddPeerProcedure recovery #5331

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -368,6 +369,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

// manager of replication
private ReplicationPeerManager replicationPeerManager;

Expand Down Expand Up @@ -4115,6 +4119,11 @@ public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return replicationLogCleanerBarrier;
}

@Override
public Semaphore getSyncReplicationPeerLock() {
return syncReplicationPeerLock;
}

public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
getReplicationLoad(ServerName[] serverNames) {
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
Expand Down Expand Up @@ -368,6 +369,11 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId)
*/
ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier();

/**
* Returns the SyncReplicationPeerLock.
*/
Semaphore getSyncReplicationPeerLock();

/**
* Returns the {@link SyncReplicationReplayWALManager}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected void releaseLatch(MasterProcedureEnv env) {
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
env.getMasterServices().getSyncReplicationPeerLock().release();
}
super.releaseLatch(env);
}
Expand All @@ -108,7 +108,7 @@ protected void prePeerModification(MasterProcedureEnv env)
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn(
"Can not acquire sync replication peer lock for peer {}, sleep {} secs", peerId,
Expand Down Expand Up @@ -147,7 +147,7 @@ protected void afterReplay(MasterProcedureEnv env) {
}
cleanerDisabled = true;
if (peerConfig.isSyncReplication()) {
if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
if (!env.getMasterServices().getSyncReplicationPeerLock().tryAcquire()) {
throw new IllegalStateException(
"Can not acquire sync replication peer lock for peer " + peerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to claim replication queues for {}, suspend {}secs {}; {};", crashedServer,
LOG.warn("Failed to claim replication queues for {}, suspend {} secs", crashedServer,
backoff / 1000, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,9 +110,6 @@ public class ReplicationPeerManager implements ConfigurationObserver {
SyncReplicationState.DOWNGRADE_ACTIVE,
EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final String clusterId;

private volatile Configuration conf;
Expand Down Expand Up @@ -713,14 +709,6 @@ private boolean isStringEquals(String s1, String s2) {
return s1.equals(s2);
}

public boolean tryAcquireSyncReplicationPeerLock() {
return syncReplicationPeerLock.tryAcquire();
}

public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
Expand Down Expand Up @@ -530,4 +531,9 @@ public boolean isReplicationPeerModificationEnabled() {
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return null;
}

@Override
public Semaphore getSyncReplicationPeerLock() {
return null;
}
}