From d607a91bee8e0f940fdf524c8b9ddbb2926cc8a1 Mon Sep 17 00:00:00 2001 From: Eric Shu Date: Fri, 25 Feb 2022 09:57:18 -0800 Subject: [PATCH] GEODE-10063: Correctly set primary queue connection. (#7382) * When adding QueueConnection to connectionList, also checks if the connection has been destroyed by another thread to prevent a bad connection is being added to the list. * Schedule RedundancySatisfierTask after remove connection so that bad connection can be detected. * During recoveryPrimary in RedundancySatisfierTask also check if primary connection is destroyed. If so, connection from backups will be promoted to primary. (cherry picked from commit 45cbe7f8df39704899b0305729749dc1cc9ffe89) --- .../client/internal/QueueManagerImpl.java | 22 +- .../client/internal/QueueManagerImplTest.java | 192 ++++++++++++++++++ 2 files changed, 206 insertions(+), 8 deletions(-) create mode 100644 geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java index 28485c79684d..4da3aa7bb274 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/QueueManagerImpl.java @@ -41,6 +41,7 @@ import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.InterestResultPolicy; import org.apache.geode.cache.NoSubscriptionServersAvailableException; import org.apache.geode.cache.client.ServerConnectivityException; @@ -355,7 +356,8 @@ void connectionCrashed(Connection con) { endpointCrashed(con.getEndpoint()); } - private void endpointCrashed(Endpoint endpoint) { + @VisibleForTesting + void endpointCrashed(Endpoint endpoint) { QueueConnectionImpl deadConnection; // We must be synchronized while checking to see if we have a queue connection for the endpoint, // because when we need to prevent a race between adding a queue connection to the map @@ -373,8 +375,8 @@ private void endpointCrashed(Endpoint endpoint) { ? (deadConnection.getUpdater().isPrimary() ? "Primary" : "Redundant") : "Queue", endpoint}); - scheduleRedundancySatisfierIfNeeded(0); deadConnection.internalDestroy(); + scheduleRedundancySatisfierIfNeeded(0); } else { if (logger.isDebugEnabled()) { logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint); @@ -724,7 +726,8 @@ private void recoverRedundancy(Set excludedServers, boolean reco } } - private QueueConnectionImpl promoteBackupToPrimary(List backups) { + @VisibleForTesting + QueueConnectionImpl promoteBackupToPrimary(List backups) { QueueConnectionImpl primary = null; for (int i = 0; primary == null && i < backups.size(); i++) { QueueConnectionImpl lastConnection = (QueueConnectionImpl) backups.get(i); @@ -844,12 +847,13 @@ private List findQueueServers(Set excludedServer * First we try to make a backup server the primary, but if run out of backup servers we will try * to find a new server. */ - private void recoverPrimary(Set excludedServers) { + @VisibleForTesting + void recoverPrimary(Set excludedServers) { if (pool.getPoolOrCacheCancelInProgress() != null) { return; } final boolean isDebugEnabled = logger.isDebugEnabled(); - if (queueConnections.getPrimary() != null) { + if (queueConnections.getPrimary() != null && !queueConnections.getPrimary().isDestroyed()) { if (isDebugEnabled) { logger.debug("Primary recovery not needed"); } @@ -980,7 +984,8 @@ private QueueConnectionImpl initializeQueueConnection(Connection connection, boo // connection but CCU may died as endpoint closed.... // so before putting connection need to see if something(crash) happen we should be able to // recover from it - private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) { + @VisibleForTesting + boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) { boolean isBadConnection; synchronized (lock) { ClientUpdater cu = connection.getUpdater(); @@ -989,7 +994,7 @@ private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPr } // now still CCU can died but then it will execute Checkendpoint with lock it will remove // connection connection and it will reschedule it. - if (connection.getEndpoint().isClosed() || shuttingDown + if (connection.getEndpoint().isClosed() || connection.isDestroyed() || shuttingDown || pool.getPoolOrCacheCancelInProgress() != null) { isBadConnection = true; } else { @@ -1022,7 +1027,8 @@ private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPr return !isBadConnection; } - private void scheduleRedundancySatisfierIfNeeded(long delay) { + @VisibleForTesting + void scheduleRedundancySatisfierIfNeeded(long delay) { if (shuttingDown) { return; } diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java new file mode 100644 index 000000000000..264a83417132 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerImplTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.client.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +public class QueueManagerImplTest { + private final InternalPool pool = mock(InternalPool.class, RETURNS_DEEP_STUBS); + private final Endpoint endpoint = mock(Endpoint.class); + private final Endpoint backupEndpoint = mock(Endpoint.class); + private final QueueConnectionImpl primary = mock(QueueConnectionImpl.class); + private final QueueConnectionImpl backup = mock(QueueConnectionImpl.class); + private final ClientUpdater clientUpdater = mock(ClientUpdater.class); + private QueueManagerImpl queueManager; + + @Before + public void setup() { + queueManager = new QueueManagerImpl(pool, null, null, null, 1, 1, null, null); + when(primary.getEndpoint()).thenReturn(endpoint); + when(primary.getUpdater()).thenReturn(clientUpdater); + when(primary.isDestroyed()).thenReturn(false); + when(clientUpdater.isAlive()).thenReturn(true); + when(clientUpdater.isProcessing()).thenReturn(true); + when(endpoint.isClosed()).thenReturn(false); + when(backup.getEndpoint()).thenReturn(backupEndpoint); + when(backup.getUpdater()).thenReturn(clientUpdater); + when(backupEndpoint.isClosed()).thenReturn(false); + } + + @Test + public void addNoClientUpdaterConnectionToConnectionListReturnsFalse() { + when(primary.getUpdater()).thenReturn(null); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + } + + @Test + public void addNotAliveClientUpdaterConnectionToConnectionListReturnsFalse() { + when(clientUpdater.isAlive()).thenReturn(false); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + } + + @Test + public void addNotProcessingClientUpdaterConnectionToConnectionListReturnsFalse() { + when(clientUpdater.isProcessing()).thenReturn(false); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + } + + @Test + public void addClosedEndpointConnectionToConnectionListReturnsFalse() throws Exception { + when(endpoint.isClosed()).thenReturn(true); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + verify(primary).internalClose(true); + } + + @Test + public void addDestroyedConnectionToConnectionListReturnsFalse() throws Exception { + when(primary.isDestroyed()).thenReturn(true); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + verify(primary).internalClose(true); + } + + @Test + public void addConnectionToConnectionListWhenCancelInProgressReturnsFalse() throws Exception { + when(pool.getPoolOrCacheCancelInProgress()).thenReturn("cache closed"); + + assertThat(queueManager.addToConnectionList(primary, true)).isFalse(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + verify(primary).internalClose(true); + } + + @Test + public void addConnectionToConnectionListCanSetPrimary() throws Exception { + assertThat(queueManager.addToConnectionList(primary, true)).isTrue(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isEqualTo(primary); + verify(primary, never()).internalClose(true); + } + + @Test + public void addConnectionToConnectionListCanAddBackups() throws Exception { + queueManager.addToConnectionList(primary, true); + + assertThat(queueManager.addToConnectionList(backup, false)).isTrue(); + QueueManager.QueueConnections connectionList = queueManager.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isEqualTo(primary); + assertThat(connectionList.getBackups()).contains(backup); + verify(backup, never()).internalClose(true); + } + + @Test + public void endpointCrashedScheduleRedundancySatisfierAfterConnectionDestroyed() { + addConnections(); + QueueManagerImpl spy = spy(queueManager); + InOrder inOrder = inOrder(primary, spy); + doNothing().when(spy).scheduleRedundancySatisfierIfNeeded(0); + + spy.endpointCrashed(endpoint); + + inOrder.verify(primary).internalDestroy(); + inOrder.verify(spy).scheduleRedundancySatisfierIfNeeded(0); + QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isNull(); + } + + private void addConnections() { + queueManager.addToConnectionList(primary, true); + queueManager.addToConnectionList(backup, false); + } + + @Test + public void recoverPrimaryDoesNotPromoteBackupToPrimaryIfPrimaryExists() { + addConnections(); + QueueManagerImpl spy = spy(queueManager); + + spy.recoverPrimary(null); + + verify(spy, never()).promoteBackupToPrimary(anyList()); + } + + @Test + public void recoverPrimaryPromoteBackupToPrimaryIfNoPrimary() { + QueueManagerImpl spy = spy(queueManager); + spy.addToConnectionList(backup, false); + doReturn(backup).when(spy).promoteBackupToPrimary(anyList()); + + spy.recoverPrimary(null); + + verify(spy).promoteBackupToPrimary(anyList()); + verifyQueueConnectionsAfterRecoverPrimary(spy); + } + + private void verifyQueueConnectionsAfterRecoverPrimary(QueueManagerImpl spy) { + QueueManager.QueueConnections connectionList = spy.getAllConnectionsNoWait(); + assertThat(connectionList.getPrimary()).isEqualTo(backup); + assertThat(connectionList.getBackups()).isEmpty(); + } + + @Test + public void recoverPrimaryPromoteBackupToPrimaryIfPrimaryConnectionIsDestroyed() { + addConnections(); + QueueManagerImpl spy = spy(queueManager); + doReturn(backup).when(spy).promoteBackupToPrimary(anyList()); + when(primary.isDestroyed()).thenReturn(true); + + spy.recoverPrimary(null); + + verify(spy).promoteBackupToPrimary(anyList()); + verifyQueueConnectionsAfterRecoverPrimary(spy); + } +}