Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'data.stream.namespace' configuration property #802

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ private void closeResources() {
/**
* Creates a data stream. Will not recreate the data stream if it already exists.
*
* @param dataStream the data stream to create given in the form {type}-{dataset}-{topic}
* @param dataStream the data stream to create given in the form {type}-{dataset}-{namespace}
* @return true if the data stream was created, false if it already exists
*/
private boolean createDataStream(String dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,29 +332,46 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String KERBEROS_KEYTAB_PATH_DEFAULT = null;

// Data stream configs
public static final String DATA_STREAM_NAMESPACE_CONFIG = "data.stream.namespace";
// Doc fixes vs. the original: "Defaut" -> "Default", missing space between
// "name in" and "the form" (rendered as "inthe"), doubled space after
// "characters,", and '?' added to the special-character list to match the
// characters actually rejected by DataStreamNamespaceValidator's regex.
private static final String DATA_STREAM_NAMESPACE_DOC =
    "Generic name describing user-configurable arbitrary grouping to be written to a data "
        + "stream. Can be any arbitrary string that is no longer than 100 characters, "
        + "is in all lowercase, and does not contain spaces or any of these special characters "
        + "``/\\*?\"<>|,#:-``. Otherwise, no value indicates the connector will write to regular "
        + "indices instead. If set, this configuration will be used alongside "
        + "``data.stream.type`` and ``data.stream.dataset`` to construct the data stream name in "
        + "the form of {``data.stream.type``}-{``data.stream.dataset``}-{``"
        + DATA_STREAM_NAMESPACE_CONFIG + "``}. "
        + "Default value is ``${topic}``, that is to say the topic name.";
private static final String DATA_STREAM_NAMESPACE_DISPLAY = "Data Stream Namespace";
// ${topic} is resolved to the record's (lowercased) topic name when the data
// stream name is built.
private static final String DATA_STREAM_NAMESPACE_DEFAULT = "${topic}";

public static final String DATA_STREAM_DATASET_CONFIG = "data.stream.dataset";
private static final String DATA_STREAM_DATASET_DOC =
"Generic name describing data ingested and its structure to be written to a data stream. Can "
+ "be any arbitrary string that is no longer than 100 characters, is in all lowercase, "
+ "and does not contain spaces or any of these special characters ``/\\*\"<>|,#:-``. "
+ "Otherwise, no value indicates the connector will write to regular indices instead. "
+ "If set, this configuration will be used alongside ``data.stream.type`` to "
+ "construct the data stream name in the form of {``data.stream.type``"
+ "}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{topic}.";
+ "If set, this configuration will be used alongside ``data.stream.type`` and "
+ "``" + DATA_STREAM_NAMESPACE_CONFIG + "`` to construct the data stream name in "
+ "the form of {``data.stream.type``}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{``"
+ DATA_STREAM_NAMESPACE_CONFIG + "``}.";
private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
private static final String DATA_STREAM_DATASET_DEFAULT = "";

public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
private static final String DATA_STREAM_TYPE_DOC = String.format(
"Generic type describing the data to be written to data stream. "
+ "The default is %s which indicates the connector will write "
+ "to regular indices instead. If set, this configuration will "
+ "be used alongside %s to construct the data stream name in the form of "
+ "{``%s``}-{``%s``}-{topic}.",
+ "be used alongside %s and %s to construct the data stream name in the form of "
+ "{``%s``}-{``%s``}-{``%s``}.",
DataStreamType.NONE.name(),
DATA_STREAM_DATASET_CONFIG,
DATA_STREAM_NAMESPACE_CONFIG,
DATA_STREAM_TYPE_CONFIG,
DATA_STREAM_DATASET_CONFIG
DATA_STREAM_DATASET_CONFIG,
DATA_STREAM_NAMESPACE_CONFIG
);
private static final String DATA_STREAM_TYPE_DISPLAY = "Data Stream Type";
private static final DataStreamType DATA_STREAM_TYPE_DEFAULT = DataStreamType.NONE;
Expand Down Expand Up @@ -820,6 +837,17 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
int order = 0;
configDef
.define(
DATA_STREAM_NAMESPACE_CONFIG,
Type.STRING,
DATA_STREAM_NAMESPACE_DEFAULT,
new DataStreamNamespaceValidator(),
Importance.LOW,
DATA_STREAM_NAMESPACE_DOC,
DATA_STREAM_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_NAMESPACE_DISPLAY
).define(
DATA_STREAM_DATASET_CONFIG,
Type.STRING,
DATA_STREAM_DATASET_DEFAULT,
Expand Down Expand Up @@ -946,6 +974,10 @@ public String dataStreamDataset() {
return getString(DATA_STREAM_DATASET_CONFIG);
}

/**
 * Returns the configured {@code data.stream.namespace} value.
 *
 * <p>The value may contain the {@code ${topic}} placeholder (the default), which is
 * resolved against the record's topic name when the data stream name is built.
 *
 * @return the raw, unresolved namespace string from the configuration
 */
public String dataStreamNamespace() {
return getString(DATA_STREAM_NAMESPACE_CONFIG);
}

/**
 * Returns the configured {@code data.stream.type} as a {@link DataStreamType}.
 *
 * <p>Uses {@link java.util.Locale#ROOT} for the case conversion so that
 * {@code valueOf} is not broken by locale-sensitive casing (e.g. the Turkish
 * dotted/dotless 'i', which would turn a value containing 'i' into a string
 * that does not match any enum constant).
 *
 * @return the data stream type; {@code NONE} indicates regular indices are used
 */
public DataStreamType dataStreamType() {
  return DataStreamType.valueOf(
      getString(DATA_STREAM_TYPE_CONFIG).toUpperCase(java.util.Locale.ROOT));
}
Expand Down Expand Up @@ -1078,6 +1110,45 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

/**
 * Validates the {@code data.stream.namespace} value: at most 100 characters, all
 * lowercase, and free of spaces and the characters {@code \/*?"<>|,#-:}.
 *
 * <p>A {@code null} value is accepted (validation is skipped). Note that {@code $},
 * <code>{</code> and <code>}</code> are not rejected by the character check, so the
 * {@code ${topic}} placeholder passes validation.
 */
private static class DataStreamNamespaceValidator implements Validator {

  // The original carried @SuppressWarnings("unchecked") here; removed because the
  // method performs no unchecked operation — (String) is an ordinary checked cast.
  @Override
  public void ensureValid(String name, Object value) {
    if (value == null) {
      return;
    }

    String namespace = (String) value;

    if (namespace.length() > 100) {
      throw new ConfigException(
          name, namespace, "The specified namespace must be no longer than 100 characters."
      );
    }

    // Locale.ROOT keeps the lowercase comparison stable regardless of the JVM's
    // default locale (e.g. the Turkish dotted/dotless 'i').
    if (!namespace.equals(namespace.toLowerCase(java.util.Locale.ROOT))) {
      throw new ConfigException(
          name, namespace, "The specified namespace must be in all lowercase."
      );
    }

    // Rejects any occurrence of: \ / * ? " < > | space , # - :
    if (namespace.matches(".*[\\\\\\/\\*\\?\\\"<>| ,#\\-:]+.*")) {
      throw new ConfigException(
          name, namespace,
          "The specified namespace must not contain any spaces or "
              + "invalid characters \\/*?\"<>|,#-:"
      );
    }
  }

  @Override
  public String toString() {
    return "A valid namespace name that is all lowercase, less than 100 characters, and "
        + "does not contain any spaces or invalid characters \\/*?\"<>|,#-:";
  }
}

private static class DataStreamDatasetValidator implements Validator {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,31 +182,33 @@ private String convertTopicToIndexName(String topic) {
}

/**
* Returns the converted index name from a given topic name in the form {type}-{dataset}-{topic}.
* For the <code>topic</code>, Elasticsearch accepts:
* Returns the converted datastream name from a given topic name in the form:
* {type}-{dataset}-{namespace}
* For the <code>namespace</code> (that can contain topic), Elasticsearch accepts:
* <ul>
* <li>all lowercase</li>
* <li>no longer than 100 bytes</li>
* </ul>
* (<a href="https://github.com/elastic/ecs/blob/master/rfcs/text/0009-data_stream-fields.md#restrictions-on-values">ref</a>_.)
*/
private String convertTopicToDataStreamName(String topic) {
topic = topic.toLowerCase();
if (topic.length() > 100) {
topic = topic.substring(0, 100);
String namespace = config.dataStreamNamespace();
namespace = namespace.replace("${topic}", topic.toLowerCase());
if (namespace.length() > 100) {
namespace = namespace.substring(0, 100);
}
String dataStream = String.format(
"%s-%s-%s",
config.dataStreamType().name().toLowerCase(),
config.dataStreamDataset(),
topic
namespace
);
return dataStream;
}

/**
* Returns the converted index name from a given topic name. If writing to a data stream,
* returns the index name in the form {type}-{dataset}-{topic}. For both cases, Elasticsearch
* returns the index name in the form {type}-{dataset}-{namespace}. For both cases, Elasticsearch
* accepts:
* <ul>
* <li>all lowercase</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public void testSetHttpTimeoutsConfig() {
assertEquals(config.connectionTimeoutMs(), 15000);
}

// Verifies that a namespace made of lowercase letters, digits, '_' and '.'
// passes DataStreamNamespaceValidator (config construction does not throw).
// NOTE(review): "Chracters" in the method name is a typo for "Characters",
// mirrored from the existing dataset test below.
@Test
public void shouldAllowValidChractersDataStreamNamespace() {
props.put(DATA_STREAM_NAMESPACE_CONFIG, "a_valid.namespace123");
new ElasticsearchSinkConnectorConfig(props);
}

@Test
public void shouldAllowValidChractersDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "a_valid.dataset123");
Expand All @@ -69,12 +75,24 @@ public void shouldAllowValidDataStreamTypeCaseInsensitive() {
new ElasticsearchSinkConnectorConfig(props);
}

// Verifies that an uppercase namespace is rejected with a ConfigException
// (the validator requires all-lowercase values).
@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCaseDataStreamNamespace() {
props.put(DATA_STREAM_NAMESPACE_CONFIG, "AN_INVALID.namespace123");
new ElasticsearchSinkConnectorConfig(props);
}

// Verifies that an uppercase dataset is rejected with a ConfigException
// (the dataset validator requires all-lowercase values).
@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCaseDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "AN_INVALID.dataset123");
new ElasticsearchSinkConnectorConfig(props);
}

// Verifies that a namespace containing forbidden characters ('-' and '?')
// is rejected with a ConfigException.
@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCharactersDataStreamNamespace() {
props.put(DATA_STREAM_NAMESPACE_CONFIG, "not-valid?");
new ElasticsearchSinkConnectorConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCharactersDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "not-valid?");
Expand All @@ -87,6 +105,12 @@ public void shouldNotAllowInvalidDataStreamType() {
new ElasticsearchSinkConnectorConfig(props);
}

// Verifies that a namespace longer than 100 characters is rejected.
// "%d%100d" formats to "1" followed by "1" right-padded to width 100,
// i.e. a 101-character string — just over the limit.
@Test(expected = ConfigException.class)
public void shouldNotAllowLongDataStreamNamespace() {
props.put(DATA_STREAM_NAMESPACE_CONFIG, String.format("%d%100d", 1, 1));
new ElasticsearchSinkConnectorConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldNotAllowLongDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, String.format("%d%100d", 1, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_NAMESPACE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG;
Expand Down Expand Up @@ -376,6 +377,24 @@ public void testConvertTopicToDataStreamAllowUnderscores() {
verify(client, times(1)).createIndexOrDataStream(eq(indexName));
}

// Verifies that a custom namespace template containing the ${topic} placeholder
// is resolved with the record's topic name, and that the resulting data stream
// is created as {type}-{dataset}-{resolved namespace}.
@Test
public void testConvertTopicToDataStreamWithCustomNamespace() {
String type = "logs";
String dataset = "a_valid_dataset";
String namespaceTemplate = "a_valid_prefix_${topic}";
props.put(DATA_STREAM_TYPE_CONFIG, type);
props.put(DATA_STREAM_DATASET_CONFIG, dataset);
props.put(DATA_STREAM_NAMESPACE_CONFIG, namespaceTemplate);
setUpTask();

String topic = "a_valid_topic";
// Expected resolution mirrors the connector's replace of "${topic}".
String namespace = namespaceTemplate.replace("${topic}", topic);
when(assignment.contains(eq(new TopicPartition(topic, 1)))).thenReturn(true);
task.put(Collections.singletonList(record(topic, true, false, 0)));
String indexName = dataStreamName(type, dataset, namespace);
verify(client, times(1)).createIndexOrDataStream(eq(indexName));
}

@Test
public void testConvertTopicToDataStreamTooLong() {
String type = "logs";
Expand Down