diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 76336e1257412..51dade3471b5b 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -48,6 +48,7 @@ class ConnectServiceBase(Service): def __init__(self, context, num_nodes, kafka, files): super(ConnectServiceBase, self).__init__(context, num_nodes) self.kafka = kafka + self.security_config = kafka.security_config.client_config() self.files = files def pids(self, node): @@ -89,6 +90,7 @@ def restart(self): def clean_node(self, node): node.account.kill_process("connect", clean_shutdown=False, allow_fail=True) + self.security_config.clean_node(node) node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False) def config_filenames(self): @@ -153,6 +155,7 @@ def node(self): def start_cmd(self, node, connector_configs): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) cmd += " ".join(connector_configs) cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) @@ -161,6 +164,7 @@ def start_cmd(self, node, connector_configs): def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + self.security_config.setup_node(node) node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) remote_connector_configs = [] @@ -190,6 +194,7 @@ def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offs def start_cmd(self, node): cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE) cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) return cmd @@ -197,6 +202,7 @@ def start_cmd(self, node): def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + self.security_config.setup_node(node) node.account.create_file(self.CONFIG_FILE, self.config_template_func(node)) node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE)) if self.connector_config_templates: diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index 4386788d5f5ab..cb4b2c1ac9a02 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -1,4 +1,3 @@ - # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index b5efba81e4f80..1bbabd2359b02 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -17,6 +17,7 @@ import subprocess from ducktape.template import TemplateRenderer from kafkatest.services.security.minikdc import MiniKdc +import itertools class Keytool(object): @@ -172,17 +173,22 @@ def kafka_opts(self): else: return "" - def __str__(self): + def props(self, prefix=''): """ - Return properties as string with line separators. + Return properties as string with line separators, optionally with a prefix. This is used to append security config properties to a properties file. + :param prefix: prefix to add to each property + :return: a string containing line-separated properties """ + if self.security_protocol == SecurityConfig.PLAINTEXT: + return "" + config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems()) + # Extra blank lines ensure this can be appended/prepended safely + return "\n".join(itertools.chain([""], config_lines, [""])) - prop_str = "" - if self.security_protocol != SecurityConfig.PLAINTEXT: - for key, value in self.properties.items(): - prop_str += ("\n" + key + "=" + value) - prop_str += "\n" - return prop_str - + def __str__(self): + """ + Return properties as a string with line separators. + """ + return self.props() diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 9aa16abfbd7a8..698a827b1712a 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -13,15 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.tests.kafka_test import KafkaTest +from ducktape.tests.test import Test + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.security.security_config import SecurityConfig from ducktape.utils.util import wait_until from ducktape.mark import matrix import subprocess, itertools, time from collections import Counter -class ConnectDistributedTest(KafkaTest): +class ConnectDistributedTest(Test): """ Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on another, validating the total output is identical to the input. @@ -45,22 +49,39 @@ class ConnectDistributedTest(KafkaTest): SCHEMA = { "type": "string", "optional": False } def __init__(self, test_context): - super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + super(ConnectDistributedTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + self.topics = { 'test' : { 'partitions': 1, 'replication-factor': 1 } - }) + } + + self.zk = ZookeeperService(test_context, self.num_zk) - self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) - self.cc.log_level = "DEBUG" self.key_converter = "org.apache.kafka.connect.json.JsonConverter" self.value_converter = "org.apache.kafka.connect.json.JsonConverter" self.schemas = True - def test_file_source_and_sink(self): + def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT): + self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, + security_protocol=security_protocol, interbroker_security_protocol=security_protocol, + topics=self.topics) + + self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) + self.cc.log_level = "DEBUG" + + self.zk.start() + self.kafka.start() + + + @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL]) + def test_file_source_and_sink(self, security_protocol): """ Tests that a basic file connector works across clean rolling bounces. This validates that the connector is correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes. """ + self.setup_services(security_protocol=security_protocol) self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.start() @@ -94,6 +115,7 @@ def test_bounce(self, clean): """ num_tasks = 3 + self.setup_services() self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.start() diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 90f219a24271a..7b57402bf7cef 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -13,14 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.tests.kafka_test import KafkaTest +from ducktape.tests.test import Test + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService from kafkatest.services.connect import ConnectStandaloneService from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.security.security_config import SecurityConfig from ducktape.utils.util import wait_until -from ducktape.mark import parametrize +from ducktape.mark import parametrize, matrix import hashlib, subprocess, json -class ConnectStandaloneFileTest(KafkaTest): +class ConnectStandaloneFileTest(Test): """ Simple test of Kafka Connect that produces data from a file in one standalone process and consumes it on another, validating the output is @@ -42,24 +46,39 @@ class ConnectStandaloneFileTest(KafkaTest): SCHEMA = { "type": "string", "optional": False } def __init__(self, test_context): - super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + super(ConnectStandaloneFileTest, self).__init__(test_context) + self.num_zk = 1 + self.num_brokers = 1 + self.topics = { 'test' : { 'partitions': 1, 'replication-factor': 1 } - }) + } - self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) - self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) - self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000) + self.zk = ZookeeperService(test_context, self.num_zk) @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True) @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False) @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None) - def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True): + @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL]) + def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'): assert converter != None, "converter type must be set" # Template parameters self.key_converter = converter self.value_converter = converter self.schemas = schemas + self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, + security_protocol=security_protocol, interbroker_security_protocol=security_protocol, + topics=self.topics) + + self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) + self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) + self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC, + consumer_timeout_ms=1000, new_consumer=True) + + + self.zk.start() + self.kafka.start() + self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")]) self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")]) diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties index 7a7440a4d9072..48f5f789fae1b 100644 --- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties +++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -bootstrap.servers={{ kafka.bootstrap_servers() }} +bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }} +{{ kafka.security_config.client_config().props() }} +{{ kafka.security_config.client_config().props("producer.") }} +{{ kafka.security_config.client_config().props("consumer.") }} group.id={{ group|default("connect-cluster") }} @@ -43,4 +46,4 @@ rest.advertised.host.name = {{ node.account.hostname }} # Reduce session timeouts so tests that kill workers don't need to wait as long to recover session.timeout.ms=10000 -consumer.session.timeout.ms=10000 \ No newline at end of file +consumer.session.timeout.ms=10000 diff --git a/tests/kafkatest/tests/connect/templates/connect-standalone.properties b/tests/kafkatest/tests/connect/templates/connect-standalone.properties index bf1daf7bcc079..09c648720c7f8 100644 --- a/tests/kafkatest/tests/connect/templates/connect-standalone.properties +++ b/tests/kafkatest/tests/connect/templates/connect-standalone.properties @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -bootstrap.servers={{ kafka.bootstrap_servers() }} +bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }} +{{ kafka.security_config.client_config().props("producer.") }} +{{ kafka.security_config.client_config().props("consumer.") }} key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }} value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}