Skip to content

Commit

Permalink
fix: ensure that onStopLeading is called
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 19, 2023
1 parent 3bd9373 commit 057fdbd
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs
* Fix #5382: [java-generator] Allow to deserialize more valid RFC3339 date-time and make the format customizable
* Fix #5380: [java-generator] Avoid to emit Java Keywords as package names
* Fix #5463: ensures that onStopLeading is called with releaseOnCancel even when leadership is already lost
* Fix #5423: OkHttpClientImpl supports setting request method for empty payload requests

#### Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -121,36 +122,44 @@ private synchronized void stopLeading() {
return; // not leading
}
if (leaderElectionConfig.isReleaseOnCancel()) {
release();
} else {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
try {
if (release()) {
return;
}
} catch (KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
if (e.getCode() != HttpURLConnection.HTTP_CONFLICT) {
LOGGER.error("Exception occurred while releasing lock '{}' on cancel", lockDescription, e);
} else {
LOGGER.debug("Leadership was likely already lost '{}'", lockDescription, e);
}
}
}
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}

/**
* Release the leadership if currently held. If not cancelled, the elector will
* continue to try and re-acquire the lock.
*
* @return true if the lock was successfully released. false if there is no lock, or this is not the leader
*/
public synchronized void release() {
public synchronized boolean release() {
LeaderElectionRecord current = leaderElectionConfig.getLock().get(kubernetesClient);
if (current == null || !isLeader(current)) {
return; // lost leadership already
return false; // lost leadership already
}
try {
ZonedDateTime now = now();
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
"",
Duration.ofSeconds(1),
now,
now,
current.getLeaderTransitions());
ZonedDateTime now = now();
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
"",
Duration.ofSeconds(1),
now,
now,
current.getLeaderTransitions());

leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
} catch (KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.error("Exception occurred while releasing lock '{}'", lockDescription, e);
}
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
return true;
}

private CompletableFuture<Void> acquire() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.extended.leaderelection;

import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaderElectionRecord;
Expand All @@ -26,6 +27,7 @@
import org.mockito.Answers;
import org.mockito.Mockito;

import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -144,6 +146,36 @@ void shouldReleaseWhenCanceled() throws Exception {
assertEquals(1, activeLer.get().getLeaderTransitions());
}

@Test
void shouldStopOnReleaseWhenCanceled() throws Exception {
// Given
AtomicReference<LeaderElectionRecord> activeLer = new AtomicReference<>();
final LeaderElectionConfig lec = mockLeaderElectionConfiguration(activeLer);
final CountDownLatch signal = new CountDownLatch(1);
final Lock mockedLock = lec.getLock();
when(lec.isReleaseOnCancel()).thenReturn(true);
AtomicInteger count = new AtomicInteger();
doAnswer(invocation -> {
if (count.addAndGet(1) == 2) {
// simulate that we've already lost election
throw new KubernetesClientException(new StatusBuilder().withCode(HttpURLConnection.HTTP_CONFLICT).build());
}
LeaderElectionRecord leaderRecord = invocation.getArgument(1, LeaderElectionRecord.class);
activeLer.set(leaderRecord);
signal.countDown();
return null;
}).when(mockedLock).update(any(), any());

// When
LeaderElector leaderElector = new LeaderElector(mock(NamespacedKubernetesClient.class), lec, CommonThreadPool.get());
CompletableFuture<?> started = leaderElector.start();
assertTrue(signal.await(10, TimeUnit.SECONDS));
started.cancel(true);

// Then
verify(lec.getLeaderCallbacks(), times(1)).onStopLeading();
}

@Test
void shouldRelease() throws Exception {
// Given
Expand Down

0 comments on commit 057fdbd

Please sign in to comment.