Skip to content

Commit

Permalink
KAFKA-3671: Move topics to SinkConnectorConfig
Browse files Browse the repository at this point in the history
Author: Liquan Pei <[email protected]>

Reviewers: Dan Norwood <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#1335 from Ishiihara/sink-connector-config
  • Loading branch information
Ishiihara authored and ewencp committed May 9, 2016
1 parent 62b9fa2 commit d1bb2b9
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
Expand Down Expand Up @@ -232,10 +233,14 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {

@Override
public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
ConfigDef connectorConfigDef = ConnectorConfig.configDef();
List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);

Connector connector = getConnector(connType);
ConfigDef connectorConfigDef;
if (connector instanceof SourceConnector) {
connectorConfigDef = SourceConnectorConfig.configDef();
} else {
connectorConfigDef = SinkConnectorConfig.configDef();
}
List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);

Config config = connector.validate(connectorConfig);
ConfigDef configDef = connector.config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;


import java.util.HashMap;
import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;

/**
* <p>
* Configuration options for Connectors. These only include Kafka Connect system-level configuration
Expand All @@ -40,7 +40,7 @@
* </p>
*/
public class ConnectorConfig extends AbstractConfig {
private static final String COMMON_GROUP = "Common";
protected static final String COMMON_GROUP = "Common";

public static final String NAME_CONFIG = "name";
private static final String NAME_DOC = "Globally unique name to use for this connector.";
Expand All @@ -60,19 +60,13 @@ public class ConnectorConfig extends AbstractConfig {

private static final String TASK_MAX_DISPLAY = "Tasks max";

public static final String TOPICS_CONFIG = "topics";
private static final String TOPICS_DOC = "";
public static final String TOPICS_DEFAULT = "";
private static final String TOPICS_DISPLAY = "Topics";

private static ConfigDef config;
protected static ConfigDef config;

static {
config = new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
}

public static ConfigDef configDef() {
Expand All @@ -86,4 +80,8 @@ public ConnectorConfig() {
public ConnectorConfig(Map<String, String> props) {
super(config, props);
}

public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) {
super(subClassConfig, props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.config.ConfigDef;

import java.util.HashMap;
import java.util.Map;

/**
 * Configuration needed for all sink connectors: extends the common connector
 * options with the list of Kafka topics the sink consumes from.
 */
public class SinkConnectorConfig extends ConnectorConfig {

    public static final String TOPICS_CONFIG = "topics";
    private static final String TOPICS_DOC = "";
    public static final String TOPICS_DEFAULT = "";
    private static final String TOPICS_DISPLAY = "Topics";

    // NOTE(review): ConfigDef.define() mutates and returns the receiver, so chaining off
    // ConnectorConfig.configDef() may also add "topics" to the shared base ConfigDef —
    // TODO confirm and, if so, copy the base def before defining sink-specific keys.
    static ConfigDef config = ConnectorConfig.configDef()
        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY);

    /**
     * Returns the {@link ConfigDef} for sink connectors, including the "topics" key.
     * <p>
     * Declared here because static methods are hidden, not overridden: without this,
     * {@code SinkConnectorConfig.configDef()} would resolve to
     * {@code ConnectorConfig.configDef()}, which returns the base field that lacks
     * the sink-specific "topics" definition.
     */
    public static ConfigDef configDef() {
        return config;
    }

    /** Creates a config with no user-supplied properties (defaults only). */
    public SinkConnectorConfig() {
        this(new HashMap<String, String>());
    }

    /**
     * Creates a sink connector config from the given raw properties, validated
     * against the sink {@link ConfigDef}.
     */
    public SinkConnectorConfig(Map<String, String> props) {
        super(config, props);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.runtime;

import java.util.Map;

/**
 * Configuration for source connectors. Source connectors currently define no
 * keys beyond the common connector configuration, so this class simply reuses
 * the inherited (protected) static {@code ConfigDef} from {@link ConnectorConfig}.
 */
public class SourceConnectorConfig extends ConnectorConfig {

    /**
     * Creates a source connector config from the given raw properties, validated
     * against the common connector {@code ConfigDef} inherited from the superclass.
     */
    public SourceConnectorConfig(Map<String, String> props) {
        super(config, props);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
Expand Down Expand Up @@ -827,10 +829,15 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
}

Map<String, String> configs = configState.connectorConfig(connName);
ConnectorConfig connConfig = new ConnectorConfig(configs);

ConnectorConfig connConfig;
List<String> sinkTopics = null;
if (worker.isSinkConnector(connName))
sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
if (worker.isSinkConnector(connName)) {
connConfig = new SinkConnectorConfig(configs);
sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
} else {
connConfig = new SourceConnectorConfig(configs);
}

final List<Map<String, String>> taskProps
= worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
Expand Down Expand Up @@ -251,11 +253,20 @@ private String startConnector(Map<String, String> connectorProps) {

private List<Map<String, String>> recomputeTaskConfigs(String connName) {
Map<String, String> config = configState.connectorConfig(connName);
ConnectorConfig connConfig = new ConnectorConfig(config);

return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
connConfig.getList(ConnectorConfig.TOPICS_CONFIG));
ConnectorConfig connConfig;
if (worker.isSinkConnector(connName)) {
connConfig = new SinkConnectorConfig(config);
return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
} else {
connConfig = new SourceConnectorConfig(config);
return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
null);
}

}

private void createConnectorTasks(String connName, TargetState initialState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testStartAndStopConnector() throws Exception {
EasyMock.expect(connector.version()).andReturn("1.0");

Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
Expand Down Expand Up @@ -155,7 +155,7 @@ public void testAddConnectorByAlias() throws Exception {
EasyMock.expect(connector.version()).andReturn("1.0");

Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
Expand Down Expand Up @@ -208,7 +208,7 @@ public void testAddConnectorByShortAlias() throws Exception {
EasyMock.expect(connector.version()).andReturn("1.0");

Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
Expand Down Expand Up @@ -274,7 +274,7 @@ public void testReconfigureConnectorTasks() throws Exception {
EasyMock.expect(connector.version()).andReturn("1.0");

Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
Expand Down Expand Up @@ -96,18 +97,18 @@ public class DistributedHerderTest {
static {
CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN1_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
static {
CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
CONN1_CONFIG_UPDATED.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar,baz");
}
private static final Map<String, String> CONN2_CONFIG = new HashMap<>();
static {
CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString());
CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN2_CONFIG.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName());
}
private static final Map<String, String> TASK_CONFIG = new HashMap<>();
Expand Down Expand Up @@ -935,9 +936,6 @@ public Object answer() throws Throwable {
herder.tick();
assertTrue(connectorConfigCb.isDone());
assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
// The config passed to Worker should
assertEquals(Arrays.asList("foo", "bar", "baz"),
capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
PowerMock.verifyAll();
}

Expand Down
Loading

0 comments on commit d1bb2b9

Please sign in to comment.