Skip to content

Commit

Permalink
KAFKA-3464: Add system tests for Connect with Kafka security enabled
Browse files Browse the repository at this point in the history
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Ismael Juma, Gwen Shapira

Closes apache#1141 from ewencp/kafka-3464-connect-security-system-tests
  • Loading branch information
ewencp authored and gwenshap committed Apr 5, 2016
1 parent 31e263e commit c3c9289
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 29 deletions.
6 changes: 6 additions & 0 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ConnectServiceBase(Service):
def __init__(self, context, num_nodes, kafka, files):
super(ConnectServiceBase, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
self.files = files

def pids(self, node):
Expand Down Expand Up @@ -89,6 +90,7 @@ def restart(self):

def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
self.security_config.clean_node(node)
node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)

def config_filenames(self):
Expand Down Expand Up @@ -153,6 +155,7 @@ def node(self):

def start_cmd(self, node, connector_configs):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " ".join(connector_configs)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
Expand All @@ -161,6 +164,7 @@ def start_cmd(self, node, connector_configs):
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)

self.security_config.setup_node(node)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
remote_connector_configs = []
Expand Down Expand Up @@ -190,13 +194,15 @@ def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offs

def start_cmd(self, node):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
return cmd

def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)

self.security_config.setup_node(node)
node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
if self.connector_config_templates:
Expand Down
1 change: 0 additions & 1 deletion tests/kafkatest/services/mirror_maker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
Expand Down
24 changes: 15 additions & 9 deletions tests/kafkatest/services/security/security_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
from ducktape.template import TemplateRenderer
from kafkatest.services.security.minikdc import MiniKdc
import itertools

class Keytool(object):

Expand Down Expand Up @@ -172,17 +173,22 @@ def kafka_opts(self):
else:
return ""

def __str__(self):
def props(self, prefix=''):
"""
Return properties as string with line separators.
Return properties as string with line separators, optionally with a prefix.
This is used to append security config properties to
a properties file.
:param prefix: prefix to add to each property
:return: a string containing line-separated properties
"""
if self.security_protocol == SecurityConfig.PLAINTEXT:
return ""
config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
# Extra blank lines ensure this can be appended/prepended safely
return "\n".join(itertools.chain([""], config_lines, [""]))

prop_str = ""
if self.security_protocol != SecurityConfig.PLAINTEXT:
for key, value in self.properties.items():
prop_str += ("\n" + key + "=" + value)
prop_str += "\n"
return prop_str

def __str__(self):
"""
Return properties as a string with line separators.
"""
return self.props()
36 changes: 29 additions & 7 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.tests.kafka_test import KafkaTest
from ducktape.tests.test import Test

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import subprocess, itertools, time
from collections import Counter

class ConnectDistributedTest(KafkaTest):
class ConnectDistributedTest(Test):
"""
Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
another, validating the total output is identical to the input.
Expand All @@ -45,22 +49,39 @@ class ConnectDistributedTest(KafkaTest):
SCHEMA = { "type": "string", "optional": False }

def __init__(self, test_context):
super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
super(ConnectDistributedTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.topics = {
'test' : { 'partitions': 1, 'replication-factor': 1 }
})
}

self.zk = ZookeeperService(test_context, self.num_zk)

self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
self.cc.log_level = "DEBUG"
self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True

def test_file_source_and_sink(self):
def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
topics=self.topics)

self.cc = ConnectDistributedService(self.test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
self.cc.log_level = "DEBUG"

self.zk.start()
self.kafka.start()


@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
def test_file_source_and_sink(self, security_protocol):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
"""

self.setup_services(security_protocol=security_protocol)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))

self.cc.start()
Expand Down Expand Up @@ -94,6 +115,7 @@ def test_bounce(self, clean):
"""
num_tasks = 3

self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
self.cc.start()

Expand Down
37 changes: 28 additions & 9 deletions tests/kafkatest/tests/connect/connect_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.tests.kafka_test import KafkaTest
from ducktape.tests.test import Test

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectStandaloneService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import parametrize, matrix
import hashlib, subprocess, json

class ConnectStandaloneFileTest(KafkaTest):
class ConnectStandaloneFileTest(Test):
"""
Simple test of Kafka Connect that produces data from a file in one
standalone process and consumes it on another, validating the output is
Expand All @@ -42,24 +46,39 @@ class ConnectStandaloneFileTest(KafkaTest):
SCHEMA = { "type": "string", "optional": False }

def __init__(self, test_context):
super(ConnectStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
super(ConnectStandaloneFileTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.topics = {
'test' : { 'partitions': 1, 'replication-factor': 1 }
})
}

self.source = ConnectStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = ConnectStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000)
self.zk = ZookeeperService(test_context, self.num_zk)

@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=True)
@parametrize(converter="org.apache.kafka.connect.json.JsonConverter", schemas=False)
@parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
assert converter != None, "converter type must be set"
# Template parameters
self.key_converter = converter
self.value_converter = converter
self.schemas = schemas

self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
topics=self.topics)

self.source = ConnectStandaloneService(self.test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE])
self.sink = ConnectStandaloneService(self.test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE])
self.consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.TOPIC,
consumer_timeout_ms=1000, new_consumer=True)


self.zk.start()
self.kafka.start()

self.source.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-source.properties")])
self.sink.set_configs(lambda node: self.render("connect-standalone.properties", node=node), [self.render("connect-file-sink.properties")])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

bootstrap.servers={{ kafka.bootstrap_servers() }}
bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
{{ kafka.security_config.client_config().props() }}
{{ kafka.security_config.client_config().props("producer.") }}
{{ kafka.security_config.client_config().props("consumer.") }}

group.id={{ group|default("connect-cluster") }}

Expand Down Expand Up @@ -43,4 +46,4 @@ rest.advertised.host.name = {{ node.account.hostname }}

# Reduce session timeouts so tests that kill workers don't need to wait as long to recover
session.timeout.ms=10000
consumer.session.timeout.ms=10000
consumer.session.timeout.ms=10000
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

bootstrap.servers={{ kafka.bootstrap_servers() }}
bootstrap.servers={{ kafka.bootstrap_servers(kafka.security_config.security_protocol) }}
{{ kafka.security_config.client_config().props("producer.") }}
{{ kafka.security_config.client_config().props("consumer.") }}

key.converter={{ key_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
value.converter={{ value_converter|default("org.apache.kafka.connect.json.JsonConverter") }}
Expand Down

0 comments on commit c3c9289

Please sign in to comment.