Skip to content

Commit

Permalink
fix fabric8io#4477 fabric8io#4965: LeaderElector release public / cru…
Browse files Browse the repository at this point in the history
…d mock compatibility
  • Loading branch information
shawkins committed Mar 18, 2023
1 parent 3710714 commit c2a8fac
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs

#### Improvements
* Fix #4477 exposing LeaderElector.release to force an elector to give up the lease

#### Dependency Upgrade

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.fabric8.kubernetes.client.Client;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.mockwebserver.Context;
import io.fabric8.mockwebserver.ServerRequest;
import io.fabric8.mockwebserver.ServerResponse;
Expand Down Expand Up @@ -98,7 +99,8 @@ protected void initializeKubernetesClientAndMockServer(Class<?> testClass) {
EnableKubernetesMockClient a = testClass.getAnnotation(EnableKubernetesMockClient.class);
final Map<ServerRequest, Queue<ServerResponse>> responses = new HashMap<>();
mock = a.crud()
? new KubernetesMockServer(new Context(), new MockWebServer(), responses, new KubernetesMixedDispatcher(responses),
? new KubernetesMockServer(new Context(Serialization.jsonMapper()), new MockWebServer(), responses,
new KubernetesMixedDispatcher(responses),
a.https())
: new KubernetesMockServer(a.https());
mock.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,22 @@ private synchronized void stopLeading() {
if (current == null || !isLeader(current)) {
return; // not leading
}
try {
// update current from latest
current = leaderElectionConfig.getLock().get(kubernetesClient);
if (current == null || !isLeader(current)) {
return; // lost leadership already
}
if (leaderElectionConfig.isReleaseOnCancel()) {
release(current);
}
} finally {
// called regardless of isReleaseOnCancel
if (leaderElectionConfig.isReleaseOnCancel()) {
release();
} else {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
}
}

private void release(LeaderElectionRecord current) {
/**
* Release the leadership if currently held. If not cancelled, the elector will
* continue to try and re-acquire the lock.
*/
public synchronized void release() {
LeaderElectionRecord current = leaderElectionConfig.getLock().get(kubernetesClient);
if (current == null || !isLeader(current)) {
return; // lost leadership already
}
try {
ZonedDateTime now = now();
final LeaderElectionRecord newLeaderElectionRecord = new LeaderElectionRecord(
Expand All @@ -144,9 +144,9 @@ private void release(LeaderElectionRecord current) {
now,
now,
current.getLeaderTransitions());
newLeaderElectionRecord.setVersion(current.getVersion());

leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
} catch (KubernetesClientException e) {
final String lockDescription = leaderElectionConfig.getLock().describe();
LOGGER.error("Exception occurred while releasing lock '{}'", lockDescription, e);
Expand Down Expand Up @@ -218,8 +218,7 @@ synchronized boolean tryAcquireOrRenew() {
isLeader ? oldLeaderElectionRecord.getAcquireTime() : now,
now,
oldLeaderElectionRecord.getLeaderTransitions() + (isLeader ? 0 : 1));
newLeaderElectionRecord.setVersion(oldLeaderElectionRecord.getVersion());
leaderElectionConfig.getLock().update(kubernetesClient, newLeaderElectionRecord);
lock.update(kubernetesClient, newLeaderElectionRecord);
updateObserved(newLeaderElectionRecord);
return true;
}
Expand All @@ -232,6 +231,8 @@ private void updateObserved(LeaderElectionRecord leaderElectionRecord) {
final String newLeader = leaderElectionRecord.getHolderIdentity();
if (!Objects.equals(newLeader, currentLeader)) {
LOGGER.debug("Leader changed from {} to {}", currentLeader, newLeader);
// this will notify even if the newLeader is null or empty, which is the same behavior as the go client
// but does not seem entirely correct
leaderElectionConfig.getLeaderCallbacks().onNewLeader(newLeader);
if (Objects.equals(currentLeader, leaderElectionConfig.getLock().identity())) {
leaderElectionConfig.getLeaderCallbacks().onStopLeading();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.slf4j.Logger;
Expand Down Expand Up @@ -58,10 +59,10 @@ protected LeaderElectionRecord toRecord(ConfigMap resource) {
}

@Override
protected ConfigMap toResource(LeaderElectionRecord leaderElectionRecord, ObjectMeta meta) {
return new ConfigMapBuilder().withMetadata(meta).editMetadata()
.addToAnnotations(LEADER_ELECTION_RECORD_ANNOTATION_KEY, Serialization.asJson(leaderElectionRecord))
.endMetadata()
protected ConfigMap toResource(LeaderElectionRecord leaderElectionRecord, ObjectMetaBuilder meta) {
return new ConfigMapBuilder()
.withMetadata(
meta.addToAnnotations(LEADER_ELECTION_RECORD_ANNOTATION_KEY, Serialization.asJson(leaderElectionRecord)).build())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;

import java.io.Serializable;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Objects;
Expand All @@ -35,7 +32,6 @@
* @see <a href=
* "https://github.com/kubernetes/client-go/blob/1aa326d7304eba6aedc8c89daad615cc7499d1f7/tools/leaderelection/resourcelock/interface.go">leaderelection/resourcelock/interface.go</a>
*/
@AllArgsConstructor
@Builder(toBuilder = true)
public class LeaderElectionRecord {

Expand All @@ -46,8 +42,6 @@ public class LeaderElectionRecord {
@JsonFormat(timezone = "UTC", pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'")
private final ZonedDateTime renewTime;
private final int leaderTransitions;
@JsonIgnore
private transient Serializable version;

@JsonCreator
public LeaderElectionRecord(
Expand Down Expand Up @@ -82,14 +76,6 @@ public int getLeaderTransitions() {
return leaderTransitions;
}

public Serializable getVersion() {
return version;
}

public void setVersion(Serializable version) {
this.version = version;
}

@Override
public boolean equals(Object o) {
if (this == o)
Expand All @@ -101,13 +87,12 @@ public boolean equals(Object o) {
Objects.equals(holderIdentity, that.holderIdentity) &&
Objects.equals(leaseDuration, that.leaseDuration) &&
Objects.equals(acquireTime, that.acquireTime) &&
Objects.equals(renewTime, that.renewTime) &&
Objects.equals(version, that.version);
Objects.equals(renewTime, that.renewTime);
}

@Override
public int hashCode() {
return Objects.hash(holderIdentity, leaseDuration, acquireTime, renewTime, leaderTransitions, version);
return Objects.hash(holderIdentity, leaseDuration, acquireTime, renewTime, leaderTransitions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.fabric8.kubernetes.client.extended.leaderelection.resourcelock;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.api.model.coordination.v1.Lease;
import io.fabric8.kubernetes.api.model.coordination.v1.LeaseBuilder;

Expand All @@ -39,8 +40,8 @@ protected Class<Lease> getKind() {
}

@Override
protected Lease toResource(LeaderElectionRecord leaderElectionRecord, ObjectMeta meta) {
return new LeaseBuilder().withMetadata(meta)
protected Lease toResource(LeaderElectionRecord leaderElectionRecord, ObjectMetaBuilder meta) {
return new LeaseBuilder().withMetadata(meta.build())
.withNewSpec()
.withHolderIdentity(leaderElectionRecord.getHolderIdentity())
.withLeaseDurationSeconds((int) leaderElectionRecord.getLeaseDuration().get(ChronoUnit.SECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

public abstract class ResourceLock<T extends HasMetadata> implements Lock {

private final ObjectMeta meta;
private final String identity;
private T resource;

public ResourceLock(String namespace, String name, String identity) {
this(new ObjectMetaBuilder().withNamespace(namespace).withName(name).build(), identity);
Expand All @@ -46,23 +45,24 @@ public ResourceLock(ObjectMeta meta, String identity) {
protected abstract Class<T> getKind();

@Override
public LeaderElectionRecord get(KubernetesClient client) {
return getResource(client).map(this::toRecordInternal).orElse(null);
}

private Optional<T> getResource(KubernetesClient client) {
return Optional.ofNullable(client.resources(getKind()).inNamespace(meta.getNamespace()).withName(meta.getName()).get());
public synchronized LeaderElectionRecord get(KubernetesClient client) {
resource = client.resources(getKind()).inNamespace(meta.getNamespace()).withName(meta.getName()).get();
if (resource != null) {
return toRecord(resource);
}
return null;
}

@Override
public void create(KubernetesClient client, LeaderElectionRecord leaderElectionRecord) {
client.resource(toResource(leaderElectionRecord, getObjectMeta(leaderElectionRecord.getVersion()))).create();
public synchronized void create(KubernetesClient client, LeaderElectionRecord leaderElectionRecord) {
resource = client.resource(toResource(leaderElectionRecord, getObjectMeta(null))).create();
}

@Override
public void update(KubernetesClient client, LeaderElectionRecord leaderElectionRecord) {
client.resource(toResource(leaderElectionRecord, getObjectMeta(leaderElectionRecord.getVersion())))
.patch(PatchContext.of(PatchType.STRATEGIC_MERGE));
public synchronized void update(KubernetesClient client, LeaderElectionRecord leaderElectionRecord) {
Objects.requireNonNull(resource, "get or create must be called first");
client.resource(toResource(leaderElectionRecord, getObjectMeta(resource.getMetadata().getResourceVersion())))
.patch(PatchContext.of(PatchType.JSON_MERGE));
}

/**
Expand All @@ -72,18 +72,12 @@ public void update(KubernetesClient client, LeaderElectionRecord leaderElectionR
* @param meta not null
* @return
*/
protected abstract T toResource(LeaderElectionRecord leaderElectionRecord, ObjectMeta meta);

protected LeaderElectionRecord toRecordInternal(T resource) {
LeaderElectionRecord result = toRecord(resource);
result.setVersion(resource.getMetadata().getResourceVersion());
return result;
}
protected abstract T toResource(LeaderElectionRecord leaderElectionRecord, ObjectMetaBuilder meta);

protected abstract LeaderElectionRecord toRecord(T resource);

protected ObjectMeta getObjectMeta(Serializable version) {
return new ObjectMetaBuilder(meta).withResourceVersion((String) version).build();
protected ObjectMetaBuilder getObjectMeta(String version) {
return new ObjectMetaBuilder(meta).withResourceVersion((String) version);
}

/**
Expand All @@ -102,4 +96,8 @@ public String describe() {
return String.format("%sLock: %s - %s (%s)", getKind().getSimpleName(), meta.getNamespace(), meta.getName(), identity);
}

void setResource(T resource) {
this.resource = resource;
}

}
Loading

0 comments on commit c2a8fac

Please sign in to comment.