Skip to content

Commit

Permalink
GEODE-10063: Correctly set primary queue connection. (apache#7382)
Browse files Browse the repository at this point in the history
      *  When adding QueueConnection to connectionList, also checks if 
         the connection has been destroyed by another thread to prevent
         a bad connection is being added to the list.
      *  Schedule RedundancySatisfierTask after remove connection so 
         that bad connection can be detected.
      *  During recoveryPrimary in RedundancySatisfierTask also 
         check if primary connection is destroyed. If so, connection 
         from backups will be promoted to primary.
  • Loading branch information
pivotal-eshu authored Feb 25, 2022
1 parent 1fc35af commit 45cbe7f
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
import org.apache.geode.cache.client.ServerConnectivityException;
Expand Down Expand Up @@ -355,7 +356,8 @@ void connectionCrashed(Connection con) {
endpointCrashed(con.getEndpoint());
}

private void endpointCrashed(Endpoint endpoint) {
@VisibleForTesting
void endpointCrashed(Endpoint endpoint) {
QueueConnectionImpl deadConnection;
// We must be synchronized while checking to see if we have a queue connection for the endpoint,
// because when we need to prevent a race between adding a queue connection to the map
Expand All @@ -373,8 +375,8 @@ private void endpointCrashed(Endpoint endpoint) {
? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant")
: "Queue",
endpoint});
scheduleRedundancySatisfierIfNeeded(0);
deadConnection.internalDestroy();
scheduleRedundancySatisfierIfNeeded(0);
} else {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
Expand Down Expand Up @@ -724,7 +726,8 @@ private void recoverRedundancy(Set<ServerLocation> excludedServers, boolean reco
}
}

private QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
@VisibleForTesting
QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
QueueConnectionImpl primary = null;
for (int i = 0; primary == null && i < backups.size(); i++) {
QueueConnectionImpl lastConnection = (QueueConnectionImpl) backups.get(i);
Expand Down Expand Up @@ -844,12 +847,13 @@ private List<ServerLocation> findQueueServers(Set<ServerLocation> excludedServer
* First we try to make a backup server the primary, but if run out of backup servers we will try
* to find a new server.
*/
private void recoverPrimary(Set<ServerLocation> excludedServers) {
@VisibleForTesting
void recoverPrimary(Set<ServerLocation> excludedServers) {
if (pool.getPoolOrCacheCancelInProgress() != null) {
return;
}
final boolean isDebugEnabled = logger.isDebugEnabled();
if (queueConnections.getPrimary() != null) {
if (queueConnections.getPrimary() != null && !queueConnections.getPrimary().isDestroyed()) {
if (isDebugEnabled) {
logger.debug("Primary recovery not needed");
}
Expand Down Expand Up @@ -980,7 +984,8 @@ private QueueConnectionImpl initializeQueueConnection(Connection connection, boo
// connection but CCU may died as endpoint closed....
// so before putting connection need to see if something(crash) happen we should be able to
// recover from it
private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
@VisibleForTesting
boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
boolean isBadConnection;
synchronized (lock) {
ClientUpdater cu = connection.getUpdater();
Expand All @@ -989,7 +994,7 @@ private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPr
}
// now still CCU can died but then it will execute Checkendpoint with lock it will remove
// connection connection and it will reschedule it.
if (connection.getEndpoint().isClosed() || shuttingDown
if (connection.getEndpoint().isClosed() || connection.isDestroyed() || shuttingDown
|| pool.getPoolOrCacheCancelInProgress() != null) {
isBadConnection = true;
} else {
Expand Down Expand Up @@ -1022,7 +1027,8 @@ private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPr
return !isBadConnection;
}

private void scheduleRedundancySatisfierIfNeeded(long delay) {
@VisibleForTesting
void scheduleRedundancySatisfierIfNeeded(long delay) {
if (shuttingDown) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

public class QueueManagerImplTest {
private final InternalPool pool = mock(InternalPool.class, RETURNS_DEEP_STUBS);
private final Endpoint endpoint = mock(Endpoint.class);
private final Endpoint backupEndpoint = mock(Endpoint.class);
private final QueueConnectionImpl primary = mock(QueueConnectionImpl.class);
private final QueueConnectionImpl backup = mock(QueueConnectionImpl.class);
private final ClientUpdater clientUpdater = mock(ClientUpdater.class);
private QueueManagerImpl queueManager;

@Before
public void setup() {
queueManager = new QueueManagerImpl(pool, null, null, null, 1, 1, null, null);
when(primary.getEndpoint()).thenReturn(endpoint);
when(primary.getUpdater()).thenReturn(clientUpdater);
when(primary.isDestroyed()).thenReturn(false);
when(clientUpdater.isAlive()).thenReturn(true);
when(clientUpdater.isProcessing()).thenReturn(true);
when(endpoint.isClosed()).thenReturn(false);
when(backup.getEndpoint()).thenReturn(backupEndpoint);
when(backup.getUpdater()).thenReturn(clientUpdater);
when(backupEndpoint.isClosed()).thenReturn(false);
}

@Test
public void addNoClientUpdaterConnectionToConnectionListReturnsFalse() {
when(primary.getUpdater()).thenReturn(null);

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
}

@Test
public void addNotAliveClientUpdaterConnectionToConnectionListReturnsFalse() {
when(clientUpdater.isAlive()).thenReturn(false);

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
}

@Test
public void addNotProcessingClientUpdaterConnectionToConnectionListReturnsFalse() {
when(clientUpdater.isProcessing()).thenReturn(false);

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
}

@Test
public void addClosedEndpointConnectionToConnectionListReturnsFalse() throws Exception {
when(endpoint.isClosed()).thenReturn(true);

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
verify(primary).internalClose(true);
}

@Test
public void addDestroyedConnectionToConnectionListReturnsFalse() throws Exception {
when(primary.isDestroyed()).thenReturn(true);

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
verify(primary).internalClose(true);
}

@Test
public void addConnectionToConnectionListWhenCancelInProgressReturnsFalse() throws Exception {
when(pool.getPoolOrCacheCancelInProgress()).thenReturn("cache closed");

assertThat(queueManager.addToConnectionList(primary, true)).isFalse();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
verify(primary).internalClose(true);
}

@Test
public void addConnectionToConnectionListCanSetPrimary() throws Exception {
assertThat(queueManager.addToConnectionList(primary, true)).isTrue();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isEqualTo(primary);
verify(primary, never()).internalClose(true);
}

@Test
public void addConnectionToConnectionListCanAddBackups() throws Exception {
queueManager.addToConnectionList(primary, true);

assertThat(queueManager.addToConnectionList(backup, false)).isTrue();
QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isEqualTo(primary);
assertThat(connectionList.getBackups()).contains(backup);
verify(backup, never()).internalClose(true);
}

@Test
public void endpointCrashedScheduleRedundancySatisfierAfterConnectionDestroyed() {
addConnections();
QueueManagerImpl spy = spy(queueManager);
InOrder inOrder = inOrder(primary, spy);
doNothing().when(spy).scheduleRedundancySatisfierIfNeeded(0);

spy.endpointCrashed(endpoint);

inOrder.verify(primary).internalDestroy();
inOrder.verify(spy).scheduleRedundancySatisfierIfNeeded(0);
QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isNull();
}

private void addConnections() {
queueManager.addToConnectionList(primary, true);
queueManager.addToConnectionList(backup, false);
}

@Test
public void recoverPrimaryDoesNotPromoteBackupToPrimaryIfPrimaryExists() {
addConnections();
QueueManagerImpl spy = spy(queueManager);

spy.recoverPrimary(null);

verify(spy, never()).promoteBackupToPrimary(anyList());
}

@Test
public void recoverPrimaryPromoteBackupToPrimaryIfNoPrimary() {
QueueManagerImpl spy = spy(queueManager);
spy.addToConnectionList(backup, false);
doReturn(backup).when(spy).promoteBackupToPrimary(anyList());

spy.recoverPrimary(null);

verify(spy).promoteBackupToPrimary(anyList());
verifyQueueConnectionsAfterRecoverPrimary(spy);
}

private void verifyQueueConnectionsAfterRecoverPrimary(QueueManagerImpl spy) {
QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait();
assertThat(connectionList.getPrimary()).isEqualTo(backup);
assertThat(connectionList.getBackups()).isEmpty();
}

@Test
public void recoverPrimaryPromoteBackupToPrimaryIfPrimaryConnectionIsDestroyed() {
addConnections();
QueueManagerImpl spy = spy(queueManager);
doReturn(backup).when(spy).promoteBackupToPrimary(anyList());
when(primary.isDestroyed()).thenReturn(true);

spy.recoverPrimary(null);

verify(spy).promoteBackupToPrimary(anyList());
verifyQueueConnectionsAfterRecoverPrimary(spy);
}
}

0 comments on commit 45cbe7f

Please sign in to comment.