From 50421b7b29008608de06165e4d7c79395cfbf57a Mon Sep 17 00:00:00 2001
From: cicharka
Date: Thu, 24 Feb 2022 09:12:46 +0100
Subject: [PATCH] Remove kafka_var #2803

---
 .../kafka/tasks/generate-certificates.yml     |  88 +++----
 .../playbooks/roles/kafka/tasks/metrics.yml   |   6 +-
 .../roles/kafka/tasks/setup-kafka.yml         |  44 ++--
 .../kafka/templates/client-ssl.properties.j2  |   4 +-
 .../roles/kafka/templates/kafka.service.j2    |  10 +-
 .../templates/kafka_producer_consumer.py.j2   |   6 +-
 .../kafka/templates/kafka_server_jaas.conf.j2 |   6 +-
 .../roles/kafka/templates/logrotate.conf.j2   |   2 +-
 .../kafka/templates/server.properties.j2      |  86 +++----
 docs/home/howto/RETENTION.md                  |  13 +-
 .../common/defaults/configuration/kafka.yml   | 131 +++++-----
 .../common/validation/configuration/kafka.yml | 239 +++++++++---------
 12 files changed, 315 insertions(+), 320 deletions(-)

diff --git a/ansible/playbooks/roles/kafka/tasks/generate-certificates.yml b/ansible/playbooks/roles/kafka/tasks/generate-certificates.yml
index 18ef907cd2..04184fe72e 100644
--- a/ansible/playbooks/roles/kafka/tasks/generate-certificates.yml
+++ b/ansible/playbooks/roles/kafka/tasks/generate-certificates.yml
@@ -1,40 +1,40 @@
 - name: Create stores directory
   file:
-    path: "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}"
+    path: "{{ specification.security.ssl.server.keystore_location | dirname }}"
     state: directory
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     mode: "0755"
 
 - name: Check if keystore exists on broker
   stat:
-    path: "{{ specification.kafka_var.security.ssl.server.keystore_location }}"
+    path: "{{ specification.security.ssl.server.keystore_location }}"
   changed_when: false
   register: keystore_exists
 
 - name: Generate keystore for each server
-  shell: keytool -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} \
-    -alias localhost -validity {{ specification.kafka_var.security.ssl.server.cert_validity }} -genkey -keyalg RSA \
-    -noprompt -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }} \
-    -keypass {{ specification.kafka_var.security.ssl.server.passwords.key }} \
+  shell: keytool -keystore {{ specification.security.ssl.server.keystore_location }} \
+    -alias localhost -validity {{ specification.security.ssl.server.cert_validity }} -genkey -keyalg RSA \
+    -noprompt -storepass {{ specification.security.ssl.server.passwords.keystore }} \
+    -keypass {{ specification.security.ssl.server.passwords.key }} \
    -dname "CN={{ inventory_hostname }}" -ext SAN="DNS:{{ inventory_hostname }}"
   when:
     - not keystore_exists.stat.exists
 
 - name: Check if signing certificate exists
   stat:
-    path: "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-cert"
+    path: "{{ specification.security.ssl.server.keystore_location | dirname }}/ca-cert"
   register: signing_certificate_exists
   changed_when: false
   when:
     - groups['kafka'][0] == inventory_hostname
 
 - name: Generate signing certificate
-  shell: openssl req -new -x509 -keyout {{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-key \
-    -out {{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-cert \
-    -days {{ specification.kafka_var.security.ssl.server.cert_validity }} \
+  shell: openssl req -new -x509 -keyout {{ specification.security.ssl.server.keystore_location | dirname }}/ca-key \
+    -out {{ specification.security.ssl.server.keystore_location | dirname }}/ca-cert \
+    -days {{ specification.security.ssl.server.cert_validity }} \
     -subj "/CN={{ inventory_hostname }}" \
-    --passout pass:{{ specification.kafka_var.security.ssl.server.passwords.key }}
+    --passout pass:{{ specification.security.ssl.server.passwords.key }}
   when:
     - groups['kafka'][0] == inventory_hostname
     - not signing_certificate_exists.stat.exists
@@ -42,14 +42,14 @@
 - name: Create kafka certificates directory on Epiphany host
   become: false
   file:
-    path: "{{ specification.kafka_var.security.ssl.server.local_cert_download_path }}"
+    path: "{{ specification.security.ssl.server.local_cert_download_path }}"
     state: directory
   delegate_to: localhost
 
 - name: Fetching files
   fetch:
-    src: "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/{{ item }}"
-    dest: "{{ specification.kafka_var.security.ssl.server.local_cert_download_path }}/{{ item }}"
+    src: "{{ specification.security.ssl.server.keystore_location | dirname }}/{{ item }}"
+    dest: "{{ specification.security.ssl.server.local_cert_download_path }}/{{ item }}"
     flat: yes
   loop:
     - "ca-cert"
@@ -59,8 +59,8 @@
 
 - name: Copy signing certificate and key to brokers
   copy:
-    src: "{{ specification.kafka_var.security.ssl.server.local_cert_download_path }}/{{ item }}"
-    dest: "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/"
+    src: "{{ specification.security.ssl.server.local_cert_download_path }}/{{ item }}"
+    dest: "{{ specification.security.ssl.server.keystore_location | dirname }}/"
   loop:
     - "ca-cert"
     - "ca-key"
@@ -69,20 +69,20 @@
 
 - name: Check if trustore exists
   stat:
-    path: "{{ specification.kafka_var.security.ssl.server.truststore_location }}"
+    path: "{{ specification.security.ssl.server.truststore_location }}"
   register: trustore_exists
 
 - name: Create trustore
-  shell: keytool -noprompt -keystore "{{ specification.kafka_var.security.ssl.server.truststore_location }}" -alias CARoot \
-    -import -file "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-cert" \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }} \
-    -keypass {{ specification.kafka_var.security.ssl.server.passwords.key }}
+  shell: keytool -noprompt -keystore "{{ specification.security.ssl.server.truststore_location }}" -alias CARoot \
+    -import -file "{{ specification.security.ssl.server.keystore_location | dirname }}/ca-cert" \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }} \
+    -keypass {{ specification.security.ssl.server.passwords.key }}
   when:
     - not trustore_exists.stat.exists
 
 - name: Check if CA certificate is already imported
-  shell: keytool -list -v -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }} \
+  shell: keytool -list -v -keystore {{ specification.security.ssl.server.keystore_location }} \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }} \
     | grep -i "Alias name" | grep -i "caroot"
   failed_when: "caroot_exists.rc == 2"
   changed_when: false
@@ -90,8 +90,8 @@
 
 - name: Check if certificate signed by CA is already imported
   shell: |-
-    keytool -list -v -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }} \
+    keytool -list -v -keystore {{ specification.security.ssl.server.keystore_location }} \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }} \
     -alias localhost \
     | grep -i 'Certificate chain length: 2'
   failed_when: "signed_cert_exists.rc == 2"
@@ -99,41 +99,41 @@
   register: signed_cert_exists
 
 - name: Export certificate to sign certificate with CA
-  shell: keytool -noprompt -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} \
+  shell: keytool -noprompt -keystore {{ specification.security.ssl.server.keystore_location }} \
     -alias localhost -certreq \
-    -file "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/cert-file" \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }} \
-    -keypass {{ specification.kafka_var.security.ssl.server.passwords.key }}
+    -file "{{ specification.security.ssl.server.keystore_location | dirname }}/cert-file" \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }} \
+    -keypass {{ specification.security.ssl.server.passwords.key }}
   when:
     - signed_cert_exists.rc == 1
 
 - name: Signing certificate with CA
-  shell: openssl x509 -req -CA "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-cert" \
-    -CAkey "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-key" \
-    -in "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/cert-file" \
-    -out "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/cert-signed" \
-    -days {{ specification.kafka_var.security.ssl.server.cert_validity }} -CAcreateserial \
-    -passin pass:{{ specification.kafka_var.security.ssl.server.passwords.key }}
+  shell: openssl x509 -req -CA "{{ specification.security.ssl.server.keystore_location | dirname }}/ca-cert" \
+    -CAkey "{{ specification.security.ssl.server.keystore_location | dirname }}/ca-key" \
+    -in "{{ specification.security.ssl.server.keystore_location | dirname }}/cert-file" \
+    -out "{{ specification.security.ssl.server.keystore_location | dirname }}/cert-signed" \
+    -days {{ specification.security.ssl.server.cert_validity }} -CAcreateserial \
+    -passin pass:{{ specification.security.ssl.server.passwords.key }}
   when:
     - signed_cert_exists.rc == 1
 
 - name: Import certificate CA
-  shell: keytool -noprompt -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} -alias CARoot \
-    -import -file "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/ca-cert" \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }}
+  shell: keytool -noprompt -keystore {{ specification.security.ssl.server.keystore_location }} -alias CARoot \
+    -import -file "{{ specification.security.ssl.server.keystore_location | dirname }}/ca-cert" \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }}
   when:
     - caroot_exists.rc == 1
 
 - name: Import certificate signed by CA
-  shell: keytool -noprompt -keystore {{ specification.kafka_var.security.ssl.server.keystore_location }} -alias localhost \
-    -import -file "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/cert-signed" \
-    -storepass {{ specification.kafka_var.security.ssl.server.passwords.keystore }}
+  shell: keytool -noprompt -keystore {{ specification.security.ssl.server.keystore_location }} -alias localhost \
+    -import -file "{{ specification.security.ssl.server.keystore_location | dirname }}/cert-signed" \
+    -storepass {{ specification.security.ssl.server.passwords.keystore }}
   when:
     - signed_cert_exists.rc == 1
 
 - name: Remove extracted key and cert from others than root node
   file:
-    path: "{{ specification.kafka_var.security.ssl.server.keystore_location | dirname }}/{{ item }}"
+    path: "{{ specification.security.ssl.server.keystore_location | dirname }}/{{ item }}"
     state: absent
   loop:
     - "ca-cert"
diff --git a/ansible/playbooks/roles/kafka/tasks/metrics.yml b/ansible/playbooks/roles/kafka/tasks/metrics.yml
index 5860540c83..6b2c3a760b 100644
--- a/ansible/playbooks/roles/kafka/tasks/metrics.yml
+++ b/ansible/playbooks/roles/kafka/tasks/metrics.yml
@@ -2,7 +2,7 @@
 
 - name: prometheus jmx | add kafka user to correct jmx exporter user
   user:
-    name: "{{ specification.kafka_var.user }}"
+    name: "{{ specification.user }}"
     groups: "{{ specification.jmx_exporter_group }}"
     append: yes
 
@@ -11,8 +11,8 @@
   copy:
     dest: "{{ specification.prometheus_jmx_config }}"
     src: jmx-kafka-config.yml
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     mode: 0644
 
 - name: delegated | create prometheus system group
diff --git a/ansible/playbooks/roles/kafka/tasks/setup-kafka.yml b/ansible/playbooks/roles/kafka/tasks/setup-kafka.yml
index 4db5b1fc6c..7fc630d28f 100644
--- a/ansible/playbooks/roles/kafka/tasks/setup-kafka.yml
+++ b/ansible/playbooks/roles/kafka/tasks/setup-kafka.yml
@@ -1,14 +1,14 @@
 ---
 - name: Setup group
   group:
-    name: "{{ specification.kafka_var.group }}"
+    name: "{{ specification.group }}"
     system: yes
 
 - name: Setup user
   user:
-    name: "{{ specification.kafka_var.user }}"
+    name: "{{ specification.user }}"
     system: yes
-    group: "{{ specification.kafka_var.group }}"
+    group: "{{ specification.group }}"
     shell: "/usr/sbin/nologin"
 
 - name: Install Java package
@@ -55,31 +55,31 @@
 
 - name: Create data_dir
   file:
-    path: "{{ specification.kafka_var.data_dir }}"
+    path: "{{ specification.data_dir }}"
     state: directory
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     mode: 0755
 
 - name: Remove lost+found in the datadir
   file:
-    path: "{{ specification.kafka_var.data_dir }}/lost+found"
+    path: "{{ specification.data_dir }}/lost+found"
     state: absent
 
 - name: Create log_dir
   file:
-    path: "{{ specification.kafka_var.log_dir }}"
+    path: "{{ specification.log_dir }}"
     state: directory
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     mode: 0755
 
 - name: Create /etc/kafka directory
   file:
     path: /etc/kafka
     state: directory
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
 
 # - name: link conf_dir to /opt/kafka/config
 #   file: dest=/etc/kafka owner=kafka group=kafka state=link src=/opt/kafka/config
@@ -87,23 +87,23 @@
 # Setup log4j.properties
 - name: Create log4j.properties
   file:
-    path: "{{ specification.kafka_var.conf_dir }}/log4j.properties"
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    path: "{{ specification.conf_dir }}/log4j.properties"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     mode: 0644
 
 - name: Generate certificate
   include_tasks: generate-certificates.yml
   when:
-    - specification.kafka_var.security.ssl.enabled is defined
-    - specification.kafka_var.security.ssl.enabled
+    - specification.security.ssl.enabled is defined
+    - specification.security.ssl.enabled
 
 # Setup server.properties
 - name: Create server.properties
   template:
-    dest: "{{ specification.kafka_var.conf_dir }}/server.properties"
-    owner: "{{ specification.kafka_var.user }}"
-    group: "{{ specification.kafka_var.group }}"
+    dest: "{{ specification.conf_dir }}/server.properties"
+    owner: "{{ specification.user }}"
+    group: "{{ specification.group }}"
     # Was 0640
     mode: 0644
     src: server.properties.j2
@@ -114,7 +114,7 @@
 - name: Delete meta.properties
   become: true
   file:
-    path: "{{ specification.kafka_var.data_dir }}/meta.properties"
+    path: "{{ specification.data_dir }}/meta.properties"
     state: absent
   when: create_server_properties.changed
@@ -128,7 +128,7 @@
 
 - name: configure system settings, file descriptors and number of threads for kafka
   pam_limits:
-    domain: "{{ specification.kafka_var.user }}"
+    domain: "{{ specification.user }}"
     limit_type: "{{ item.limit_type }}"
     limit_item: "{{ item.limit_item }}"
     value: "{{item.value}}"
diff --git a/ansible/playbooks/roles/kafka/templates/client-ssl.properties.j2 b/ansible/playbooks/roles/kafka/templates/client-ssl.properties.j2
index 81b69ab3d6..c4807c9863 100644
--- a/ansible/playbooks/roles/kafka/templates/client-ssl.properties.j2
+++ b/ansible/playbooks/roles/kafka/templates/client-ssl.properties.j2
@@ -1,4 +1,4 @@
-bootstrap.servers={{ kafka_hosts }}:{{ specification.kafka_var.security.ssl.port }}
+bootstrap.servers={{ kafka_hosts }}:{{ specification.security.ssl.port }}
 security.protocol=SSL
 ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
-ssl.truststore.password={{ specification.kafka_var.security.ssl.client.passwords.truststore }}
+ssl.truststore.password={{ specification.security.ssl.client.passwords.truststore }}
diff --git a/ansible/playbooks/roles/kafka/templates/kafka.service.j2 b/ansible/playbooks/roles/kafka/templates/kafka.service.j2
index 18c69243ee..71d6815140 100644
--- a/ansible/playbooks/roles/kafka/templates/kafka.service.j2
+++ b/ansible/playbooks/roles/kafka/templates/kafka.service.j2
@@ -2,8 +2,8 @@
 Description=Kafka Daemon
 After=zookeeper.service
 
-{% if specification.kafka_var.javax_net_debug is defined %}
-{% set javax_debug = '-Djavax.net.debug=' ~ specification.kafka_var.javax_net_debug %}
+{% if specification.javax_net_debug is defined %}
+{% set javax_debug = '-Djavax.net.debug=' ~ specification.javax_net_debug %}
 {% else %}
 {% set javax_debug = '' %}
 {% endif %}
@@ -14,14 +14,14 @@ User=kafka
 Group=kafka
 LimitNOFILE=32768
 Restart=on-failure
-Environment="KAFKA_HEAP_OPTS={{ specification.kafka_var.heap_opts }}"
-Environment="LOG_DIR={{ specification.kafka_var.log_dir }}"
+Environment="KAFKA_HEAP_OPTS={{ specification.heap_opts }}"
+Environment="LOG_DIR={{ specification.log_dir }}"
 {% if exporter.stat.exists %}
 Environment="KAFKA_OPTS={{ javax_debug }} -javaagent:{{ prometheus_jmx_exporter_path }}={{ specification.prometheus_jmx_exporter_web_listen_port }}:{{ specification.prometheus_jmx_config }}"
 {% else %}
 Environment="KAFKA_OPTS={{ javax_debug }}"
 {% endif %}
-Environment="KAFKA_JMX_OPTS={{ specification.kafka_var.jmx_opts }}"
+Environment="KAFKA_JMX_OPTS={{ specification.jmx_opts }}"
 ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
 
 [Install]
diff --git a/ansible/playbooks/roles/kafka/templates/kafka_producer_consumer.py.j2 b/ansible/playbooks/roles/kafka/templates/kafka_producer_consumer.py.j2
index 0f798a4480..b3bf8e47a3 100644
--- a/ansible/playbooks/roles/kafka/templates/kafka_producer_consumer.py.j2
+++ b/ansible/playbooks/roles/kafka/templates/kafka_producer_consumer.py.j2
@@ -20,8 +20,8 @@ class Producer(threading.Thread):
         producer = KafkaProducer(bootstrap_servers='{{ kafka_hosts }}')
 
         while not self.stop_event.is_set():
-            {% for msg in specification.kafka_var.tests.epiphany_topic_test_msgs %}
-            producer.send('{{ specification.kafka_var.tests.epiphany_topic_test }}', b"{{ msg }}")
+            {% for msg in specification.tests.epiphany_topic_test_msgs %}
+            producer.send('{{ specification.tests.epiphany_topic_test }}', b"{{ msg }}")
             {% endfor %}
             time.sleep(1)
 
@@ -40,7 +40,7 @@ class Consumer(multiprocessing.Process):
         consumer = KafkaConsumer(bootstrap_servers='{{ kafka_hosts }}',
                                  auto_offset_reset='earliest',
                                  consumer_timeout_ms=1000)
-        consumer.subscribe(['{{ specification.kafka_var.tests.epiphany_topic_test }}'])
+        consumer.subscribe(['{{ specification.tests.epiphany_topic_test }}'])
 
         while not self.stop_event.is_set():
             for message in consumer:
diff --git a/ansible/playbooks/roles/kafka/templates/kafka_server_jaas.conf.j2 b/ansible/playbooks/roles/kafka/templates/kafka_server_jaas.conf.j2
index ca801c1b8b..176c77ffe7 100644
--- a/ansible/playbooks/roles/kafka/templates/kafka_server_jaas.conf.j2
+++ b/ansible/playbooks/roles/kafka/templates/kafka_server_jaas.conf.j2
@@ -1,8 +1,8 @@
 KafkaServer {
     org.apache.kafka.common.security.plain.PlainLoginModule required
-    username="{{ specification.kafka_var.admin }}"
-    password="{{ specification.kafka_var.admin_pwd }}"
-    user_admin="{{ specification.kafka_var.admin_pwd }}"
+    username="{{ specification.admin }}"
+    password="{{ specification.admin_pwd }}"
+    user_admin="{{ specification.admin_pwd }}"
     {%- for host in kafka_hosts %}
     user_{{host}}="kafkabroker1-secret";
     {%- endfor %}
diff --git a/ansible/playbooks/roles/kafka/templates/logrotate.conf.j2 b/ansible/playbooks/roles/kafka/templates/logrotate.conf.j2
index 3c0d37d799..ad2b60e6d1 100644
--- a/ansible/playbooks/roles/kafka/templates/logrotate.conf.j2
+++ b/ansible/playbooks/roles/kafka/templates/logrotate.conf.j2
@@ -1,4 +1,4 @@
-{{ specification.kafka_var.log_dir }}/*.log {
+{{ specification.log_dir }}/*.log {
     rotate 5
     daily
     compress
diff --git a/ansible/playbooks/roles/kafka/templates/server.properties.j2 b/ansible/playbooks/roles/kafka/templates/server.properties.j2
index 6f2643beac..63ec2cd81b 100644
--- a/ansible/playbooks/roles/kafka/templates/server.properties.j2
+++ b/ansible/playbooks/roles/kafka/templates/server.properties.j2
@@ -11,23 +11,23 @@ auto.create.topics.enable=true
 delete.topic.enable=true
 
 #default replication factors for automatically created topics
-default.replication.factor={{ specification.kafka_var.default_replication_factor }}
+default.replication.factor={{ specification.default_replication_factor }}
 
 #The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
-offsets.topic.replication.factor={{ specification.kafka_var.offsets_topic_replication_factor }}
+offsets.topic.replication.factor={{ specification.offsets_topic_replication_factor }}
 
 #Offsets older than this retention period will be discarded
-offsets.retention.minutes={{ specification.kafka_var.offset_retention_minutes }}
+offsets.retention.minutes={{ specification.offset_retention_minutes }}
 
 #The maximum number of incremental fetch sessions that we will maintain
-max.incremental.fetch.session.cache.slots={{ specification.kafka_var.max_incremental_fetch_session_cache_slots }}
+max.incremental.fetch.session.cache.slots={{ specification.max_incremental_fetch_session_cache_slots }}
 
 #Enable controlled shutdown of the server
-controlled.shutdown.enable={{ specification.kafka_var.controlled_shutdown_enable | lower }}
+controlled.shutdown.enable={{ specification.controlled_shutdown_enable | lower }}
 
 #Number of fetcher threads used to replicate messages from a source broker.
 #Increasing this value can increase the degree of I/O parallelism in the follower broker.
-num.replica.fetchers={{ specification.kafka_var.num_replica_fetchers }}
+num.replica.fetchers={{ specification.num_replica_fetchers }}
 
 #The number of bytes of messages to attempt to fetch for each partition.
 #This is not an absolute maximum, if the first record batch in the first
@@ -35,10 +35,10 @@ num.replica.fetchers={{ specification.kafka_var.num_replica_fetchers }}
 #the record batch will still be returned to ensure that progress can be made.
 #The maximum record batch size accepted by the broker is defined
 #via message.max.bytes (broker config) or max.message.bytes (topic config).
-replica.fetch.max.bytes={{ specification.kafka_var.replica_fetch_max_bytes }}
+replica.fetch.max.bytes={{ specification.replica_fetch_max_bytes }}
 
 #The socket receive buffer for network requests
-replica.socket.receive.buffer.bytes={{ specification.kafka_var.replica_socket_receive_buffer_bytes }}
+replica.socket.receive.buffer.bytes={{ specification.replica_socket_receive_buffer_bytes }}
 
 ############################# Socket Server Settings #############################
 {%- endif %}
@@ -50,10 +50,10 @@ replica.socket.receive.buffer.bytes={{ specification.kafka_var.replica_socket_re
 # listeners = listener_name://host_name:port
 # EXAMPLE:
 # listeners = PLAINTEXT://your.host.name:9092
-{% if specification.kafka_var.security.ssl.enabled -%}
-listeners=SSL://{{ inventory_hostname }}:{{ specification.kafka_var.security.ssl.port }}
+{% if specification.security.ssl.enabled -%}
+listeners=SSL://{{ inventory_hostname }}:{{ specification.security.ssl.port }}
 {% else %}
-listeners=PLAINTEXT://{{ ansible_default_ipv4.address }}:{{ specification.kafka_var.port }}
+listeners=PLAINTEXT://{{ ansible_default_ipv4.address }}:{{ specification.port }}
 {%- endif %}
 
 # Hostname and port the broker will advertise to producers and consumers. If not set,
@@ -65,47 +65,47 @@ listeners=PLAINTEXT://{{ ansible_default_ipv4.address }}:{{ specification.kafka_
 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
 
 # The number of threads handling network requests
-num.network.threads={{ specification.kafka_var.socket_settings.network_threads }}
+num.network.threads={{ specification.socket_settings.network_threads }}
 
 # The number of threads doing disk I/O
-num.io.threads={{ specification.kafka_var.socket_settings.io_threads }}
+num.io.threads={{ specification.socket_settings.io_threads }}
 
 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes={{ specification.kafka_var.socket_settings.send_buffer_bytes }}
+socket.send.buffer.bytes={{ specification.socket_settings.send_buffer_bytes }}
 
 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes={{ specification.kafka_var.socket_settings.receive_buffer_bytes }}
+socket.receive.buffer.bytes={{ specification.socket_settings.receive_buffer_bytes }}
 
 # The maximum size of a request that the socket server will accept (protection against OOM)
-socket.request.max.bytes={{ specification.kafka_var.socket_settings.request_max_bytes }}
+socket.request.max.bytes={{ specification.socket_settings.request_max_bytes }}
 
 ############################# Security #########################################
 
 # Security protocol used to communicate between brokers
-{% if specification.kafka_var.security.ssl.enabled -%}
+{% if specification.security.ssl.enabled -%}
 # If not enabled it should default to PLAINTEXT
-security.inter.broker.protocol={{ specification.kafka_var.security.inter_broker_protocol }}
+security.inter.broker.protocol={{ specification.security.inter_broker_protocol }}
 
 #### Encryption Settings ####
-ssl.endpoint.identification.algorithm={{ specification.kafka_var.security.ssl.endpoint_identification_algorithm }}
+ssl.endpoint.identification.algorithm={{ specification.security.ssl.endpoint_identification_algorithm }}
 
-ssl.keystore.location={{ specification.kafka_var.security.ssl.server.keystore_location }}
-ssl.keystore.password={{ specification.kafka_var.security.ssl.server.passwords.keystore }}
-ssl.truststore.location={{ specification.kafka_var.security.ssl.server.truststore_location }}
-ssl.truststore.password={{ specification.kafka_var.security.ssl.server.passwords.truststore }}
-ssl.key.password={{ specification.kafka_var.security.ssl.server.passwords.key }}
+ssl.keystore.location={{ specification.security.ssl.server.keystore_location }}
+ssl.keystore.password={{ specification.security.ssl.server.passwords.keystore }}
+ssl.truststore.location={{ specification.security.ssl.server.truststore_location }}
+ssl.truststore.password={{ specification.security.ssl.server.passwords.truststore }}
+ssl.key.password={{ specification.security.ssl.server.passwords.key }}
 
-ssl.client.auth={{ specification.kafka_var.security.ssl.client_auth }}
+ssl.client.auth={{ specification.security.ssl.client_auth }}
 {%- endif %}
 
-{% if specification.kafka_var.security.authentication.enabled %}
-{% if specification.kafka_var.security.authentication.authentication_method == "sasl" -%}
+{% if specification.security.authentication.enabled %}
+{% if specification.security.authentication.authentication_method == "sasl" -%}
 #### Authentication Settings ####
 
 # SASL mechanism used for inter-broker communication.
-sasl.mechanism.inter.broker.protocol={{ specification.kafka_var.security.authentication.sasl_mechanism_inter_broker_protocol }}
+sasl.mechanism.inter.broker.protocol={{ specification.security.authentication.sasl_mechanism_inter_broker_protocol }}
 
-sasl.enabled.mechanisms={{ specification.kafka_var.security.sasl_authentication.enabled_mechanisms }}
+sasl.enabled.mechanisms={{ specification.security.sasl_authentication.enabled_mechanisms }}
 
 # The list of SASL mechanisms enabled in the Kafka server. The list may contain any mechanism
 # for which a security provider is available. Only GSSAPI is enabled by default.
@@ -113,23 +113,23 @@ sasl.enabled.mechanisms={{ specification.kafka_var.security.sasl_authentication.
 ############################# ACLs #############################################
 
 # The authorizer class that should be used for authorization
-authorizer.class.name={{ specification.kafka_var.security.authorization.authorizer_class_name }}
+authorizer.class.name={{ specification.security.authorization.authorizer_class_name }}
 
 # If a Resource R has no associated ACLs, no one other than super users is allowed to access R. If you want to change that behavior, set this property to true
-allow.everyone.if.no.acl.found={{ specification.kafka_var.security.authorization.allow_everyone_if_no_acl_found }}
+allow.everyone.if.no.acl.found={{ specification.security.authorization.allow_everyone_if_no_acl_found }}
 
-{% if specification.kafka_var.security.authentication.enabled and specification.kafka_var.security.authorization.enabled -%}
+{% if specification.security.authentication.enabled and specification.security.authorization.enabled -%}
 
-{% if specification.kafka_var.security.authentication.authentication_method == "certificates" -%}
+{% if specification.security.authentication.authentication_method == "certificates" -%}
 {% set super_users = groups['kafka'] %}
-{% if specification.kafka_var.security.authorization.super_users is defined -%}
-{% set super_users = super_users + specification.kafka_var.security.authorization.super_users %}
+{% if specification.security.authorization.super_users is defined -%}
+{% set super_users = super_users + specification.security.authorization.super_users %}
 {%- endif %}
 super.users=User:CN={{ super_users | list | join(';User:CN=') }};
 {%- endif %}
 
-{% if specification.kafka_var.security.authentication.authentication_method == "sasl" and specification.kafka_var.security.authorization.super_users is defined -%}
-super.users={{ specification.kafka_var.security.authorization.super_users }}
+{% if specification.security.authentication.authentication_method == "sasl" and specification.security.authorization.super_users is defined -%}
+super.users={{ specification.security.authorization.super_users }}
 {%- endif %}
 {%- endif %}
@@ -138,20 +138,20 @@ super.users={{ specification.kafka_var.security.authorization.super_users }}
 ############################# Log Basics #############################
 
 # A comma seperated list of directories under which to store log files
-log.dirs={{ specification.kafka_var.data_dir }}
+log.dirs={{ specification.data_dir }}
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
 # the brokers.
-num.partitions={{ specification.kafka_var.partitions }}
+num.partitions={{ specification.partitions }}
 
 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
 # This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir={{ specification.kafka_var.num_recovery_threads_per_data_dir }}
+num.recovery.threads.per.data.dir={{ specification.num_recovery_threads_per_data_dir }}
 
 # When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number
 # of replicas that must acknowledge a write for the write to be considered successful.
-min.insync.replicas={{ specification.kafka_var.min_insync_replicas }}
+min.insync.replicas={{ specification.min_insync_replicas }}
 
 ############################# Log Flush Policy #############################
@@ -178,11 +178,11 @@ min.insync.replicas={{ specification.kafka_var.min_insync_replicas }}
 # from the end of the log.
 
 # The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours={{ specification.kafka_var.log_retention_hours }}
+log.retention.hours={{ specification.log_retention_hours }}
 
 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
 # segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
-log.retention.bytes={{ specification.kafka_var.log_retention_bytes }}
+log.retention.bytes={{ specification.log_retention_bytes }}
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
 log.segment.bytes=1073741824
diff --git a/docs/home/howto/RETENTION.md b/docs/home/howto/RETENTION.md
index 3fa6ccdb9c..6ae5b8d87f 100644
--- a/docs/home/howto/RETENTION.md
+++ b/docs/home/howto/RETENTION.md
@@ -22,23 +22,22 @@ kind: configuration/kafka
 title: "Kafka"
 name: default
 specification:
-  kafka_var:
-    partitions: 8
-    log_retention_hours: 168
-    log_retention_bytes: -1
+  partitions: 8
+  log_retention_hours: 168
+  log_retention_bytes: -1
 ```
 
 ### Configuration parameters
 
-#### specification.kafka_var.partitions
+#### specification.partitions
 
 Sets [num.partitions](https://kafka.apache.org/documentation/#brokerconfigs_num.partitions) parameter
 
-#### specification.kafka_var.log_retention_hours
+#### specification.log_retention_hours
 
 Sets [log.retention.hours](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.bytes) parameter
 
-#### specification.kafka_var.log_retention_bytes
+#### specification.log_retention_bytes
 
 Sets [log.retention.bytes](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.bytes) parameter
 
diff --git a/schema/common/defaults/configuration/kafka.yml b/schema/common/defaults/configuration/kafka.yml
index 1da6a76f96..30feadfdf9 100644
--- a/schema/common/defaults/configuration/kafka.yml
+++ b/schema/common/defaults/configuration/kafka.yml
@@ -2,72 +2,71 @@ kind: configuration/kafka
 title: "Kafka"
 name: default
 specification:
-  kafka_var:
-    enabled: True
-    admin: kafka
-    admin_pwd: epiphany
-    # javax_net_debug: all # uncomment to activate debugging, other debug options: https://colinpaice.blog/2020/04/05/using-java-djavax-net-debug-to-examine-data-flows-including-tls/
-    security:
-      ssl:
-        enabled: False
-        port: 9093
-        server:
-          local_cert_download_path: kafka-certs
-          keystore_location: /var/private/ssl/kafka.server.keystore.jks
-          truststore_location: /var/private/ssl/kafka.server.truststore.jks
-          cert_validity: 365
-          passwords:
-            keystore: PasswordToChange
-            truststore: PasswordToChange
-            key: PasswordToChange
-        endpoint_identification_algorithm: HTTPS
-        client_auth: required
-      encrypt_at_rest: False
-      inter_broker_protocol: PLAINTEXT
-      authorization:
-        enabled: False
-        authorizer_class_name: kafka.security.auth.SimpleAclAuthorizer
-        allow_everyone_if_no_acl_found: False
-        super_users:
-          - tester01
-          - tester02
-        users:
-          - name: test_user
-            topic: test_topic
-      authentication:
-        enabled: False
-        authentication_method: certificates
-        sasl_mechanism_inter_broker_protocol:
-        sasl_enabled_mechanisms: PLAIN
-    sha: "b28e81705e30528f1abb6766e22dfe9dae50b1e1e93330c880928ff7a08e6b38ee71cbfc96ec14369b2dfd24293938702cab422173c8e01955a9d1746ae43f98"
-    port: 9092
-    min_insync_replicas: 1 # Minimum number of replicas (ack write)
-    default_replication_factor: 1 # Minimum number of automatically created topics
-    offsets_topic_replication_factor: 1 # Minimum number of offsets topic (consider higher value for HA)
-    num_recovery_threads_per_data_dir: 1 # Minimum number of recovery threads per data dir
-    num_replica_fetchers: 1 # Minimum number of replica fetchers
-    replica_fetch_max_bytes: 1048576
-    replica_socket_receive_buffer_bytes: 65536
-    partitions: 8 # 100 x brokers x replicas for reasonable size cluster. Small clusters can be less
-    log_retention_hours: 168 # The minimum age of a log file to be eligible for deletion due to age
-    log_retention_bytes: -1 # -1 is no size limit only a time limit (log_retention_hours). This limit is enforced at the partition level, multiply it by the number of partitions to compute the topic retention in bytes.
-    offset_retention_minutes: 10080 # Offsets older than this retention period will be discarded
-    heap_opts: "-Xmx2G -Xms2G"
-    opts: "-Djavax.net.debug=all"
-    jmx_opts:
-    max_incremental_fetch_session_cache_slots: 1000
-    controlled_shutdown_enable: true
-    group: kafka
-    user: kafka
-    conf_dir: /opt/kafka/config
-    data_dir: /var/lib/kafka
-    log_dir: /var/log/kafka
-    socket_settings:
-      network_threads: 3 # The number of threads handling network requests
-      io_threads: 8 # The number of threads doing disk I/O
-      send_buffer_bytes: 102400 # The send buffer (SO_SNDBUF) used by the socket server
-      receive_buffer_bytes: 102400 # The receive buffer (SO_RCVBUF) used by the socket server
-      request_max_bytes: 104857600 # The maximum size of a request that the socket server will accept (protection against OOM)
+  enabled: True
+  admin: kafka
+  admin_pwd: epiphany
+  # javax_net_debug: all # uncomment to activate debugging, other debug options: https://colinpaice.blog/2020/04/05/using-java-djavax-net-debug-to-examine-data-flows-including-tls/
+  security:
+    ssl:
+      enabled: False
+      port: 9093
+      server:
+        local_cert_download_path: kafka-certs
+        keystore_location: /var/private/ssl/kafka.server.keystore.jks
+        truststore_location: /var/private/ssl/kafka.server.truststore.jks
+        cert_validity: 365
+        passwords:
+          keystore: PasswordToChange
+          truststore: PasswordToChange
+          key: PasswordToChange
+      endpoint_identification_algorithm: HTTPS
+      client_auth: required
+    encrypt_at_rest: False
+    inter_broker_protocol: PLAINTEXT
+    authorization:
+      enabled: False
+      authorizer_class_name: kafka.security.auth.SimpleAclAuthorizer
+      allow_everyone_if_no_acl_found: False
+      super_users:
+        - tester01
+        - tester02
+      users:
+        - name: test_user
+          topic: test_topic
+    authentication:
+      enabled: False
+      authentication_method: certificates
+      sasl_mechanism_inter_broker_protocol:
+      sasl_enabled_mechanisms: PLAIN
+  sha: "b28e81705e30528f1abb6766e22dfe9dae50b1e1e93330c880928ff7a08e6b38ee71cbfc96ec14369b2dfd24293938702cab422173c8e01955a9d1746ae43f98"
+  port: 9092
+  min_insync_replicas: 1 # Minimum number of replicas (ack write)
+  default_replication_factor: 1 # Minimum number of automatically created topics
+  offsets_topic_replication_factor: 1 # Minimum number of offsets topic (consider higher value for HA)
+  num_recovery_threads_per_data_dir: 1 # Minimum number of recovery threads per data dir
+  num_replica_fetchers: 1 # Minimum number of replica fetchers
+  replica_fetch_max_bytes: 1048576
+  replica_socket_receive_buffer_bytes: 65536
+  partitions: 8 # 100 x brokers x replicas for reasonable size cluster. Small clusters can be less
+  log_retention_hours: 168 # The minimum age of a log file to be eligible for deletion due to age
+  log_retention_bytes: -1 # -1 is no size limit only a time limit (log_retention_hours). This limit is enforced at the partition level, multiply it by the number of partitions to compute the topic retention in bytes.
+  offset_retention_minutes: 10080 # Offsets older than this retention period will be discarded
+  heap_opts: "-Xmx2G -Xms2G"
+  opts: "-Djavax.net.debug=all"
+  jmx_opts:
+  max_incremental_fetch_session_cache_slots: 1000
+  controlled_shutdown_enable: true
+  group: kafka
+  user: kafka
+  conf_dir: /opt/kafka/config
+  data_dir: /var/lib/kafka
+  log_dir: /var/log/kafka
+  socket_settings:
+    network_threads: 3 # The number of threads handling network requests
+    io_threads: 8 # The number of threads doing disk I/O
+    send_buffer_bytes: 102400 # The send buffer (SO_SNDBUF) used by the socket server
+    receive_buffer_bytes: 102400 # The receive buffer (SO_RCVBUF) used by the socket server
+    request_max_bytes: 104857600 # The maximum size of a request that the socket server will accept (protection against OOM)
   zookeeper_set_acl: false
   zookeeper_hosts: "{{ groups['zookeeper']|join(':2181,') }}:2181"
   jmx_exporter_user: jmx-exporter
diff --git a/schema/common/validation/configuration/kafka.yml b/schema/common/validation/configuration/kafka.yml
index 32b7cb5b98..ebbd14ba68 100644
--- a/schema/common/validation/configuration/kafka.yml
+++ b/schema/common/validation/configuration/kafka.yml
@@ -3,147 +3,144 @@ title: "Kafka specification schema"
 description: "Kafka specification schema"
 type: object
 properties:
-  kafka_var:
+  enabled:
+    type: boolean
+  admin:
+    type: string
+  admin_pwd:
+    type: string
+  javax_net_debug:
+    type: string
+  security:
     type: object
     properties:
-      enabled:
-        type: boolean
-      admin:
-        type: string
-      admin_pwd:
-        type: string
-      javax_net_debug:
-        type: string
-      security:
+      ssl:
         type: object
         properties:
-          ssl:
+          enabled:
+            type: boolean
+          port:
+            type: integer
+          server:
             type: object
             properties:
-              enabled:
-                type: boolean
-              port:
+              local_cert_download_path:
+                type: string
+              keystore_location:
+                type: string
+              truststore_location:
+                type: string
+              cert_validity:
                 type: integer
-              server:
+              passwords:
                 type: object
                 properties:
-                  local_cert_download_path:
+                  keystore:
                     type: string
-                  keystore_location:
+                  truststore:
                     type: string
-                  truststore_location:
+                  key:
                     type: string
-                  cert_validity:
-                    type: integer
-                  passwords:
-                    type: object
-                    properties:
-                      keystore:
-                        type: string
-                      truststore:
-                        type: string
-                      key:
-                        type: string
-              endpoint_identification_algorithm:
-                type: string
-              client_auth:
-                type: string
-          encrypt_at_rest:
+          endpoint_identification_algorithm:
+            type: string
+          client_auth:
+            type: string
+      encrypt_at_rest:
+        type: boolean
+      inter_broker_protocol:
+        type: string
+      authorization:
+        type: object
+        properties:
+          enabled:
            type: boolean
-          inter_broker_protocol:
+          authorizer_class_name:
            type: string
-          authorization:
-            type: object
-            properties:
-              enabled:
-                type: boolean
-              authorizer_class_name:
-                type: string
-              allow_everyone_if_no_acl_found:
-                type: boolean
-              super_users:
-                type: array
-                items:
+          allow_everyone_if_no_acl_found:
+            type: boolean
+          super_users:
+            type: array
+            items:
+              type: string
+          users:
+            type: array
+            items:
+              type: object
+              properties:
+                name:
                   type: string
-              users:
-                type: array
-                items:
-                  type: object
-                  properties:
-                    name:
-                      type: string
-                    topic:
-                      type: string
-          authentication:
-            type: object
-            properties:
-              enabled:
-                type: boolean
-              authentication_method:
-                type: string
-              sasl_mechanism_inter_broker_protocol:
-                type: 'null'
-              sasl_enabled_mechanisms:
-                type: string
-      sha:
-        type: string
-      port:
-        type: integer
-      min_insync_replicas:
-        type: integer
-      default_replication_factor:
-        type: integer
-      offsets_topic_replication_factor:
-        type: integer
-      num_recovery_threads_per_data_dir:
-        type: integer
-      num_replica_fetchers:
-        type: integer
-      replica_fetch_max_bytes:
-        type: integer
-      replica_socket_receive_buffer_bytes:
-        type: integer
-      partitions:
+                topic:
+                  type: string
+      authentication:
+        type: object
+        properties:
+          enabled:
+            type: boolean
+          authentication_method:
+            type: string
+          sasl_mechanism_inter_broker_protocol:
+            type: 'null'
+          sasl_enabled_mechanisms:
+            type: string
+  sha:
+    type: string
+  port:
+    type: integer
+  min_insync_replicas:
+    type: integer
+  default_replication_factor:
+    type: integer
+  offsets_topic_replication_factor:
+    type: integer
+  num_recovery_threads_per_data_dir:
+    type: integer
+  num_replica_fetchers:
+    type: integer
+  replica_fetch_max_bytes:
+    type: integer
+  replica_socket_receive_buffer_bytes:
+    type: integer
+  partitions:
+    type: integer
+  log_retention_hours:
+    type: integer
+  log_retention_bytes:
+    type: integer
+  offset_retention_minutes:
+    type: integer
+  heap_opts:
+    type: string
+  opts:
+    type: string
+  jmx_opts:
+    type: 'null'
+  max_incremental_fetch_session_cache_slots:
+    type: integer
+  controlled_shutdown_enable:
+    type: boolean
+  group:
+    type: string
+  user:
+    type: string
+  conf_dir:
+    type: string
+  data_dir:
+    type: string
+  log_dir:
+    type: string
+  socket_settings:
+    type: object
+    properties:
+      network_threads:
        type: integer
-      log_retention_hours:
+      io_threads:
        type: integer
-      log_retention_bytes:
+      send_buffer_bytes:
        type: integer
-      offset_retention_minutes:
+      receive_buffer_bytes:
        type: integer
-      heap_opts:
-        type: string
-      opts:
-        type: string
-      jmx_opts:
-        type: 'null'
-      max_incremental_fetch_session_cache_slots:
+      request_max_bytes:
        type: integer
-      controlled_shutdown_enable:
-        type: boolean
-      group:
-        type: string
-      user:
-        type: string
-      conf_dir:
-        type: string
-      data_dir:
-        type: string
-      log_dir:
-        type: string
-      socket_settings:
-        type: object
-        properties:
-          network_threads:
-            type: integer
-          io_threads:
-            type: integer
-          send_buffer_bytes:
-            type: integer
-          receive_buffer_bytes:
-            type: integer
-          request_max_bytes:
-            type: integer
   zookeeper_set_acl:
     type: boolean
   zookeeper_hosts:
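
Usage note (an illustrative sketch appended after the patch, not part of it): the change above is purely structural — every key that lived under `specification.kafka_var` moves up one level to `specification`, with names and default values unchanged. A minimal before/after manifest, using only the fields shown in the RETENTION.md hunk of this patch:

```yaml
# Before this patch: Kafka settings nested under kafka_var
kind: configuration/kafka
title: "Kafka"
name: default
specification:
  kafka_var:
    partitions: 8
    log_retention_hours: 168
    log_retention_bytes: -1
---
# After this patch: the same keys sit directly under specification
kind: configuration/kafka
title: "Kafka"
name: default
specification:
  partitions: 8
  log_retention_hours: 168
  log_retention_bytes: -1
```

User manifests that still nest values under `kafka_var` need the same one-level flattening when picking up this change, since the updated validation schema no longer accepts the `kafka_var` key.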