Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-10975 Add Kubernetes Leader Election and State Provider #6779

Merged
merged 11 commits into from
Mar 7, 2023

Conversation

exceptionfactory
Copy link
Contributor

Summary

NIFI-10975 Adds an initial implementation of Kubernetes cluster leader election and state management, supporting deployments without the need for ZooKeeper.

State Management Implementation

The state management implementation uses Kubernetes ConfigMaps to persist cluster information for stateful components. The KubernetesConfigMapStateProvider uses the standard data property, but encodes property names using Base64 URL encoding without padding to meet ConfigMap object property naming requirements.

The new State Provider can be configured in state-management.xml using the following element definition, which is included in a commented section of the configuration:

<cluster-provider>
    <id>kubernetes-provider</id>
    <class>org.apache.nifi.kubernetes.state.provider.KubernetesConfigMapStateProvider</class>
</cluster-provider>

Leader Election Implementation

The leader election implementation uses Kubernetes Leases for distributed tracking of the current cluster leader. Kubernetes Lease names must adhere to RFC 1123 subdomain naming requirements, requiring a mapping from NiFi application names to Lease names.

The new Leader Election implementation can be configured using a new property in nifi.properties as follows:

nifi.cluster.leader.election.implementation=KubernetesLeaderElectionManager

Framework Changes

The leader election implementation required promoting the LeaderElectionManager interface to nifi-framework-api to support NAR bundle extension loading. The LeaderElectionManager had two methods without runtime references, which were removed. Refactoring also involved creating a new nifi-framework-leader-election-shared module for abstracting tracking operations.

The nifi.properties configuration includes a new property with a default value of CuratorLeaderElectionManager, which provides current cluster coordination using ZooKeeper.

Kubernetes Client

The implementation includes a new nifi-kubernetes-client library which provides utility components for Kubernetes Client access and service namespace determination.

The nifi-kubernetes-client library depends on the Fabric8 Kubernetes Client which supports current versions of Kubernetes and provides separation of API and implementation classes.

Both the State Provider and Leader Election Manager implementations attempt to resolve the Kubernetes namespace based on the standard Service Account namespace secret. In absence of a readable namespace secret, the provider returns default as the namespace for storing Leases and ConfigMaps.

Additional Changes

Additional changes include removing several integration test classes specific to ZooKeeper. These integration tests are less useful with current system integration tests run on a scheduled basis.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 8
    • JDK 11
    • JDK 17

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

return new StandardLeaderElectionCommandProvider(kubernetesClientProvider, namespace);
}

private void registerLeaderElectionCommand(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of check-then-modify here. I think we need to ensure that this is thread safe and synchronize the method or add locking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is only called from register, which is marked as synchronized, do you think any additional synchronization is necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, good call. So technically it's correct/safe. But I would generally mark this as synchronized as well because it has virtually zero cost, since the synchronization lock has already been obtained, but it makes it more clear that the actions can only be taken while synchronized. Or alternatively documenting via JavaDoc that it should only be called while synchronized. But I'd prefer just synchronizing the method itself.

final ObjectMeta metadata = configMap.getMetadata();
final String resourceVersion = metadata.getResourceVersion();
try {
return resourceVersion == null ? UNKNOWN_VERSION : Long.parseLong(resourceVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this is valid. According to https://kubernetes.io/docs/reference/using-api/api-concepts/

You must not assume resource versions are numeric or collatable. API clients may only compare two resource versions for equality (this means that you must not compare resource versions for greater-than or less-than relationships).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would propose that we update our StateMap as follows:
Make long getVersion() deprecated in favor of a new String getStateVersion()
Remove long getVersion() in version 2.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about this approach, it probably depends on the Kubernetes implementation. One option that comes to mind is creating using a hash of the Resource Version. Another option could be using a custom metadata field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everywhere in the code base that I see long getVersion() being used, actually, is to check if the value is -1 (indicating that the state hasn't actually been stored anywhere). So it probably actually makes sense to use an Optional<String> as the return type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, adding a new Optional<String> getStateVersion() method to StateMap sounds like a good approach.

@markap14
Copy link
Contributor

So I crated a two-node nifi cluster using GKE to test this. On startup, things work well. Both nodes join the cluster. I can see that state is getting stored/recovered properly using ListGCSBucket. If I then disconnect the node that is Primary/Coordinator, I see that the other node is elected. But if I then reconnect the disconnected node, it gets into a bad state.
Running bin/nifi.sh diagnostics diag1.txt on both nodes shows that both nodes actually believe that they are both the Cluster Coordinator AND the Primary Node.
Looking at the logs of the disconnected node, I see:

2022-12-15 16:50:42,065 ERROR [KubernetesLeaderElectionManager] i.f.k.c.e.leaderelection.LeaderElector Exception occurred while releasing lock 'LeaseLock: nifi - cluster-coordinator (10.31.1.4:4423)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException: Unable to update LeaseLock
        at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:102)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$0(LeaderElector.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.run(LeaderElector.java:70)
        at org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommand.run(LeaderElectionCommand.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:347)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:680)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:167)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:172)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:113)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:41)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:1043)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:88)
        at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:100)
        ... 19 common frames omitted
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
        at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
        ... 3 common frames omitted
