diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index a894f3102c2cf..9a93a4e48a22f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -469,7 +469,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 
                 try {
                     TargetState state = TargetState.valueOf((String) targetState);
-                    log.trace("Setting target state for connector {} to {}", connectorName, targetState);
+                    log.debug("Setting target state for connector {} to {}", connectorName, targetState);
                     connectorTargetStates.put(connectorName, state);
                 } catch (IllegalArgumentException e) {
                     log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index cf67c301b09e6..aad9ff3c009fa 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -88,12 +88,15 @@ def stop_node(self, node, clean_shutdown=True):
 
         node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
-    def restart(self):
+    def restart(self, clean_shutdown=True):
         # We don't want to do any clean up here, just restart the process.
         for node in self.nodes:
             self.logger.info("Restarting Kafka Connect on " + str(node.account))
-            self.stop_node(node)
-            self.start_node(node)
+            self.restart_node(node, clean_shutdown)
+
+    def restart_node(self, node, clean_shutdown=True):
+        self.stop_node(node, clean_shutdown)
+        self.start_node(node)
 
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
@@ -128,6 +131,15 @@ def get_connector_tasks(self, name, node=None, retries=0, retry_backoff=.01):
     def delete_connector(self, name, node=None, retries=0, retry_backoff=.01):
         return self._rest_with_retry('/connectors/' + name, node=node, method="DELETE", retries=retries, retry_backoff=retry_backoff)
 
+    def get_connector_status(self, name, node=None):
+        return self._rest('/connectors/' + name + '/status', node=node)
+
+    def pause_connector(self, name, node=None):
+        return self._rest('/connectors/' + name + '/pause', node=node, method="PUT")
+
+    def resume_connector(self, name, node=None):
+        return self._rest('/connectors/' + name + '/resume', node=node, method="PUT")
+
     def _rest(self, path, body=None, node=None, method="GET"):
         if node is None:
             node = random.choice(self.nodes)
@@ -139,7 +151,7 @@ def _rest(self, path, body=None, node=None, method="GET"):
         self.logger.debug("%s %s response: %d", url, method, resp.status_code)
         if resp.status_code > 400:
             raise ConnectRestError(resp.status_code, resp.text, resp.url)
-        if resp.status_code == 204:
+        if resp.status_code == 204 or resp.status_code == 202:
             return None
         else:
             return resp.json()
@@ -185,7 +197,7 @@ def start_node(self, node):
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
             node.account.ssh(self.start_cmd(node, remote_connector_configs))
-            monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
+            monitor.wait_until('Kafka Connect started', timeout_sec=30, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -298,6 +310,12 @@ def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
         self.tasks = tasks
         self.topics = topics
 
+    def flushed_messages(self):
+        return filter(lambda m: 'flushed' in m and m['flushed'], self.messages())
+
+    def received_messages(self):
+        return filter(lambda m: 'flushed' not in m or not m['flushed'], self.messages())
+
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
         self.cc.create_connector({
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 6ff7d0c7b553a..334069d99544e 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -227,7 +227,7 @@ def stop_node(self, node, clean_shutdown=True):
         for pid in pids:
            node.account.signal(pid, sig, allow_fail=False)
 
-        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Kafka node failed to stop")
 
     def clean_node(self, node):
         JmxMixin.clean_node(self, node)
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 698a827b1712a..d3ae2e1690837 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -17,13 +17,14 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink, ConnectRestError
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
 from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 import subprocess, itertools, time
 from collections import Counter
+import operator
 
 
 class ConnectDistributedTest(Test):
     """
@@ -73,6 +74,142 @@ def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT):
         self.zk.start()
         self.kafka.start()
 
+    def _start_connector(self, config_file):
+        connector_props = self.render(config_file)
+        connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+        self.cc.create_connector(connector_config)
+
+    def _connector_status(self, connector, node=None):
+        try:
+            return self.cc.get_connector_status(connector, node)
+        except ConnectRestError:
+            return None
+
+    def _has_state(self, status, state):
+        return status is not None and status['connector']['state'] == state
+
+    def _all_tasks_have_state(self, status, task_count, state):
+        if status is None:
+            return False
+
+        tasks = status['tasks']
+        if len(tasks) != task_count:
+            return False
+
+        return reduce(operator.and_, [task['state'] == state for task in tasks], True)
+
+    def is_running(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._has_state(status, 'RUNNING') and self._all_tasks_have_state(status, connector.tasks, 'RUNNING')
+
+    def is_paused(self, connector, node=None):
+        status = self._connector_status(connector.name, node)
+        return self._has_state(status, 'PAUSED') and self._all_tasks_have_state(status, connector.tasks, 'PAUSED')
+
+    def test_pause_and_resume_source(self):
+        """
+        Verify that source connectors stop producing records when paused and begin again after
+        being resumed.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+
+        self.cc.pause_connector(self.source.name)
+
+        # wait until all nodes report the paused transition
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the PAUSED state")
+
+        # verify that we do not produce new messages while paused
+        num_messages = len(self.source.messages())
+        time.sleep(10)
+        assert num_messages == len(self.source.messages()), "Paused source connector should not produce any messages"
+
+        self.cc.resume_connector(self.source.name)
+
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_running(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the RUNNING state")
+
+        # after resuming, we should see records produced again
+        wait_until(lambda: len(self.source.messages()) > num_messages, timeout_sec=30,
+                   err_msg="Failed to produce messages after resuming source connector")
+
+    def test_pause_and_resume_sink(self):
+        """
+        Verify that sink connectors stop consuming records when paused and begin again after
+        being resumed.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        # use the verifiable source to produce a steady stream of messages
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        self.sink = VerifiableSink(self.cc)
+        self.sink.start()
+
+        wait_until(lambda: self.is_running(self.sink), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+
+        self.cc.pause_connector(self.sink.name)
+
+        # wait until all nodes report the paused transition
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_paused(self.sink, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the PAUSED state")
+
+        # verify that we do not consume new messages while paused
+        num_messages = len(self.sink.received_messages())
+        time.sleep(10)
+        assert num_messages == len(self.sink.received_messages()), "Paused sink connector should not consume any messages"
+
+        self.cc.resume_connector(self.sink.name)
+
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_running(self.sink, node), timeout_sec=30,
+                       err_msg="Failed to see connector transition to the RUNNING state")
+
+        # after resuming, we should see records consumed again
+        wait_until(lambda: len(self.sink.received_messages()) > num_messages, timeout_sec=30,
+                   err_msg="Failed to consume messages after resuming sink connector")
+
+    def test_pause_state_persistent(self):
+        """
+        Verify that paused state is preserved after a cluster restart.
+        """
+
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc)
+        self.source.start()
+
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+
+        self.cc.pause_connector(self.source.name)
+
+        self.cc.restart()
+
+        # we should still be paused after restarting
+        for node in self.cc.nodes:
+            wait_until(lambda: self.is_paused(self.source, node), timeout_sec=30,
+                       err_msg="Failed to see connector startup in PAUSED state")
 
     @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
     def test_file_source_and_sink(self, security_protocol):
@@ -87,10 +224,9 @@ def test_file_source_and_sink(self, security_protocol):
         self.cc.start()
 
         self.logger.info("Creating connectors")
-        for connector_props in [self.render("connect-file-source.properties"), self.render("connect-file-sink.properties")]:
-            connector_config = dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
-            self.cc.create_connector(connector_config)
-
+        self._start_connector("connect-file-source.properties")
+        self._start_connector("connect-file-sink.properties")
+
         # Generating data on the source node should generate new records and create new output on the sink node. Timeouts
         # here need to be more generous than they are for standalone mode because a) it takes longer to write configs,
         # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile