diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
index 1cd46bfcc..a5862176e 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
@@ -521,7 +521,7 @@ private void closeResources() {
/**
* Creates a data stream. Will not recreate the data stream if it already exists.
*
- * @param dataStream the data stream to create given in the form {type}-{dataset}-{topic}
+ * @param dataStream the data stream to create given in the form {type}-{dataset}-{namespace}
* @return true if the data stream was created, false if it already exists
*/
private boolean createDataStream(String dataStream) {
diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
index 51c7d4c85..42b5629d7 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
@@ -332,29 +332,46 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String KERBEROS_KEYTAB_PATH_DEFAULT = null;
// Data stream configs
+ public static final String DATA_STREAM_NAMESPACE_CONFIG = "data.stream.namespace";
+ private static final String DATA_STREAM_NAMESPACE_DOC =
+ "Generic name describing user-configurable arbitrary grouping to be written to a data "
+ + "stream. Can be any arbitrary string that is no longer than 100 characters, "
+ + " is in all lowercase, and does not contain spaces or any of these special characters "
+ + "``/\\*\"<>|,#:-``. Otherwise, no value indicates the connector will write to regular "
+ + "indices instead. If set, this configuration will be used alongside "
+ + "``data.stream.type`` and ``data.stream.dataset`` to construct the data stream name in"
+ + "the form of {``data.stream.type``}-{``data.stream.dataset``}-{``"
+ + DATA_STREAM_NAMESPACE_CONFIG + "``}. "
+ + "Defaut value is ``${topic}``, that is to say the topic name.";
+ private static final String DATA_STREAM_NAMESPACE_DISPLAY = "Data Stream Namespace";
+ private static final String DATA_STREAM_NAMESPACE_DEFAULT = "${topic}";
+
public static final String DATA_STREAM_DATASET_CONFIG = "data.stream.dataset";
private static final String DATA_STREAM_DATASET_DOC =
"Generic name describing data ingested and its structure to be written to a data stream. Can "
+ "be any arbitrary string that is no longer than 100 characters, is in all lowercase, "
+ "and does not contain spaces or any of these special characters ``/\\*\"<>|,#:-``. "
+ "Otherwise, no value indicates the connector will write to regular indices instead. "
- + "If set, this configuration will be used alongside ``data.stream.type`` to "
- + "construct the data stream name in the form of {``data.stream.type``"
- + "}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{topic}.";
+ + "If set, this configuration will be used alongside ``data.stream.type`` and "
+ + "``" + DATA_STREAM_NAMESPACE_CONFIG + "`` to construct the data stream name in "
+ + "the form of {``data.stream.type``}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{``"
+ + DATA_STREAM_NAMESPACE_CONFIG + "``}.";
private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
private static final String DATA_STREAM_DATASET_DEFAULT = "";
-
+
public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
private static final String DATA_STREAM_TYPE_DOC = String.format(
"Generic type describing the data to be written to data stream. "
+ "The default is %s which indicates the connector will write "
+ "to regular indices instead. If set, this configuration will "
- + "be used alongside %s to construct the data stream name in the form of "
- + "{``%s``}-{``%s``}-{topic}.",
+ + "be used alongside %s and %s to construct the data stream name in the form of "
+ + "{``%s``}-{``%s``}-{``%s``}.",
DataStreamType.NONE.name(),
DATA_STREAM_DATASET_CONFIG,
+ DATA_STREAM_NAMESPACE_CONFIG,
DATA_STREAM_TYPE_CONFIG,
- DATA_STREAM_DATASET_CONFIG
+ DATA_STREAM_DATASET_CONFIG,
+ DATA_STREAM_NAMESPACE_CONFIG
);
private static final String DATA_STREAM_TYPE_DISPLAY = "Data Stream Type";
private static final DataStreamType DATA_STREAM_TYPE_DEFAULT = DataStreamType.NONE;
@@ -820,6 +837,17 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
int order = 0;
configDef
.define(
+ DATA_STREAM_NAMESPACE_CONFIG,
+ Type.STRING,
+ DATA_STREAM_NAMESPACE_DEFAULT,
+ new DataStreamNamespaceValidator(),
+ Importance.LOW,
+ DATA_STREAM_NAMESPACE_DOC,
+ DATA_STREAM_GROUP,
+ ++order,
+ Width.MEDIUM,
+ DATA_STREAM_NAMESPACE_DISPLAY
+ ).define(
DATA_STREAM_DATASET_CONFIG,
Type.STRING,
DATA_STREAM_DATASET_DEFAULT,
@@ -946,6 +974,10 @@ public String dataStreamDataset() {
return getString(DATA_STREAM_DATASET_CONFIG);
}
+ public String dataStreamNamespace() {
+ return getString(DATA_STREAM_NAMESPACE_CONFIG);
+ }
+
public DataStreamType dataStreamType() {
return DataStreamType.valueOf(getString(DATA_STREAM_TYPE_CONFIG).toUpperCase());
}
@@ -1078,6 +1110,45 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}
+ private static class DataStreamNamespaceValidator implements Validator {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ return;
+ }
+
+ String namespace = (String) value;
+
+ if (namespace.length() > 100) {
+ throw new ConfigException(
+ name, namespace, "The specified namespace must be no longer than 100 characters."
+ );
+ }
+
+ if (!namespace.equals(namespace.toLowerCase())) {
+ throw new ConfigException(
+ name, namespace, "The specified namespace must be in all lowercase."
+ );
+ }
+
+ if (namespace.matches(".*[\\\\\\/\\*\\?\\\"<>| ,#\\-:]+.*")) {
+ throw new ConfigException(
+ name, namespace,
+ "The specified namespace must not contain any spaces or "
+ + "invalid characters \\/*?\"<>|,#-:"
+ );
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "A valid namespace name that is all lowercase, less than 100 characters, and "
+ + "does not contain any spaces or invalid characters \\/*?\"<>|,#-:";
+ }
+ }
+
private static class DataStreamDatasetValidator implements Validator {
@Override
diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
index 8ff18ac71..9e81e6eaf 100644
--- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
+++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java
@@ -182,8 +182,9 @@ private String convertTopicToIndexName(String topic) {
}
/**
- * Returns the converted index name from a given topic name in the form {type}-{dataset}-{topic}.
- * For the topic
, Elasticsearch accepts:
+ * Returns the converted datastream name from a given topic name in the form:
+ * {type}-{dataset}-{namespace}
+ * For the namespace
(that can contain topic), Elasticsearch accepts:
*