Skip to content

Commit

Permalink
KAFKA-3676: system tests for connector pause/resume
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1345 from hachikuji/KAFKA-3676
  • Loading branch information
Jason Gustafson authored and ewencp committed May 9, 2016
1 parent eb1de10 commit f96da63
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)

try {
TargetState state = TargetState.valueOf((String) targetState);
log.trace("Setting target state for connector {} to {}", connectorName, targetState);
log.debug("Setting target state for connector {} to {}", connectorName, targetState);
connectorTargetStates.put(connectorName, state);
} catch (IllegalArgumentException e) {
log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
Expand Down
28 changes: 23 additions & 5 deletions tests/kafkatest/services/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,15 @@ def stop_node(self, node, clean_shutdown=True):

node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)

def restart(self):
def restart(self, clean_shutdown=True):
# We don't want to do any clean up here, just restart the process.
for node in self.nodes:
self.logger.info("Restarting Kafka Connect on " + str(node.account))
self.stop_node(node)
self.start_node(node)
self.restart_node(node, clean_shutdown)

def restart_node(self, node, clean_shutdown=True):
    """Bounce the Connect process on a single node: stop it, then start it again.

    :param node: the node whose process is restarted
    :param clean_shutdown: passed through unchanged to stop_node
    """
    self.stop_node(node, clean_shutdown)
    self.start_node(node)

def clean_node(self, node):
node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
Expand Down Expand Up @@ -128,6 +131,15 @@ def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
    """Send a DELETE for the named connector through the retrying REST helper.

    :param name: connector name, appended to the '/connectors/' path
    :param node: node to send the request to, forwarded as-is
    :param retries: forwarded to _rest_with_retry
    :param retry_backoff: forwarded to _rest_with_retry
    :return: whatever _rest_with_retry returns
    """
    connector_path = '/connectors/' + name
    return self._rest_with_retry(connector_path, node=node, method="DELETE",
                                 retries=retries, retry_backoff=retry_backoff)

def get_connector_status(self, name, node=None):
    """Query the '/connectors/<name>/status' REST endpoint.

    :param name: connector name
    :param node: node to send the request to, forwarded to _rest
    :return: whatever _rest returns for the status request
    """
    status_path = '/connectors/' + name + '/status'
    return self._rest(status_path, node=node)

def pause_connector(self, name, node=None):
    """Ask Connect to pause the named connector via PUT '/connectors/<name>/pause'.

    :param name: connector name
    :param node: node to send the request to; forwarded to _rest so that the
                 caller's choice is honoured (it used to be silently dropped)
    :return: whatever _rest returns for the request
    """
    return self._rest('/connectors/' + name + '/pause', node=node, method="PUT")

def resume_connector(self, name, node=None):
    """Ask Connect to resume the named connector via PUT '/connectors/<name>/resume'.

    :param name: connector name
    :param node: node to send the request to; forwarded to _rest so that the
                 caller's choice is honoured (it used to be silently dropped)
    :return: whatever _rest returns for the request
    """
    return self._rest('/connectors/' + name + '/resume', node=node, method="PUT")

def _rest(self, path, body=None, node=None, method="GET"):
if node is None:
node = random.choice(self.nodes)
Expand All @@ -139,7 +151,7 @@ def _rest(self, path, body=None, node=None, method="GET"):
self.logger.debug("%s %s response: %d", url, method, resp.status_code)
if resp.status_code > 400:
raise ConnectRestError(resp.status_code, resp.text, resp.url)
if resp.status_code == 204:
if resp.status_code == 204 or resp.status_code == 202:
return None
else:
return resp.json()
Expand Down Expand Up @@ -185,7 +197,7 @@ def start_node(self, node):
self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
with node.account.monitor_log(self.LOG_FILE) as monitor:
node.account.ssh(self.start_cmd(node, remote_connector_configs))
monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))

if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
Expand Down Expand Up @@ -298,6 +310,12 @@ def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
self.tasks = tasks
self.topics = topics

