Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined set of changes for discussion, focussed on a new methodology for record grouping #319

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,33 @@
package io.aiven.kafka.connect.common.config;

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.config.validators.ClassValidator;
import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator;
import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator;
import io.aiven.kafka.connect.common.config.validators.NonNegativeValidator;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsEncodingValidator;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator;
import io.aiven.kafka.connect.common.config.validators.OutputTypeValidator;
import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator;
import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator;
import io.aiven.kafka.connect.common.grouper.CustomRecordGrouperBuilder;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.templating.Template;

@SuppressWarnings("PMD.TooManyMethods")
public class AivenCommonConfig extends AbstractConfig {
public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields";
public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding";
Expand All @@ -45,6 +54,8 @@ public class AivenCommonConfig extends AbstractConfig {
public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String CUSTOM_RECORD_GROUPER_BUILDER = "file.record.grouper.builder";
public static final String FILE_WRITE_PARALLEL = "file.write.parallel";

private static final String GROUP_COMPRESSION = "File Compression";
private static final String GROUP_FORMAT = "Format";
Expand All @@ -53,12 +64,79 @@ public class AivenCommonConfig extends AbstractConfig {
private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy";
public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";

/**
 * Creates the common configuration, parsing {@code originals} against {@code definition}
 * and running cross-field validation.
 *
 * @param definition the config definition used to parse and validate {@code originals}.
 * @param originals the raw user-supplied configuration values.
 */
@SuppressWarnings({ "PMD.this-escape", "PMD.ConstructorCallsOverridableMethod" })
protected AivenCommonConfig(final ConfigDef definition, final Map<?, ?> originals) {
    super(definition, originals);
    // TODO: calls getOutputFields, can be overridden in subclasses.
    validate(); // NOPMD
}

protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also have a PR open #312 that will split AivenCommonConfig into a Source and Sink Common config so that we can start building the Source connectors to start reading back the data we have stored here, This is mostly a minor structural change but just wanted to give you a heads up. This info does make sense to be moved to the Sink Common config imo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to work with what you are doing. It does seem that the config is duplicated a lot, and some bits don't seem to be used.
What do you suggest as the best way to do this, without causing too much conflict that we can avoid @aindriu-aiven ?

int fileGroupCounter, final CompressionType defaultCompressionType) {
configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.MEDIUM,
"The template for file names on " + type + ". "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`."
+ "There is also `key` only variable {{key}} for grouping by keys" + "If a "
+ CUSTOM_RECORD_GROUPER_BUILDER + " is set, the template will be passed"
+ " to that builder and validated according to to its rules which may be more or less constrained.",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);

configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
defaultCompressionType == null ? null : defaultCompressionType.name, new FileCompressionTypeValidator(),
ConfigDef.Importance.MEDIUM,
"The compression type used for files put on " + type + ". " + "The supported values are: "
+ CompressionType.SUPPORTED_COMPRESSION_TYPES + ".",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
FixedSetRecommender.ofSupportedValues(CompressionType.names()));

configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new NonNegativeValidator(),
ConfigDef.Importance.MEDIUM,
"The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
+ "0 is interpreted as \"unlimited\", which is the default.",
groupFile, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS);

configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(),
new TimeZoneValidator(), ConfigDef.Importance.LOW,
"Specifies the timezone in which the dates and time for the timestamp variable will be treated. "
+ "Use standard shot and long names. Default is UTC",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE);

configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the the timestamp variable source. Default is wall-clock.", groupFile, fileGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(CUSTOM_RECORD_GROUPER_BUILDER, ConfigDef.Type.CLASS, null,
new ClassValidator(CustomRecordGrouperBuilder.class), ConfigDef.Importance.LOW,
"Specifies a custom record grouper. The default record grouper is defined by "
+ FILE_NAME_TEMPLATE_CONFIG,
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, CUSTOM_RECORD_GROUPER_BUILDER);

configDef.define(FILE_WRITE_PARALLEL, ConfigDef.Type.BOOLEAN, false,
null, ConfigDef.Importance.LOW,
"Specifies if file should be written in parallel. Default is false",
groupFile, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.SHORT, FILE_WRITE_PARALLEL);

return fileGroupCounter;

}

/**
 * Registers the configuration definitions shared by all connectors: the Kafka retry
 * backoff policy and any {@link ExtraConfiguration} extensions discovered on the classpath.
 *
 * @param configDef the definition to add the common keys to.
 */
protected static void addCommonConfig(final ConfigDef configDef) {
    addKafkaBackoffPolicy(configDef);
    addExtensionConfig(configDef);
}

@SuppressWarnings("PMD.this-escape")
private void validate() {
// Special checks for output json envelope config.
final List<OutputField> outputFields = getOutputFields();
Expand All @@ -68,8 +146,17 @@ private void validate() {
FORMAT_OUTPUT_ENVELOPE_CONFIG, false, FORMAT_OUTPUT_FIELDS_CONFIG);
throw new ConfigException(msg);
}
if (getCustomRecordGrouperBuilder() == null) {
// if there is a custom record grouper builder, it will validate the filename template
new FilenameTemplateValidator(FILE_NAME_TEMPLATE_CONFIG).ensureValid(FILE_NAME_TEMPLATE_CONFIG,
getString(FILE_NAME_TEMPLATE_CONFIG));
}
validateKeyFilenameTemplate();
}
/**
 * Discovers every {@link ExtraConfiguration} implementation via {@link ServiceLoader}
 * and lets each one contribute its own keys to the given definition.
 *
 * @param configDef the definition that the discovered extensions configure.
 */
protected static void addExtensionConfig(final ConfigDef configDef) {
    for (final ExtraConfiguration extraConfiguration : ServiceLoader.load(ExtraConfiguration.class)) {
        extraConfiguration.configure(configDef);
    }
}

protected static void addKafkaBackoffPolicy(final ConfigDef configDef) {
configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() {
Expand Down Expand Up @@ -173,13 +260,15 @@ public final Template getFilenameTemplate() {
}

protected final void validateKeyFilenameTemplate() {
// Special checks for {{key}} filename template.
final Template filenameTemplate = getFilenameTemplate();
final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate);
if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) {
final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG,
filenameTemplate, FILE_MAX_RECORDS);
throw new ConfigException(msg);
// Special checks for {{key}} filename template, if there isnt a custom record grouper.
if (getCustomRecordGrouperBuilder() == null) {
final Template filenameTemplate = getFilenameTemplate();
final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate);
if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) {
final String msg = String.format("When %s is %s, %s must be either 1 or not set",
FILE_NAME_TEMPLATE_CONFIG, filenameTemplate, FILE_MAX_RECORDS);
throw new ConfigException(msg);
}
}
}

Expand All @@ -202,8 +291,9 @@ public final ZoneId getFilenameTimezone() {
}

/**
 * Builds the timestamp source used for the filename timestamp variable from the
 * configured source type ({@code file.name.timestamp.source}) and timezone.
 *
 * @return the configured {@link TimestampSource}.
 */
public final TimestampSource getFilenameTimestampSource() {
    return new TimestampSource.Builder().configuration(getString(FILE_NAME_TIMESTAMP_SOURCE))
            .zoneId(getFilenameTimezone())
            .build();
}

public final int getMaxRecordsPerFile() {
Expand All @@ -229,4 +319,13 @@ private Boolean isKeyBased(final String groupType) {
return RecordGrouperFactory.KEY_RECORD.equals(groupType)
|| RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType);
}

/**
 * Returns the configured custom record grouper builder class, if any.
 *
 * @return the builder class, or {@code null} when no custom grouper is configured.
 */
public Class<? extends CustomRecordGrouperBuilder> getCustomRecordGrouperBuilder() {
    final Class<?> configured = getClass(CUSTOM_RECORD_GROUPER_BUILDER);
    if (configured == null) {
        return null;
    }
    // The ClassValidator on this key already guarantees it extends CustomRecordGrouperBuilder.
    return configured.asSubclass(CustomRecordGrouperBuilder.class);
}
/**
 * Whether files should be written in parallel.
 *
 * @return the value of {@code file.write.parallel}; the configured default is {@code false}.
 */
public boolean isWriteParallel() {
    return getBoolean(FILE_WRITE_PARALLEL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config;

/**
 * A component that can be configured from a parsed {@link AivenCommonConfig}.
 */
public interface Configurable {
    /**
     * Applies settings from the given configuration to this component.
     *
     * @param config the parsed connector configuration. NOTE(review): presumably never
     *         {@code null} — confirm the caller's contract.
     */
    void configure(AivenCommonConfig config);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.config;

import org.apache.kafka.common.config.ConfigDef;

/**
 * A service-provider extension point (discovered via {@link java.util.ServiceLoader})
 * that lets external modules contribute additional keys to a connector's {@link ConfigDef}.
 */
public interface ExtraConfiguration {
    /**
     * Adds this extension's configuration keys to the given definition.
     *
     * @param configDef the definition to extend.
     */
    void configure(ConfigDef configDef);
}
Loading