From 859b66c7c530b79b33a01129f7b3d93bc2491ae8 Mon Sep 17 00:00:00 2001 From: Justin Lee Date: Sun, 14 May 2023 23:31:01 +0000 Subject: [PATCH] Update Kafka topics to use REST API --- roles/kafka_broker/tasks/main.yml | 32 +--------- roles/kafka_broker/tasks/topics.yml | 98 +++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 30 deletions(-) create mode 100644 roles/kafka_broker/tasks/topics.yml diff --git a/roles/kafka_broker/tasks/main.yml b/roles/kafka_broker/tasks/main.yml index 812586a7ca..18e0b61831 100644 --- a/roles/kafka_broker/tasks/main.yml +++ b/roles/kafka_broker/tasks/main.yml @@ -516,34 +516,6 @@ when: (ssl_provided_keystore_and_truststore | bool) - name: Create Topics - shell: | - {{ binary_base_path }}/bin/kafka-topics --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} \ - --command-config {{kafka_broker.client_config_file}} \ - --create --if-not-exists \ - --topic {{ item.name }} \ - {{ '' if 'partitions' not in item else ' --partitions ' + item.partitions|string }} \ - {{ '' if 'replication.factor' not in item else ' --replication-factor ' + item['replication.factor']|string }} - environment: - KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions {% if kerberos_client_config_file_dest != '/etc/krb5.conf' %}-Djava.security.krb5.conf={{kerberos_client_config_file_dest}}{% endif %}" - ignore_errors: true - loop: "{{ kafka_broker_topics }}" - when: not ( rbac_enabled|bool or kafka_broker_client_secrets_protection_enabled|bool ) - run_once: true - tags: - - resource - -- name: Configure Topics - shell: | - {{ binary_base_path }}/bin/kafka-configs --bootstrap-server {{ hostvars[inventory_hostname]|confluent.platform.resolve_hostname }}:{{kafka_broker_listeners[kafka_broker_inter_broker_listener_name]['port']}} \ - --command-config {{kafka_broker.client_config_file}} \ - --alter \ - --topic {{ item.name }} \ - {% for k in item.config %} --add-config {{ k|string }}={{ item.config[k]|string }} {% endfor %} \ - environment: - KAFKA_OPTS: "-Xlog:all=error -XX:+IgnoreUnrecognizedVMOptions {% if kerberos_client_config_file_dest != '/etc/krb5.conf' %}-Djava.security.krb5.conf={{kerberos_client_config_file_dest}}{% endif %}" - ignore_errors: true - loop: "{{ kafka_broker_topics|selectattr('config','defined') }}" - when: not ( rbac_enabled|bool or kafka_broker_client_secrets_protection_enabled|bool ) - run_once: true + include_tasks: kafka_topics.yml tags: - - resource + - topics diff --git a/roles/kafka_broker/tasks/topics.yml b/roles/kafka_broker/tasks/topics.yml new file mode 100644 index 0000000000..6328fa1456 --- /dev/null +++ b/roles/kafka_broker/tasks/topics.yml @@ -0,0 +1,98 @@ +--- + - name: Get Cluster ID + uri: + url: "{{ kafka_broker_erp_clusters_url }}" + validate_certs: false + return_content: true + status_code: 200 + url_username: "{{kafka_broker_rest_health_check_user}}" + url_password: "{{kafka_broker_rest_health_check_password}}" + force_basic_auth: true + body_format: json + register: cluster_id_query + run_once: true + tags: + - topics + + - name: Parse Kafka Cluster ID from json query + set_fact: + kafka_cluster_id: "{{ cluster_id_query.json.data[0].cluster_id }}" + run_once: true + tags: + - topics + + - name: Get topic info + uri: + url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.topic_name }}/configs" + status_code: [200, 404] + headers: + content-type: application/json + validate_certs: false + return_content: true + url_username: "{{kafka_broker_rest_health_check_user}}" + url_password: "{{kafka_broker_rest_health_check_password}}" + force_basic_auth: true + body_format: json + loop: "{{ kafka_broker_topics }}" + register: topic_info + run_once: true + tags: + - topics + + - name: Identify existing topics + set_fact: + new_topics: "{{ topic_info.results | rejectattr('status', 'eq', 200) | map(attribute = 'item') }}" + existing_topics: "{{ topic_info.results | selectattr('status', 'eq', 200) }}" + run_once: true + tags: + - topics + + - name: Create new topics + uri: + url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics" + method: POST + status_code: [201] + body: "{{ item }}" + headers: + content-type: application/json + validate_certs: false + return_content: true + url_username: "{{kafka_broker_rest_health_check_user}}" + url_password: "{{kafka_broker_rest_health_check_password}}" + force_basic_auth: true + body_format: json + loop: "{{ new_topics }}" + run_once: true + tags: + - topics + + - name: Update existing configs for existing topics + loop: "{{ existing_topics }}" + uri: + url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.item.topic_name }}/configs:alter" + method: POST + status_code: [204] + body: | + { + "data": [ + {% for config in item.item['configs']|default([]) %} + {% if loop.index > 1%},{% endif %} + {{ config }} + {% endfor %} + {% if item.item['configs']|default([]) | length > 0 %},{% endif %} + {% for config_remove in (item.json.data | selectattr('is_default', 'eq', false) | map(attribute = 'name')) | difference(item.item.configs|default({}) | map(attribute = 'name')) %} + {% if loop.index > 1%},{% endif %} + {"name": "{{ config_remove }}", "operation": "DELETE"} + {% endfor %} + ] + } + headers: + content-type: application/json + validate_certs: false + return_content: true + url_username: "{{kafka_broker_rest_health_check_user}}" + url_password: "{{kafka_broker_rest_health_check_password}}" + force_basic_auth: true + body_format: json + tags: + - topics \ No newline at end of file