diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index ee2085987228..43fc4d1e3ebf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.tools.VerifiableSinkConnector; @@ -232,10 +233,14 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) { @Override public ConfigInfos validateConfigs(String connType, Map connectorConfig) { - ConfigDef connectorConfigDef = ConnectorConfig.configDef(); - List connectorConfigValues = connectorConfigDef.validate(connectorConfig); - Connector connector = getConnector(connType); + ConfigDef connectorConfigDef; + if (connector instanceof SourceConnector) { + connectorConfigDef = SourceConnectorConfig.configDef(); + } else { + connectorConfigDef = SinkConnectorConfig.configDef(); + } + List connectorConfigValues = connectorConfigDef.validate(connectorConfig); Config config = connector.validate(connectorConfig); ConfigDef configDef = connector.config(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index e4395523263f..0cbfe214caab 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -22,12 +22,12 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; - import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + /** *

* Configuration options for Connectors. These only include Kafka Connect system-level configuration @@ -40,7 +40,7 @@ *

*/ public class ConnectorConfig extends AbstractConfig { - private static final String COMMON_GROUP = "Common"; + protected static final String COMMON_GROUP = "Common"; public static final String NAME_CONFIG = "name"; private static final String NAME_DOC = "Globally unique name to use for this connector."; @@ -60,19 +60,13 @@ public class ConnectorConfig extends AbstractConfig { private static final String TASK_MAX_DISPLAY = "Tasks max"; - public static final String TOPICS_CONFIG = "topics"; - private static final String TOPICS_DOC = ""; - public static final String TOPICS_DEFAULT = ""; - private static final String TOPICS_DISPLAY = "Topics"; - - private static ConfigDef config; + protected static ConfigDef config; static { config = new ConfigDef() .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) - .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY) - .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY); + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY); } public static ConfigDef configDef() { @@ -86,4 +80,8 @@ public ConnectorConfig() { public ConnectorConfig(Map props) { super(config, props); } + + public ConnectorConfig(ConfigDef subClassConfig, Map props) { + super(subClassConfig, props); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java new file mode 100644 index 000000000000..cbfc6d1a6405 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; + +import java.util.HashMap; +import java.util.Map; + +/** + * Configuration needed for all sink connectors + */ + +public class SinkConnectorConfig extends ConnectorConfig { + + public static final String TOPICS_CONFIG = "topics"; + private static final String TOPICS_DOC = ""; + public static final String TOPICS_DEFAULT = ""; + private static final String TOPICS_DISPLAY = "Topics"; + + static ConfigDef config = ConnectorConfig.configDef() + .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY); + + public SinkConnectorConfig() { + this(new HashMap()); + } + + public SinkConnectorConfig(Map props) { + super(config, props); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java new file mode 100644 index 000000000000..ca9219f5449f --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import java.util.Map; + +public class SourceConnectorConfig extends ConnectorConfig { + + public SourceConnectorConfig(Map props) { + super(config, props); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 037eba742816..a2beff3b4409 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -28,6 +28,8 @@ import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.SinkConnectorConfig; +import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; @@ -827,10 +829,15 @@ private void reconfigureConnector(final String connName, final Callback cb } Map configs = configState.connectorConfig(connName); - ConnectorConfig connConfig = new ConnectorConfig(configs); + + ConnectorConfig connConfig; List sinkTopics = null; - if (worker.isSinkConnector(connName)) - sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); + if (worker.isSinkConnector(connName)) { + connConfig = new SinkConnectorConfig(configs); + sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG); + } else { + connConfig = new SourceConnectorConfig(configs); + } final List> taskProps = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 2316baefb365..8dbda74b8e7a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -23,6 +23,8 @@ import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.SinkConnectorConfig; +import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; @@ -251,11 +253,20 @@ private String startConnector(Map connectorProps) { private List> recomputeTaskConfigs(String connName) { Map config = configState.connectorConfig(connName); - ConnectorConfig connConfig = new ConnectorConfig(config); - return worker.connectorTaskConfigs(connName, - connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), - connConfig.getList(ConnectorConfig.TOPICS_CONFIG)); + ConnectorConfig connConfig; + if (worker.isSinkConnector(connName)) { + connConfig = new SinkConnectorConfig(config); + return worker.connectorTaskConfigs(connName, + connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), + connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG)); + } else { + connConfig = new SourceConnectorConfig(config); + return worker.connectorTaskConfigs(connName, + connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), + null); + } + } private void createConnectorTasks(String connName, TargetState initialState) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 2004c993a2c6..ec4f0253c1e3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -97,7 +97,7 @@ public void testStartAndStopConnector() throws Exception { EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); - props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); @@ -155,7 +155,7 @@ public void testAddConnectorByAlias() throws Exception { EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); - props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector"); @@ -208,7 +208,7 @@ public void testAddConnectorByShortAlias() throws Exception { EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); - props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest"); @@ -274,7 +274,7 @@ public void testReconfigureConnectorTasks() throws Exception { EasyMock.expect(connector.version()).andReturn("1.0"); Map props = new HashMap<>(); - props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index e62b66366cb2..fbccc55963e7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.Worker; @@ -96,18 +97,18 @@ public class DistributedHerderTest { static { CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1); CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); - CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); } private static final Map CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG); static { - CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz"); + CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz"); } private static final Map CONN2_CONFIG = new HashMap<>(); static { CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2); CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); - CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar"); CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); } private static final Map TASK_CONFIG = new HashMap<>(); @@ -935,9 +936,6 @@ public Object answer() throws Throwable { herder.tick(); assertTrue(connectorConfigCb.isDone()); assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get()); - // The config passed to Worker should - assertEquals(Arrays.asList("foo", "bar", "baz"), - capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG)); PowerMock.verifyAll(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 10e51946cd11..e70b968d420c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -92,7 +92,7 @@ public void testCreateSourceConnector() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); PowerMock.verifyAll(); } @@ -109,8 +109,8 @@ public void testCreateConnectorAlreadyExists() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); PowerMock.verifyAll(); } @@ -122,7 +122,7 @@ public void testCreateSinkConnector() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback); PowerMock.verifyAll(); } @@ -139,7 +139,7 @@ public void testDestroyConnector() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback> futureCb = new FutureCallback<>(); herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb); futureCb.get(1000L, TimeUnit.MILLISECONDS); @@ -164,13 +164,13 @@ public void testRestartConnector() throws Exception { worker.stopConnector(CONNECTOR_NAME); EasyMock.expectLastCall(); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))), + worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall(); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -191,7 +191,7 @@ public void testRestartConnectorFailureOnStop() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -213,13 +213,13 @@ public void testRestartConnectorFailureOnStart() throws Exception { EasyMock.expectLastCall(); RuntimeException e = new RuntimeException(); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))), + worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall().andThrow(e); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -247,7 +247,7 @@ public void testRestartTask() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -269,7 +269,7 @@ public void testRestartTaskFailureOnStop() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -297,7 +297,7 @@ public void testRestartTaskFailureOnStart() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -325,7 +325,7 @@ public void testCreateAndStop() throws Exception { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); herder.stop(); PowerMock.verifyAll(); @@ -333,7 +333,7 @@ public void testCreateAndStop() throws Exception { @Test public void testAccessors() throws Exception { - Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class); + Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false); Callback> listConnectorsCb = PowerMock.createMock(Callback.class); Callback connectorInfoCb = PowerMock.createMock(Callback.class); @@ -388,7 +388,7 @@ public void testAccessors() throws Exception { @Test public void testPutConnectorConfig() throws Exception { - Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class); + Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false); Map newConnConfig = new HashMap<>(connConfig); newConnConfig.put("foo", "bar"); @@ -410,8 +410,10 @@ public void testPutConnectorConfig() throws Exception { EasyMock.expectLastCall(); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); // Generate same task config, which should result in no additional action to restart tasks - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST)) + EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null)) .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false))); + worker.isSinkConnector(CONNECTOR_NAME); + EasyMock.expectLastCall().andReturn(false); ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, newConnInfo)); EasyMock.expectLastCall(); @@ -448,10 +450,11 @@ private void expectAdd(String name, Class connClass, Class taskClass, boolean sink) throws Exception { - Map connectorProps = connectorConfig(name, connClass); + + Map connectorProps = connectorConfig(name, connClass, sink); worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class), - EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall(); EasyMock.expect(worker.isRunning(name)).andReturn(true); @@ -462,11 +465,15 @@ private void expectAdd(String name, // And we should instantiate the tasks. For a sink task, we should see added properties for // the input topic partitions Map generatedTaskProps = taskConfig(taskClass, sink); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST)) - .andReturn(Collections.singletonList(generatedTaskProps)); + + EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null)) + .andReturn(Collections.singletonList(generatedTaskProps)); worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED); EasyMock.expectLastCall(); + + worker.isSinkConnector(CONNECTOR_NAME); + PowerMock.expectLastCall().andReturn(sink); } private void expectStop() { @@ -483,11 +490,13 @@ private void expectDestroy() { expectStop(); } - private static HashMap connectorConfig(String name, Class connClass) { + private static HashMap connectorConfig(String name, Class connClass, boolean sink) { HashMap connectorProps = new HashMap<>(); connectorProps.put(ConnectorConfig.NAME_CONFIG, name); - connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName()); + if (sink) { + connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); + } return connectorProps; }