2022-12-15 16:50:42,066 ERROR [KubernetesLeaderElectionManager] i.f.k.c.e.leaderelection.LeaderElector Exception occurred while releasing lock 'LeaseLock: nifi - primary-node (10.31.1.4:4423)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException: Unable to update LeaseLock
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:120)
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$1(LeaderElector.java:94)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$null$0(LeaderElector.java:92)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.cancel(Unknown Source)
        at io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.run(LeaderElector.java:70)
        at org.apache.nifi.kubernetes.leader.election.command.LeaderElectionCommand.run(LeaderElectionCommand.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://10.31.128.1/apis/coordination.k8s.io/v1/namespaces/nifi/leases/primary-node. Message: Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases, name=primary-node, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}).
        at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleUpdate(OperationSupport.java:347)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleUpdate(BaseOperation.java:680)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.lambda$replace$0(HasMetadataOperation.java:167)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:172)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:113)
        at io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperation.replace(HasMetadataOperation.java:41)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:1043)
        at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.replace(BaseOperation.java:88)
        at io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:100)
        ... 19 common frames omitted
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://10.31.128.1/apis/coordination.k8s.io/v1/namespaces/nifi/leases/primary-node. Message: Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again. Received status: Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases, name=primary-node, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Operation cannot be fulfilled on leases.coordination.k8s.io "primary-node": the object has been modified; please apply your changes to the latest version and try again, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Conflict, status=Failure, additionalProperties={}).
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:640)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
        at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
        at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
        ... 3 common frames omitted

So looks like it is not properly relinquishing the ownership of the lease. I presume this is what causes both nodes to believe that they are the coordinator/primary.

@exceptionfactory
Copy link
Contributor Author

Thanks for the testing and feedback @markap14!

As it turns out, the Fabric8 Client version 6.3.0 includes a fix for relinquishing leadership described in Issue 4547. I will include an update to client version 6.3.0 as part of additional updates for State Map versioning.

@markap14
Copy link
Contributor

Great, thanks @exceptionfactory!
I also would recommend a couple minor changes to make this work more easily with Docker, as I had to make these changes to test:
In the nifi-docker/dockerhub/sh/start.sh file I added two lines:

# Set leader election and state management properties
prop_replace 'nifi.cluster.leader.election.implementation'      "${NIFI_LEADER_ELECTION_IMPLEMENTATION:-CuratorLeaderElectionManager}"
prop_replace 'nifi.state.management.provider.cluster'           "${NIFI_STATE_MANAGEMENT_CLUSTER_PROVIDER:-zk-provider}"

And also in the nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml file there's no need to add the <cluster-provider> element commented out. It's just configuration, it doesn't get used for anything unless it's selected in nifi.properties so we should just include the config uncommented in state-management.xml so that it can be easily referenced from nifi.properties

@exceptionfactory
Copy link
Contributor Author

Thanks for the recommendation on the Docker script @markap14. I pushed updates with the following changes:

  1. Added StateMap.getStateVersion() returning Optional<String> and refactored all references
  2. Upgraded Fabric8 Kubernetes Client from 6.2.0 to 6.3.0 to correct stop leading issue found during testing
  3. Added prop_replace lines to Docker start.sh for setting the Leader Election and Cluster State Provider from environment variables

@markap14
Copy link
Contributor

Thanks @exceptionfactory . Pulled the latest, rebuilt everything, rebuilt the docker image. Interestingly, it no longer worked. I got errors on startup telling me that the URI was invalid when calling leaderElectionCommandProvider.findLeader(roleName); (line 197 of KubernetesLeaderElectionManager) because the URI ended in "/Cluster Coordinator".
So I updated that code to get the roleId for the roleName and provide that.
That addressed the issue. It was now registering for the role properly.
Unfortunately, though, I still ran into the exact same issue. When I disconnected and reconnected a node, it gave the same stack trace, failing to release the lease because it was modified. This then caused the same issue, with both nodes thinking they are the leader.

@markap14
Copy link
Contributor

Also, just to ensure that I was using the latest, I did a kubectl exec -it nifi-0 bash and went into the work directory to check the lib. It's using kubernetes-client-6.3.1.jar as expected.

@exceptionfactory
Copy link
Contributor Author

