Skip to content

Commit

Permalink
add 'data.stream.namespace' configuration property
Browse files Browse the repository at this point in the history
  • Loading branch information
fbaligand authored and srishti-saraswat committed Oct 21, 2024
1 parent 9bdef0a commit c5eea3f
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ private void closeResources() {
/**
* Creates a data stream. Will not recreate the data stream if it already exists.
*
* @param dataStream the data stream to create given in the form {type}-{dataset}-{namespace}
* @return true if the data stream was created, false if it already exists
*/
private boolean createDataStream(String dataStream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,29 +332,46 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String KERBEROS_KEYTAB_PATH_DEFAULT = null;

// Data stream configs
public static final String DATA_STREAM_NAMESPACE_CONFIG = "data.stream.namespace";
// User-facing doc string for the config UI. Note: a space is required at a
// string-concatenation boundary ("name in " + "the form") and the default is
// spelled "Default" — both were wrong in the original.
private static final String DATA_STREAM_NAMESPACE_DOC =
    "Generic name describing user-configurable arbitrary grouping to be written to a data "
        + "stream. Can be any arbitrary string that is no longer than 100 characters, "
        + "is in all lowercase, and does not contain spaces or any of these special characters "
        + "``/\\*\"<>|,#:-``. Otherwise, no value indicates the connector will write to regular "
        + "indices instead. If set, this configuration will be used alongside "
        + "``data.stream.type`` and ``data.stream.dataset`` to construct the data stream name in "
        + "the form of {``data.stream.type``}-{``data.stream.dataset``}-{``"
        + DATA_STREAM_NAMESPACE_CONFIG + "``}. "
        + "Default value is ``${topic}``, that is to say the topic name.";
private static final String DATA_STREAM_NAMESPACE_DISPLAY = "Data Stream Namespace";
// ${topic} is a placeholder substituted with the record's topic name at write time.
private static final String DATA_STREAM_NAMESPACE_DEFAULT = "${topic}";

public static final String DATA_STREAM_DATASET_CONFIG = "data.stream.dataset";
private static final String DATA_STREAM_DATASET_DOC =
"Generic name describing data ingested and its structure to be written to a data stream. Can "
+ "be any arbitrary string that is no longer than 100 characters, is in all lowercase, "
+ "and does not contain spaces or any of these special characters ``/\\*\"<>|,#:-``. "
+ "Otherwise, no value indicates the connector will write to regular indices instead. "
+ "If set, this configuration will be used alongside ``data.stream.type`` to "
+ "construct the data stream name in the form of {``data.stream.type``"
+ "}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{topic}.";
+ "If set, this configuration will be used alongside ``data.stream.type`` and "
+ "``" + DATA_STREAM_NAMESPACE_CONFIG + "`` to construct the data stream name in "
+ "the form of {``data.stream.type``}-{``" + DATA_STREAM_DATASET_CONFIG + "``}-{``"
+ DATA_STREAM_NAMESPACE_CONFIG + "``}.";
private static final String DATA_STREAM_DATASET_DISPLAY = "Data Stream Dataset";
private static final String DATA_STREAM_DATASET_DEFAULT = "";

public static final String DATA_STREAM_TYPE_CONFIG = "data.stream.type";
private static final String DATA_STREAM_TYPE_DOC = String.format(
"Generic type describing the data to be written to data stream. "
+ "The default is %s which indicates the connector will write "
+ "to regular indices instead. If set, this configuration will "
+ "be used alongside %s to construct the data stream name in the form of "
+ "{``%s``}-{``%s``}-{topic}.",
+ "be used alongside %s and %s to construct the data stream name in the form of "
+ "{``%s``}-{``%s``}-{``%s``}.",
DataStreamType.NONE.name(),
DATA_STREAM_DATASET_CONFIG,
DATA_STREAM_NAMESPACE_CONFIG,
DATA_STREAM_TYPE_CONFIG,
DATA_STREAM_DATASET_CONFIG
DATA_STREAM_DATASET_CONFIG,
DATA_STREAM_NAMESPACE_CONFIG
);
private static final String DATA_STREAM_TYPE_DISPLAY = "Data Stream Type";
private static final DataStreamType DATA_STREAM_TYPE_DEFAULT = DataStreamType.NONE;
Expand Down Expand Up @@ -820,6 +837,17 @@ private static void addDataStreamConfigs(ConfigDef configDef) {
int order = 0;
configDef
.define(
DATA_STREAM_NAMESPACE_CONFIG,
Type.STRING,
DATA_STREAM_NAMESPACE_DEFAULT,
new DataStreamNamespaceValidator(),
Importance.LOW,
DATA_STREAM_NAMESPACE_DOC,
DATA_STREAM_GROUP,
++order,
Width.MEDIUM,
DATA_STREAM_NAMESPACE_DISPLAY
).define(
DATA_STREAM_DATASET_CONFIG,
Type.STRING,
DATA_STREAM_DATASET_DEFAULT,
Expand Down Expand Up @@ -946,6 +974,10 @@ public String dataStreamDataset() {
return getString(DATA_STREAM_DATASET_CONFIG);
}

/**
 * Returns the configured {@code data.stream.namespace} value. May contain the
 * literal {@code ${topic}} placeholder; substitution with the actual topic
 * name happens when the data stream name is built, not here.
 */
public String dataStreamNamespace() {
  return getString(DATA_STREAM_NAMESPACE_CONFIG);
}

/**
 * Returns the configured {@code data.stream.type} as a {@link DataStreamType}.
 *
 * <p>Upper-casing uses {@code Locale.ROOT} so that {@code Enum.valueOf} cannot
 * break under locales with non-ASCII case rules (e.g. Turkish dotless i turning
 * "metrics" into "METRİCS").
 */
public DataStreamType dataStreamType() {
  return DataStreamType.valueOf(
      getString(DATA_STREAM_TYPE_CONFIG).toUpperCase(java.util.Locale.ROOT)
  );
}
Expand Down Expand Up @@ -1078,6 +1110,45 @@ public WriteMethod writeMethod() {
return WriteMethod.valueOf(getString(WRITE_METHOD_CONFIG).toUpperCase());
}

/**
 * Validates a {@code data.stream.namespace} value: at most 100 characters,
 * all lowercase, and free of spaces and the characters {@code \/*?"<>|,#-:}.
 * A {@code null} value is accepted (the connector then writes to regular
 * indices). The default {@code ${topic}} passes because {@code $}, '{' and
 * '}' are not rejected characters.
 */
private static class DataStreamNamespaceValidator implements Validator {

  // No @SuppressWarnings("unchecked") needed: the (String) cast below is a
  // checked cast, there is no unchecked operation in this method.
  @Override
  public void ensureValid(String name, Object value) {
    if (value == null) {
      return;
    }

    String namespace = (String) value;

    if (namespace.length() > 100) {
      throw new ConfigException(
          name, namespace, "The specified namespace must be no longer than 100 characters."
      );
    }

    if (!namespace.equals(namespace.toLowerCase())) {
      throw new ConfigException(
          name, namespace, "The specified namespace must be in all lowercase."
      );
    }

    // Regex source ".*[\\\/\*\?\"<>| ,#\-:]+.*" matches any occurrence of a
    // forbidden character (backslash, /, *, ?, ", <, >, |, space, comma, #, -, :).
    if (namespace.matches(".*[\\\\\\/\\*\\?\\\"<>| ,#\\-:]+.*")) {
      throw new ConfigException(
          name, namespace,
          "The specified namespace must not contain any spaces or "
              + "invalid characters \\/*?\"<>|,#-:"
      );
    }
  }

  @Override
  public String toString() {
    // "no longer than 100" matches the actual check (length > 100 rejected),
    // the previous "less than 100" wording contradicted it.
    return "A valid namespace name that is all lowercase, no longer than 100 characters, and "
        + "does not contain any spaces or invalid characters \\/*?\"<>|,#-:";
  }
}

private static class DataStreamDatasetValidator implements Validator {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,31 +182,33 @@ private String convertTopicToIndexName(String topic) {
}

/**
* Returns the converted index name from a given topic name in the form {type}-{dataset}-{topic}.
* For the <code>topic</code>, Elasticsearch accepts:
* Returns the converted datastream name from a given topic name in the form:
* {type}-{dataset}-{namespace}
* For the <code>namespace</code> (that can contain topic), Elasticsearch accepts:
* <ul>
* <li>all lowercase</li>
* <li>no longer than 100 bytes</li>
* </ul>
* (<a href="https://github.com/elastic/ecs/blob/master/rfcs/text/0009-data_stream-fields.md#restrictions-on-values">ref</a>_.)
*/
private String convertTopicToDataStreamName(String topic) {
topic = topic.toLowerCase();
if (topic.length() > 100) {
topic = topic.substring(0, 100);
String namespace = config.dataStreamNamespace();
namespace = namespace.replace("${topic}", topic.toLowerCase());
if (namespace.length() > 100) {
namespace = namespace.substring(0, 100);
}
String dataStream = String.format(
"%s-%s-%s",
config.dataStreamType().name().toLowerCase(),
config.dataStreamDataset(),
topic
namespace
);
return dataStream;
}

/**
* Returns the converted index name from a given topic name. If writing to a data stream,
* returns the index name in the form {type}-{dataset}-{namespace}. For both cases, Elasticsearch
* accepts:
* <ul>
* <li>all lowercase</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public void testSetHttpTimeoutsConfig() {
assertEquals(config.connectionTimeoutMs(), 15000);
}

// Lowercase letters, digits, '_' and '.' are all legal in a namespace, so
// constructing the config must not throw.
@Test
public void shouldAllowValidChractersDataStreamNamespace() {
  props.put(DATA_STREAM_NAMESPACE_CONFIG, "a_valid.namespace123");
  new ElasticsearchSinkConnectorConfig(props);
}

@Test
public void shouldAllowValidChractersDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "a_valid.dataset123");
Expand All @@ -69,12 +75,24 @@ public void shouldAllowValidDataStreamTypeCaseInsensitive() {
new ElasticsearchSinkConnectorConfig(props);
}

// Namespaces must be all lowercase; uppercase characters must be rejected
// with a ConfigException at construction time.
@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCaseDataStreamNamespace() {
  props.put(DATA_STREAM_NAMESPACE_CONFIG, "AN_INVALID.namespace123");
  new ElasticsearchSinkConnectorConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCaseDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "AN_INVALID.dataset123");
new ElasticsearchSinkConnectorConfig(props);
}

// '-' and '?' are in the forbidden character set for namespaces, so this
// value must be rejected with a ConfigException.
@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCharactersDataStreamNamespace() {
  props.put(DATA_STREAM_NAMESPACE_CONFIG, "not-valid?");
  new ElasticsearchSinkConnectorConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldNotAllowInvalidCharactersDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, "not-valid?");
Expand All @@ -87,6 +105,12 @@ public void shouldNotAllowInvalidDataStreamType() {
new ElasticsearchSinkConnectorConfig(props);
}

// "%d%100d" formats to "1" followed by a width-100 field: 101 characters
// total, one over the 100-character limit, so construction must throw.
@Test(expected = ConfigException.class)
public void shouldNotAllowLongDataStreamNamespace() {
  props.put(DATA_STREAM_NAMESPACE_CONFIG, String.format("%d%100d", 1, 1));
  new ElasticsearchSinkConnectorConfig(props);
}

@Test(expected = ConfigException.class)
public void shouldNotAllowLongDataStreamDataset() {
props.put(DATA_STREAM_DATASET_CONFIG, String.format("%d%100d", 1, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_NAMESPACE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG;
Expand Down Expand Up @@ -376,6 +377,24 @@ public void testConvertTopicToDataStreamAllowUnderscores() {
verify(client, times(1)).createIndexOrDataStream(eq(indexName));
}

// Verifies that a custom 'data.stream.namespace' containing the ${topic}
// placeholder is expanded with the record's topic name when the task derives
// the data stream name ({type}-{dataset}-{namespace}).
@Test
public void testConvertTopicToDataStreamWithCustomNamespace() {
  String type = "logs";
  String dataset = "a_valid_dataset";
  String namespaceTemplate = "a_valid_prefix_${topic}";
  props.put(DATA_STREAM_TYPE_CONFIG, type);
  props.put(DATA_STREAM_DATASET_CONFIG, dataset);
  props.put(DATA_STREAM_NAMESPACE_CONFIG, namespaceTemplate);
  setUpTask();

  String topic = "a_valid_topic";
  // Expected namespace after placeholder substitution (topic is already lowercase here).
  String namespace = namespaceTemplate.replace("${topic}", topic);
  when(assignment.contains(eq(new TopicPartition(topic, 1)))).thenReturn(true);
  task.put(Collections.singletonList(record(topic, true, false, 0)));
  String indexName = dataStreamName(type, dataset, namespace);
  verify(client, times(1)).createIndexOrDataStream(eq(indexName));
}

@Test
public void testConvertTopicToDataStreamTooLong() {
String type = "logs";
Expand Down

0 comments on commit c5eea3f

Please sign in to comment.