Skip to content

Commit

Permalink
KAFKA-3634; Upgrade tests for SASL authentication
Browse files Browse the repository at this point in the history
Add a test for changing SASL mechanism using rolling upgrade and a test for rolling upgrade from 0.9.0.x to 0.10.0 with SASL/GSSAPI.

Author: Rajini Sivaram <[email protected]>

Reviewers: Ben Stopford <[email protected]>, Geoff Anderson <[email protected]>, Ismael Juma <[email protected]>

Closes #1290 from rajinisivaram/KAFKA-3634

(cherry picked from commit 87285f3)
Signed-off-by: Ismael Juma <[email protected]>
  • Loading branch information
rajinisivaram authored and ijuma committed May 9, 2016
1 parent 57ae46f commit f4ed61c
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
63 changes: 63 additions & 0 deletions tests/kafkatest/tests/core/security_rolling_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from ducktape.mark import parametrize
from ducktape.mark import matrix
from kafkatest.services.security.kafka_acls import ACLs
import time
Expand Down Expand Up @@ -74,6 +75,9 @@ def roll_in_secured_settings(self, client_protocol, broker_protocol):

# Roll cluster to disable PLAINTEXT port
self.kafka.close_port('PLAINTEXT')
self.set_authorizer_and_bounce(client_protocol, broker_protocol)

def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
self.kafka.authorizer_class_name = KafkaService.SIMPLE_AUTHORIZER
self.acls.set_acls(client_protocol, self.kafka, self.zk, self.topic, self.group)
self.acls.set_acls(broker_protocol, self.kafka, self.zk, self.topic, self.group)
Expand All @@ -85,6 +89,19 @@ def open_secured_port(self, client_protocol):
self.kafka.start_minikdc()
self.bounce()

def add_sasl_mechanism(self, new_client_sasl_mechanism):
self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
self.kafka.start_minikdc()
self.bounce()

def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
# Roll cluster to update inter-broker SASL mechanism. This disables the old mechanism.
self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
self.bounce()

# Bounce again with ACLs for new mechanism
self.set_authorizer_and_bounce(security_protocol, security_protocol)

@matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
def test_rolling_upgrade_phase_one(self, client_protocol):
"""
Expand Down Expand Up @@ -125,3 +142,49 @@ def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):

#Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)

@parametrize(new_client_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
"""
Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
"""
self.kafka.interbroker_security_protocol = "SASL_SSL"
self.kafka.security_protocol = "SASL_SSL"
self.kafka.client_sasl_mechanism = "GSSAPI"
self.kafka.interbroker_sasl_mechanism = "GSSAPI"
self.kafka.start()

# Create SASL/GSSAPI producer and consumer
self.create_producer_and_consumer()

# Rolling upgrade, adding new SASL mechanism, ensuring the GSSAPI producer/consumer continues to run
self.run_produce_consume_validate(self.add_sasl_mechanism, new_client_sasl_mechanism)

# Now we can produce and consume using the new SASL mechanism
self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
self.create_producer_and_consumer()
self.run_produce_consume_validate(lambda: time.sleep(1))

@parametrize(new_sasl_mechanism='PLAIN')
def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism):
"""
Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one).
Start Producer and Consumer using the second mechanism
Incrementally upgrade to set inter-broker to the second mechanism and disable GSSAPI
Incrementally upgrade again to add ACLs
Ensure the producer and consumer run throughout
"""
#Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients
self.kafka.security_protocol = "SASL_SSL"
self.kafka.interbroker_security_protocol = "SASL_SSL"
self.kafka.client_sasl_mechanism = new_sasl_mechanism
self.kafka.interbroker_sasl_mechanism = "GSSAPI"
self.kafka.start()

#Create Producer and Consumer using second mechanism
self.create_producer_and_consumer()

#Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout
self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism)

6 changes: 5 additions & 1 deletion tests/kafkatest/tests/core/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):


@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL")
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"], new_consumer=True)
Expand All @@ -70,7 +71,8 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"], new_consumer=True)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False):
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
new_consumer=False, security_protocol="PLAINTEXT"):
"""Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
Expand All @@ -93,6 +95,8 @@ def test_upgrade(self, from_kafka_version, to_message_format_version, compressio
version=KafkaVersion(from_kafka_version),
topics={self.topic: {"partitions": 3, "replication-factor": 3,
'configs': {"min.insync.replicas": 2}}})
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.start()

self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
Expand Down

0 comments on commit f4ed61c

Please sign in to comment.