Thanks for the feedback @markap14.

On further evaluation of the disconnect and reconnect behavior, I realized the unregister method was not removing the local leader identifier from the roleLeaders Map within KubernetesLeaderElectionManager. The corresponding command was not being removed from the roleCommands Map, which was preventing proper registration on cluster reconnection. I corrected this behavior and also corrected the Role ID resolution prior to calling findLeader().

In addition to those changes, I removed the withReleaseOnCancel() setting from the Leader Elector Builder. This was a more recent addition to the Kubernetes Client library implementation. The purpose of the setting is to update the Lease will a null holder identity, prompting nodes to attempt lease renewal. For the purpose of NiFi clustering, this behavior does not seem necessary, as NiFi nodes will proceed with attempting to update and obtain a lease lock. Removing the release on cancel setting avoids the error shown above while allowing standard lease lock update attempts to proceed.

These changes resulted in consistent behavior with various disconnect and reconnect attempts.

@markap14
Copy link
Contributor

markap14 commented Jan 5, 2023

Thanks for the latest updated @exceptionfactory . Ran into another issue when testing, unfortunately.
I have a statefulset that had 3 replicas. nifi-1 was both the primary node and the coordinator.
I then scaled the statefulset to 0.
This didn't expire the lease though.:

mpayne@cs-654103601966-default:~$ k get leases
NAME                  HOLDER             AGE
cluster-coordinator   nifi-1.nifi:4423   63m
primary-node          nifi-1.nifi:4423   62m

Even after I waited over an hour the lease remains there. If I look at it:

mpayne@cs-654103601966-default:~$ k get lease cluster-coordinator -o yaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
  creationTimestamp: "2023-01-05T21:06:29Z"
  name: cluster-coordinator
  namespace: nifi
  resourceVersion: "252479"
  uid: 7e5d05d1-3b20-426d-8822-5cff92eb183f
spec:
  acquireTime: "2023-01-05T22:03:17.355642Z"
  holderIdentity: nifi-1.nifi:4423
  leaseDurationSeconds: 15
  leaseTransitions: 2
  renewTime: "2023-01-05T22:04:13.480562Z"

mpayne@cs-654103601966-default:~$ date
Thu 05 Jan 2023 10:11:34 PM UTC

We can see here that date is well past the renewTime. (10:11:34 PM = 22:11:34 PM vs 22:04:13 as the renew time).
So the least appears to remain, and the new node, nifi-0 cannot proceed:

2023-01-05 22:09:37,513 INFO [main] o.a.n.c.p.AbstractNodeProtocolSender Cluster Coordinator is located at nifi-1.nifi:4423. Will send Cluster Connection Request to this address
2023-01-05 22:09:37,535 WARN [main] o.a.nifi.controller.StandardFlowService Failed to connect to cluster due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed to create socket to nifi-1.nifi:4423 due to: java.net.UnknownHostException: nifi-1.nifi
2023-01-05 22:09:42,550 INFO [main] o.a.n.c.c.n.LeaderElectionNodeProtocolSender Determined that Cluster Coordinator is located at nifi-1.nifi:4423; will use this address for sending heartbeat messages
2023-01-05 22:09:42,550 INFO [main] o.a.n.c.p.AbstractNodeProtocolSender Cluster Coordinator is located at nifi-1.nifi:4423. Will send Cluster Connection Request to this address
2023-01-05 22:09:42,550 WARN [main] o.a.nifi.controller.StandardFlowService Failed to connect to cluster due to: org.apache.nifi.cluster.protocol.ProtocolException: Failed to create socket to nifi-1.nifi:4423 due to: java.net.UnknownHostException: nifi-1.nifi

As soon as I delete the lease (k delete lease cluster-coordinator) all works as expected.
But we obviously can't have users manually deleting the lease all the time.
Not sure if this is the intended behavior, and we should be ignoring the lease if the renewTime has expired? Or is it because we don't actually participate in the leader election on startup since there appears to already be an elected leader?
Either way, we need to make sure that we can properly handle this condition, where the lease points to a node that is no longer part of the cluster

@exceptionfactory
Copy link
Contributor Author

Good find on the expired Lease handling @markap14, thanks for the additional testing.

The implementation of StandardLeaderElectionCommandProvider.findLeader() was always returning the last Lease Holder Identity, regardless of whether the Lease had expired. This broke the contract of LeaderElectionManager.getLeader() which callers expected to return a null when there is no leader for the role requested. I updated findLeader() to return Optional<String>, and check the expiration of the Lease so that findLeader returns Optional.empty() when the Lease has expired. The expiration is determined based on renewTime plus leaseDurationSeconds, following the definition of those fields in the Kubernetes LeaseSpec.

