Skip to content

Commit

Permalink
MVP Create Topics
Browse files Browse the repository at this point in the history
  • Loading branch information
justinrlee committed May 14, 2023
1 parent c7d01d1 commit 5d0fb1b
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
5 changes: 5 additions & 0 deletions roles/kafka_broker/tasks/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,8 @@
- "{{kafka_broker_cert_path}}"
- "{{kafka_broker_key_path}}"
when: (ssl_provided_keystore_and_truststore | bool)

- name: Create Topics
include_tasks: kafka_topics.yml
tags:
- topics
98 changes: 98 additions & 0 deletions roles/kafka_broker/tasks/topics.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
---
- name: Get Cluster ID
uri:
url: "{{ kafka_broker_erp_clusters_url }}"
validate_certs: false
return_content: true
status_code: 200
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
register: cluster_id_query
run_once: true
tags:
- topics

- name: Parse Kafka Cluster ID from json query
set_fact:
kafka_cluster_id: "{{ cluster_id_query.json.data[0].cluster_id }}"
run_once: true
tags:
- topics

- name: Get topic info
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.topic_name }}/configs"
status_code: [200, 404]
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ kafka_broker_topics }}"
register: topic_info
run_once: true
tags:
- topics

- name: Identify existing topics
set_fact:
new_topics: "{{ topic_info.results | rejectattr('status', 'eq', 200) | map(attribute = 'item') }}"
existing_topics: "{{ topic_info.results | selectattr('status', 'eq', 200) }}"
run_once: true
tags:
- topics

- name: Create new topics
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics"
method: POST
status_code: [201]
body: "{{ item }}"
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
loop: "{{ new_topics }}"
run_once: true
tags:
- topics

- name: Update existing configs for existing topics
loop: "{{ existing_topics }}"
uri:
url: "{{ kafka_broker_erp_clusters_url }}/{{ kafka_cluster_id }}/topics/{{ item.item.topic_name }}/configs:alter"
method: POST
status_code: [204]
body: |
{
"data": [
{% for config in item.item['configs']|default([]) %}
{% if loop.index > 1%},{% endif %}
{{ config }}
{% endfor %}
{% if item.item['configs']|default([]) | length > 0 %},{% endif %}
{% for config_remove in (item.json.data | selectattr('is_default', 'eq', false) | map(attribute = 'name')) | difference(item.item.configs|default({}) | map(attribute = 'name')) %}
{% if loop.index > 1%},{% endif %}
{"name": "{{ config_remove }}", "operation": "DELETE"}
{% endfor %}
]
}
headers:
content-type: application/json
validate_certs: false
return_content: true
url_username: "{{kafka_broker_rest_health_check_user}}"
url_password: "{{kafka_broker_rest_health_check_password}}"
force_basic_auth: true
body_format: json
tags:
- topics

0 comments on commit 5d0fb1b

Please sign in to comment.