Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure that onStopLeading is called #5464

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs
* Fix #5382: [java-generator] Allow to deserialize more valid RFC3339 date-time and make the format customizable
* Fix #5380: [java-generator] Avoid to emit Java Keywords as package names
* Fix #5463: ensures that onStopLeading is called with releaseOnCancel even when leadership is already lost
* Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests

#### Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -121,36 +122,44 @@ private synchronized void stopLeading() {
return; // not leading
}
if (leaderElectionConfig.isReleaseOnCancel()) {
release();
} else {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
try {
if (release()) {
return;
}
} catch (KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
if (e.getCode() != HttpURLConnection.HTTP_CONFLICT) {
LOGGER.error("Exception occurred while releasing lock '{}' on cancel", lockDescription, e);
} else {
LOGGER.debug("Leadership was likely already lost '{}'", lockDescription, e);
}
}
}
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}

/**
* Release the leadership if currently held. If not cancelled, the elector will
* continue to try and re-acquire the lock.
*
* @return true if the lock was successfully released. false if there is no lock, or this is not the leader
*/
public synchronized void release() {
public synchronized boolean release() {
LeaderElectionRecord current = leaderElectionConfig.getLock().get(kubernetesClient);
if (current == null || !isLeader(current)) {
return; // lost leadership already
return false; // lost leadership already
}
try {
ZonedDateTime now = now();
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
"",
Duration.ofSeconds(1),
now,
now,
current.getLeaderTransitions());
ZonedDateTime now = now();
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
"",
Duration.ofSeconds(1),
now,
now,
current.getLeaderTransitions());

leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
} catch (KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.error("Exception occurred while releasing lock '{}'", lockDescription, e);
}
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
return true;
}

private CompletableFuture<Void> acquire() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.extended.leaderelection;

import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaderElectionRecord;
Expand All @@ -26,6 +27,7 @@
import org.mockito.Answers;
import org.mockito.Mockito;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -144,6 +146,36 @@ void shouldReleaseWhenCanceled() throws Exception {
assertEquals(1, activeLer.get().getLeaderTransitions());
}

@Test
void shouldStopOnReleaseWhenCanceled() throws Exception {
// Given
AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>();
final LeaderElectionConfig lec = mockLeaderElectionConfiguration(activeLer);
final CountDownLatch signal = new CountDownLatch(1);
final Lock mockedLock = lec.getLock();
when(lec.isReleaseOnCancel()).thenReturn(true);
AtomicInteger count = new AtomicInteger();
doAnswer(invocation -> {
if (count.addAndGet(1) == 2) {
// simulate that we've already lost election
throw new KubernetesClientException(new StatusBuilder().withCode(HttpURLConnection.HTTP_CONFLICT).build());
}
LeaderElectionRecord leaderRecord = invocation.getArgument(1, LeaderElectionRecord.class);
activeLer.set(leaderRecord);
signal.countDown();
return null;
}).when(mockedLock).update(any(), any());

// When
LeaderElector leaderElector = new LeaderElector(mock(NamespacedKubernetesClient.class), lec, CommonThreadPool.get());
CompletableFuture<?> started = leaderElector.start();
assertTrue(signal.await(10, TimeUnit.SECONDS));
started.cancel(true);

// Then
verify(lec.getLeaderCallbacks(), times(1)).onStopLeading();
}

@Test
void shouldRelease() throws Exception {
// Given
Expand Down
Loading