To make the implementation contract clearer, I also changed the return of LeaderElectionManager.getLeader() from String to Optional<String>. Although the method documentation noted that null indicates no leader is elected, changing the return to use Optional clarifies that detail without requiring too many adjustments. This seemed like a good time to make the change as part of promoting LeaderElectionManager to an extension component interface.

Following those changes, shutting down all nodes, then starting up a single node that was not previously the Cluster Coordinator or Primary Node worked as expected. The single node was able to become the leader for those roles.

It is worth noting that the Lease objects continue to live in the Kubernetes namespace even if all nodes have been shutdown for an extended period. The renewTime indicates the last update, and the corrected logic will now update existing values as needed.

@mh013370
Copy link
Contributor

mh013370 commented Jan 9, 2023

It looks like a ConfigMap is created for each component that requires state management. Do you have any concern about the size of state (in bytes) that may be written to any one ConfigMap?

The maximum size of a ConfigMap is 1MiB. Do you anticipate any compatibility issues where a use case would work with Zookeeper but would not work with ConfigMaps?

@exceptionfactory
Copy link
Contributor Author

It looks like a ConfigMap is created for each component that requires state management. Do you have any concern about the size of state (in bytes) that may be written to any one ConfigMap?

The maximum size of a ConfigMap is 1MiB. Do you anticipate any compatibility issues where a use case would work with Zookeeper but would not work with ConfigMaps?

Thanks for the feedback @michael81877. The standard ZooKeeper node storage is also limited to 1 MB, so the standard size limits of a Kubernetes ConfigMap align well with current behavior.

With that background and similar size limitations, I do not expect any compatibility issues based on size limitations. It is possible that other behavioral nuances could surface as a result of these alternative State Management and Leader Election implementations.

@exceptionfactory
Copy link
Contributor Author

@markap14 I rebased the branch to the current main so that all modules now reference the current 2.0.0-SNAPSHOT version.

- Added Kubernetes Leader Election Manager based on Kubernetes Leases
- Added Kubernetes State Provider based on Kubernetes ConfigMaps
- Added nifi-kubernetes-client for generalized access to Fabric8 Kubernetes Client
- Added nifi.cluster.leader.election.implementation Property defaulting to CuratorLeaderElectionManager
- Refactored LeaderElectionManager to nifi-framework-api for Extension Discovering Manager
- Refactored shared ZooKeeper configuration to nifi-framework-cluster-zookeeper
- Upgraded Kubernetes Client from 6.2.0 to 6.3.0
- Added getStateVersion to StateMap and deprecated getVersion
- Updated Docker start.sh with additional properties
- Changed LeaderElectionManager.getLeader() return to Optional String
- Updated versions to match current main branch and avoid reverting
Copy link
Contributor

@bbende bbende left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ran this in k8s and tested all of the basic clustering scenarios like disconnecting/connecting nodes, killing nodes, storing cluster state, etc., everything looks good, nice work! Going to merge this.

@bbende bbende merged commit 512155b into apache:main Mar 7, 2023
r-vandenbos pushed a commit to r-vandenbos/nifi that referenced this pull request Apr 11, 2023
…6779)

* NIFI-10975 Added Kubernetes Leader Election and State Provider
- Added Kubernetes Leader Election Manager based on Kubernetes Leases
- Added Kubernetes State Provider based on Kubernetes ConfigMaps
- Added nifi-kubernetes-client for generalized access to Fabric8 Kubernetes Client
- Added nifi.cluster.leader.election.implementation Property defaulting to CuratorLeaderElectionManager
- Refactored LeaderElectionManager to nifi-framework-api for Extension Discovering Manager
- Refactored shared ZooKeeper configuration to nifi-framework-cluster-zookeeper

* NIFI-10975 Updated Kubernetes Client and StateMap
- Upgraded Kubernetes Client from 6.2.0 to 6.3.0
- Added getStateVersion to StateMap and deprecated getVersion
- Updated Docker start.sh with additional properties

* NIFI-10975 Corrected MockStateManager.assertStateSet()
* NIFI-10975 Upgraded Kubernetes Client from 6.3.0 to 6.3.1
* NIFI-10975 Corrected unregister leader and disabled release on cancel

* NIFI-10975 Corrected findLeader handling of Lease expiration
- Changed LeaderElectionManager.getLeader() return to Optional String

* NIFI-10975 Corrected StandardNiFiServiceFacade handling of Optional Leader
* NIFI-10975 Changed getLeader() to call findLeader() to avoid stale cached values
* NIFI-10975 Updated LeaderElectionCommand to run LeaderElector in loop
* NIFI-10975 Rebased on project version 2.0.0-SNAPSHOT

* NIFI-10975 Corrected Gson and AspectJ versions
- Updated versions to match current main branch and avoid reverting
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants