From c5eea3f384e65491eb6485bea68de96093d22f99 Mon Sep 17 00:00:00 2001 From: fbaligand Date: Mon, 1 Jan 2024 17:31:11 +0100 Subject: [PATCH] add 'data.stream.namespace' configuration property --- .../elasticsearch/ElasticsearchClient.java | 2 +- .../ElasticsearchSinkConnectorConfig.java | 85 +++++++++++++++++-- .../elasticsearch/ElasticsearchSinkTask.java | 16 ++-- .../ElasticsearchSinkConnectorConfigTest.java | 24 ++++++ .../ElasticsearchSinkTaskTest.java | 19 +++++ 5 files changed, 131 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java index 1cd46bfcc..a5862176e 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java @@ -521,7 +521,7 @@ private void closeResources() { /** * Creates a data stream. Will not recreate the data stream if it already exists. 
* - * @param dataStream the data stream to create given in the form {type}-{dataset}-{topic} + * @param dataStream the data stream to create given in the form {type}-{dataset}-{namespace} * @return true if the data stream was created, false if it already exists */ private boolean createDataStream(String dataStream) { diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java index 51c7d4c85..42b5629d7 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java @@ -332,29 +332,46 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig { private static final String KERBEROS_KEYTAB_PATH_DEFAULT = null; // Data stream configs + public static final String DATA_STREAM_NAMESPACE_CONFIG = "data.stream.namespace"; + private static final String DATA_STREAM_NAMESPACE_DOC = + "Generic name describing user-configurable arbitrary grouping to be written to a data " + "stream. Can be any arbitrary string that is no longer than 100 characters, " + "is in all lowercase, and does not contain spaces or any of these special characters " + "``/\\*\"<>|,#:-``. Otherwise, no value indicates the connector will write to regular " + "indices instead. If set, this configuration will be used alongside " + "``data.stream.type`` and ``data.stream.dataset`` to construct the data stream name in" + " the form of {``data.stream.type``}-{``data.stream.dataset``}-{``" + DATA_STREAM_NAMESPACE_CONFIG + "``}. 
" + "Default value is ``${topic}``, that is to say the topic name."; + private static final String DATA_STREAM_NAMESPACE_DISPLAY = "Data Stream Namespace"; + private static final String DATA_STREAM_NAMESPACE_DEFAULT = "${topic}"; + public static final String DATA_STREAM_DATASET_CONFIG = "data.stream.dataset"; private static final String DATA_STREAM_DATASET_DOC = "Generic name describing data ingested and its structure to be written to a data stream. Can " + "be any arbitrary string that is no longer than 100 characters, is in all lowercase, " + "and does not contain spaces or any of these special characters ``/\\*\"<>|,#:-``. " + "Otherwise, no value indicates the connector will write to regular indices instead. " - + "If set, this configuration will be used alongside ``data.stream.type`` to " - + "construct the data stream name in the form of {``data.stream.type``" - + "}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{topic}."; + + "If set, this configuration will be used alongside ``data.stream.type`` and " + + "``" + DATA_STREAM_NAMESPACE_CONFIG + "`` to construct the data stream name in " + + "the form of {``data.stream.type``}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{``" + + DATA_STREAM_NAMESPACE_CONFIG + "``}."; private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset"; private static final String DATA_STREAM_DATASET_DEFAULT = ""; - + public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type"; private static final String DATA_STREAM_TYPE_DOC = String.format( "Generic type describing the data to be written to data stream. " + "The default is %s which indicates the connector will write " + "to regular indices instead. 
If set, this configuration will " - + "be used alongside %s to construct the data stream name in the form of " - + "{``%s``}-{``%s``}-{topic}.", + + "be used alongside %s and %s to construct the data stream name in the form of " + + "{``%s``}-{``%s``}-{``%s``}.", DataStreamType.NONE.name(), DATA_STREAM_DATASET_CONFIG, + DATA_STREAM_NAMESPACE_CONFIG, DATA_STREAM_TYPE_CONFIG, - DATA_STREAM_DATASET_CONFIG + DATA_STREAM_DATASET_CONFIG, + DATA_STREAM_NAMESPACE_CONFIG ); private static final String DATA_STREAM_TYPE_DISPLAY = "Data Stream Type"; private static final DataStreamType DATA_STREAM_TYPE_DEFAULT = DataStreamType.NONE; @@ -820,6 +837,17 @@ private static void addDataStreamConfigs(ConfigDef configDef) { int order = 0; configDef .define( + DATA_STREAM_NAMESPACE_CONFIG, + Type.STRING, + DATA_STREAM_NAMESPACE_DEFAULT, + new DataStreamNamespaceValidator(), + Importance.LOW, + DATA_STREAM_NAMESPACE_DOC, + DATA_STREAM_GROUP, + ++order, + Width.MEDIUM, + DATA_STREAM_NAMESPACE_DISPLAY + ).define( DATA_STREAM_DATASET_CONFIG, Type.STRING, DATA_STREAM_DATASET_DEFAULT, @@ -946,6 +974,10 @@ public String dataStreamDataset() { return getString(DATA_STREAM_DATASET_CONFIG); } + public String dataStreamNamespace() { + return getString(DATA_STREAM_NAMESPACE_CONFIG); + } + public DataStreamType dataStreamType() { return DataStreamType.valueOf(getString(DATA_STREAM_TYPE_CONFIG).toUpperCase()); } @@ -1078,6 +1110,45 @@ public WriteMethod writeMethod() { return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase()); } + private static class DataStreamNamespaceValidator implements Validator { + + @Override + @SuppressWarnings("unchecked") + public void ensureValid(String name, Object value) { + if (value == null) { + return; + } + + String namespace = (String) value; + + if (namespace.length() > 100) { + throw new ConfigException( + name, namespace, "The specified namespace must be no longer than 100 characters." 
+ ); + } + + if (!namespace.equals(namespace.toLowerCase())) { + throw new ConfigException( + name, namespace, "The specified namespace must be in all lowercase." + ); + } + + if (namespace.matches(".*[\\\\\\/\\*\\?\\\"<>| ,#\\-:]+.*")) { + throw new ConfigException( + name, namespace, + "The specified namespace must not contain any spaces or " + + "invalid characters \\/*?\"<>|,#-:" + ); + } + } + + @Override + public String toString() { + return "A valid namespace name that is all lowercase, less than 100 characters, and " + + "does not contain any spaces or invalid characters \\/*?\"<>|,#-:"; + } + } + private static class DataStreamDatasetValidator implements Validator { @Override diff --git a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java index 8ff18ac71..9e81e6eaf 100644 --- a/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java +++ b/src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java @@ -182,8 +182,9 @@ private String convertTopicToIndexName(String topic) { } /** - * Returns the converted index name from a given topic name in the form {type}-{dataset}-{topic}. - * For the topic, Elasticsearch accepts: + * Returns the converted datastream name from a given topic name in the form: + * {type}-{dataset}-{namespace} + * For the namespace (that can contain topic), Elasticsearch accepts: *