def flushed_messages(self):
    """Return the messages from self.messages() that carry a truthy 'flushed' entry.

    The result is a real list rather than the output of filter(), so callers can
    take len() of it or iterate it repeatedly regardless of Python version.
    """
    return [m for m in self.messages() if 'flushed' in m and m['flushed']]

def received_messages(self):
    """Return the messages from self.messages() that are not flagged as flushed.

    A message qualifies when it has no 'flushed' entry or the entry is falsy.
    The result is a real list rather than the output of filter(), so callers can
    take len() of it or iterate it repeatedly regardless of Python version.
    """
    return [m for m in self.messages() if 'flushed' not in m or not m['flushed']]

def start(self):
self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
self.cc.create_connector({
Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def stop_node(self, node, clean_shutdown=True):

for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Kafka node failed to stop")

def clean_node(self, node):
JmxMixin.clean_node(self, node)
Expand Down
146 changes: 141 additions & 5 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
from ducktape.utils.util import wait_until
from ducktape.mark import matrix
import subprocess, itertools, time
from collections import Counter
import operator

class ConnectDistributedTest(Test):
"""
Expand Down Expand Up @@ -73,6 +74,142 @@ def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
self.zk.start()
self.kafka.start()

def _start_connector(self, config_file):
    """Render a connector properties template and create the connector from it.

    Blank lines and lines starting with '#' are ignored; every remaining
    (stripped) line is split at its first '=' into a key/value pair.

    :param config_file: template name handed to self.render
    """
    rendered = self.render(config_file)
    pairs = []
    for raw_line in rendered.split('\n'):
        entry = raw_line.strip()
        if not entry or entry.startswith('#'):
            continue
        pairs.append(entry.split('=', 1))
    self.cc.create_connector(dict(pairs))

def _connector_status(self, connector, node=None):
    """Fetch a connector's status, mapping a ConnectRestError to None.

    :param connector: connector name passed to get_connector_status
    :param node: node to query, passed through unchanged
    :return: the status returned by the Connect service, or None if the
             request raised ConnectRestError
    """
    try:
        status = self.cc.get_connector_status(connector, node)
    except ConnectRestError:
        status = None
    return status

def _has_state(self, status, state):
    """Tell whether the connector entry of a status document is in the given state.

    :param status: status document, or None when no status could be fetched
    :param state: expected value of status['connector']['state']
    :return: False when status is None, otherwise the result of the comparison
    """
    if status is None:
        return False
    return status['connector']['state'] == state

def _all_tasks_have_state(self, status, task_count, state):
    """Tell whether a status document lists exactly task_count tasks, all in the given state.

    Uses the builtin all() instead of reduce(operator.and_, ...): reduce is not a
    builtin on Python 3, and all() expresses the same check directly.

    :param status: status document, or None when no status could be fetched
    :param task_count: number of tasks the connector is expected to have
    :param state: expected value of each task's 'state' entry
    :return: False when status is None or the task count differs, otherwise
             True only if every task is in the given state
    """
    if status is None:
        return False

    tasks = status['tasks']
    if len(tasks) != task_count:
        return False

    return all(task['state'] == state for task in tasks)

def is_running(self, connector, node=None):
    """Tell whether the connector and all of its tasks report the RUNNING state.

    :param connector: object exposing the connector's `name` and expected `tasks` count
    :param node: node whose status view is queried, passed through unchanged
    """
    status = self._connector_status(connector.name, node)
    if not self._has_state(status, 'RUNNING'):
        return False
    return self._all_tasks_have_state(status, connector.tasks, 'RUNNING')

def is_paused(self, connector, node=None):
    """Tell whether the connector and all of its tasks report the PAUSED state.

    :param connector: object exposing the connector's `name` and expected `tasks` count
    :param node: node whose status view is queried, passed through unchanged
    """
    status = self._connector_status(connector.name, node)
    if not self._has_state(status, 'PAUSED'):
        return False
    return self._all_tasks_have_state(status, connector.tasks, 'PAUSED')

def test_pause_and_resume_source(self):
    """
    Check that a paused source connector emits no new records and that record
    production picks up again once the connector has been resumed.
    """

    self.setup_services()
    self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
    self.cc.start()

    self.source = VerifiableSource(self.cc)
    self.source.start()

    wait_until(lambda: self.is_running(self.source), timeout_sec=30,
               err_msg="Failed to see connector transition to the RUNNING state")

    self.cc.pause_connector(self.source.name)

    # every node has to report the PAUSED transition before we start counting
    for worker in self.cc.nodes:
        wait_until(lambda: self.is_paused(self.source, worker), timeout_sec=30,
                   err_msg="Failed to see connector transition to the PAUSED state")

    # the message count must not move across a 10 second pause window
    count_while_paused = len(self.source.messages())
    time.sleep(10)
    count_after_wait = len(self.source.messages())
    assert count_while_paused == count_after_wait, "Paused source connector should not produce any messages"

    self.cc.resume_connector(self.source.name)

    # likewise, every node has to report RUNNING again after the resume
    for worker in self.cc.nodes:
        wait_until(lambda: self.is_running(self.source, worker), timeout_sec=30,
                   err_msg="Failed to see connector transition to the RUNNING state")

    # once resumed, the count has to grow past the value seen while paused
    wait_until(lambda: len(self.source.messages()) > count_while_paused, timeout_sec=30,
               err_msg="Failed to produce messages after resuming source connector")

def test_pause_and_resume_sink(self):
    """
    Verify that sink connectors stop consuming records when paused and begin again after
    being resumed.
    """

    self.setup_services()
    self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
    self.cc.start()

    # use the verifiable source to produce a steady stream of messages
    self.source = VerifiableSource(self.cc)
    self.source.start()

    self.sink = VerifiableSink(self.cc)
    self.sink.start()

    wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
               err_msg="Failed to see connector transition to the RUNNING state")

    self.cc.pause_connector(self.sink.name)

    # wait until all nodes report the paused transition
    for node in self.cc.nodes:
        wait_until(lambda: self.is_paused(self.sink, node), timeout_sec=30,
                   err_msg="Failed to see connector transition to the PAUSED state")

    # verify that we do not consume new messages while paused: the received
    # count must be unchanged after a 10 second window
    num_messages = len(self.sink.received_messages())
    time.sleep(10)
    assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"

    self.cc.resume_connector(self.sink.name)

    for node in self.cc.nodes:
        wait_until(lambda: self.is_running(self.sink, node), timeout_sec=30,
                   err_msg="Failed to see connector transition to the RUNNING state")

    # after resuming, we should see records consumed again; the failure
    # message names the sink connector, which is what this test exercises
    wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
               err_msg="Failed to consume messages after resuming sink connector")


def test_pause_state_persistent(self):
    """
    Check that a connector paused before a Connect cluster restart is still
    reported as paused once the cluster is back up.
    """

    self.setup_services()
    self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
    self.cc.start()

    self.source = VerifiableSource(self.cc)
    self.source.start()

    wait_until(lambda: self.is_running(self.source), timeout_sec=30,
               err_msg="Failed to see connector transition to the RUNNING state")

    self.cc.pause_connector(self.source.name)

    self.cc.restart()

    # after the restart every node must report the connector as PAUSED
    for worker in self.cc.nodes:
        wait_until(lambda: self.is_paused(self.source, worker), timeout_sec=30,
                   err_msg="Failed to see connector startup in PAUSED state")

@matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
def test_file_source_and_sink(self, security_protocol):
Expand All @@ -87,10 +224,9 @@ def test_file_source_and_sink(self, security_protocol):
self.cc.start()

self.logger.info("Creating connectors")
for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
self.cc.create_connector(connector_config)

self._start_connector("connect-file-source.properties")
self._start_connector("connect-file-sink.properties")

# Generating data on the source node should generate new records and create new output on the sink node. Timeouts
# here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
# do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
Expand Down

0 comments on commit f96da63

Please sign in to comment.