diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java index 70df3d964..f6530917e 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/AivenCommonConfig.java @@ -17,10 +17,12 @@ package io.aiven.kafka.connect.common.config; import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.ServiceLoader; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -28,13 +30,20 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import io.aiven.kafka.connect.common.config.validators.ClassValidator; import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; +import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator; +import io.aiven.kafka.connect.common.config.validators.NonNegativeValidator; import io.aiven.kafka.connect.common.config.validators.OutputFieldsEncodingValidator; import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; import io.aiven.kafka.connect.common.config.validators.OutputTypeValidator; +import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; +import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; +import io.aiven.kafka.connect.common.grouper.CustomRecordGrouperBuilder; import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; import io.aiven.kafka.connect.common.templating.Template; +@SuppressWarnings("PMD.TooManyMethods") public class AivenCommonConfig extends AbstractConfig { public static final String FORMAT_OUTPUT_FIELDS_CONFIG = "format.output.fields"; public static final String FORMAT_OUTPUT_FIELDS_VALUE_ENCODING_CONFIG = "format.output.fields.value.encoding"; @@ -45,6 +54,8 @@ public class AivenCommonConfig extends AbstractConfig { public static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; public static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; + public static final String CUSTOM_RECORD_GROUPER_BUILDER = "file.record.grouper.builder"; + public static final String FILE_WRITE_PARALLEL = "file.write.parallel"; private static final String GROUP_COMPRESSION = "File Compression"; private static final String GROUP_FORMAT = "Format"; @@ -53,12 +64,79 @@ public class AivenCommonConfig extends AbstractConfig { private static final String GROUP_RETRY_BACKOFF_POLICY = "Retry backoff policy"; public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms"; + @SuppressWarnings({ "PMD.this-escape", "PMD.ConstructorCallsOverridableMethodcls" }) protected AivenCommonConfig(final ConfigDef definition, final Map originals) { super(definition, originals); // TODO: calls getOutputFields, can be overridden in subclasses. 
- validate(); // NOPMD ConstructorCallsOverridableMethod + validate(); // NOPMD } + protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type, + int fileGroupCounter, final CompressionType defaultCompressionType) { + configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.MEDIUM, + "The template for file names on " + type + ". " + + "Supports `{{ variable }}` placeholders for substituting variables. " + + "Currently supported variables are `topic`, `partition`, and `start_offset` " + + "(the offset of the first record in the file). " + + "Only some combinations of variables are valid, which currently are:\n" + + "- `topic`, `partition`, `start_offset`." + + "There is also `key` only variable {{key}} for grouping by keys" + "If a " + + CUSTOM_RECORD_GROUPER_BUILDER + " is set, the template will be passed" + + " to that builder and validated according to to its rules which may be more or less constrained.", + groupFile, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG); + + configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, + defaultCompressionType == null ? null : defaultCompressionType.name, new FileCompressionTypeValidator(), + ConfigDef.Importance.MEDIUM, + "The compression type used for files put on " + type + ". " + "The supported values are: " + + CompressionType.SUPPORTED_COMPRESSION_TYPES + ".", + groupFile, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, + FixedSetRecommender.ofSupportedValues(CompressionType.names())); + + configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new NonNegativeValidator(), + ConfigDef.Importance.MEDIUM, + "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " + + "0 is interpreted as \"unlimited\", which is the default.", + groupFile, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS); + + configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(), + new TimeZoneValidator(), ConfigDef.Importance.LOW, + "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " + + "Use standard shot and long names. Default is UTC", + groupFile, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE); + + configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(), + new TimestampSourceValidator(), ConfigDef.Importance.LOW, + "Specifies the the timestamp variable source. Default is wall-clock.", groupFile, fileGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE); + + configDef.define(CUSTOM_RECORD_GROUPER_BUILDER, ConfigDef.Type.CLASS, null, + new ClassValidator(CustomRecordGrouperBuilder.class), ConfigDef.Importance.LOW, + "Specifies a custom record grouper. The default record grouper is defined by " + + FILE_NAME_TEMPLATE_CONFIG, + groupFile, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.SHORT, CUSTOM_RECORD_GROUPER_BUILDER); + + configDef.define(FILE_WRITE_PARALLEL, ConfigDef.Type.BOOLEAN, false, + null, ConfigDef.Importance.LOW, + "Specifies if file should be written in parallel. 
Default is false", + groupFile, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.SHORT, FILE_WRITE_PARALLEL); + + return fileGroupCounter; + + } + + protected static void addCommonConfig(final ConfigDef configDef) { + addKafkaBackoffPolicy(configDef); + addExtensionConfig(configDef); + } + + @SuppressWarnings("PMD.this-escape") private void validate() { // Special checks for output json envelope config. final List outputFields = getOutputFields(); @@ -68,8 +146,17 @@ private void validate() { FORMAT_OUTPUT_ENVELOPE_CONFIG, false, FORMAT_OUTPUT_FIELDS_CONFIG); throw new ConfigException(msg); } + if (getCustomRecordGrouperBuilder() == null) { + // if there is a custom record grouper builder, it will validate the filename template + new FilenameTemplateValidator(FILE_NAME_TEMPLATE_CONFIG).ensureValid(FILE_NAME_TEMPLATE_CONFIG, + getString(FILE_NAME_TEMPLATE_CONFIG)); + } validateKeyFilenameTemplate(); } + protected static void addExtensionConfig(final ConfigDef configDef) { + final ServiceLoader extraConfigurations = ServiceLoader.load(ExtraConfiguration.class); + extraConfigurations.forEach(extraConfiguration -> extraConfiguration.configure(configDef)); + } protected static void addKafkaBackoffPolicy(final ConfigDef configDef) { configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null, new ConfigDef.Validator() { @@ -173,13 +260,15 @@ public final Template getFilenameTemplate() { } protected final void validateKeyFilenameTemplate() { - // Special checks for {{key}} filename template. - final Template filenameTemplate = getFilenameTemplate(); - final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); - if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) { - final String msg = String.format("When %s is %s, %s must be either 1 or not set", FILE_NAME_TEMPLATE_CONFIG, - filenameTemplate, FILE_MAX_RECORDS); - throw new ConfigException(msg); + // Special checks for {{key}} filename template, if there isnt a custom record grouper. + if (getCustomRecordGrouperBuilder() == null) { + final Template filenameTemplate = getFilenameTemplate(); + final String groupType = RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); + if (isKeyBased(groupType) && getMaxRecordsPerFile() > 1) { + final String msg = String.format("When %s is %s, %s must be either 1 or not set", + FILE_NAME_TEMPLATE_CONFIG, filenameTemplate, FILE_MAX_RECORDS); + throw new ConfigException(msg); + } } } @@ -202,8 +291,9 @@ public final ZoneId getFilenameTimezone() { } public final TimestampSource getFilenameTimestampSource() { - return TimestampSource.of(getFilenameTimezone(), - TimestampSource.Type.of(getString(FILE_NAME_TIMESTAMP_SOURCE))); + return new TimestampSource.Builder().configuration(getString(FILE_NAME_TIMESTAMP_SOURCE)) + .zoneId(getFilenameTimezone()) + .build(); } public final int getMaxRecordsPerFile() { @@ -229,4 +319,13 @@ private Boolean isKeyBased(final String groupType) { return RecordGrouperFactory.KEY_RECORD.equals(groupType) || RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD.equals(groupType); } + + public Class getCustomRecordGrouperBuilder() { + final Class result = getClass(CUSTOM_RECORD_GROUPER_BUILDER); + // its already been validated to be a subclass of CustomRecordGrouperBuilder + return result == null ? 
null : result.asSubclass(CustomRecordGrouperBuilder.class); + } + public boolean isWriteParallel() { + return getBoolean(FILE_WRITE_PARALLEL); + } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/Configurable.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/Configurable.java new file mode 100644 index 000000000..f2ccce332 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/Configurable.java @@ -0,0 +1,21 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +public interface Configurable { + void configure(AivenCommonConfig config); +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/ExtraConfiguration.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/ExtraConfiguration.java new file mode 100644 index 000000000..3b6bcaf90 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/ExtraConfiguration.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config; + +import org.apache.kafka.common.config.ConfigDef; + +public interface ExtraConfiguration { + void configure(ConfigDef configDef); +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java index 30b38f7c3..377b38fdf 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java @@ -20,82 +20,199 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.Date; +import java.util.Locale; +import java.util.Objects; import org.apache.kafka.connect.sink.SinkRecord; +import io.aiven.kafka.connect.common.config.extractors.DataExtractor; +import io.aiven.kafka.connect.common.config.extractors.HeaderValueExtractor; +import io.aiven.kafka.connect.common.config.extractors.SimpleValuePath; + public interface TimestampSource { ZonedDateTime time(SinkRecord record); - @SuppressWarnings("PMD.ShortMethodName") - static TimestampSource of(final Type extractorType) { - return of(ZoneOffset.UTC, extractorType); - } - - @SuppressWarnings("PMD.ShortMethodName") - static TimestampSource of(final ZoneId zoneId, final Type extractorType) { - switch (extractorType) { - case WALLCLOCK : - return new WallclockTimestampSource(zoneId); - case EVENT : - return new EventTimestampSource(zoneId); - default : - throw new IllegalArgumentException( - String.format("Unsupported timestamp extractor type: %s", extractorType)); - } - } - Type type(); enum Type { - WALLCLOCK, EVENT; + WALLCLOCK, // + EVENT, // + HEADER, // + SIMPLE_DATA, // + CUSTOM// + + } + class Builder { + private ZoneId zoneId = ZoneOffset.UTC; + private Type type; + private String additionalParameters; + + /** + * set the zoneId to be used. If this method isnt called, the default is UTC + * + * @return this + * @throws NullPointerException + * if zoneId is null + */ + public Builder zoneId(final ZoneId zoneId) { + Objects.requireNonNull(zoneId, "zoneId cannot be null"); + this.zoneId = zoneId; + return this; + } + + /** + * sets the type of the timestamp source and associated parameters (if needed) The format of the configuration + * is <type>[:<data>] i.e. the type name, optionally followed by data.
+ * The data is type specific + *

+ * For type WALLCLOCK or EVENT, no data is allowed + *
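+ * For example (illustrative sketch only):
+ * <pre>
+ * TimestampSource wallclock = new TimestampSource.Builder().configuration("WALLCLOCK").build(); // now(zoneId)
+ * TimestampSource event = new TimestampSource.Builder().configuration("EVENT").build(); // record.timestamp()
+ * </pre>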

+ *

+ * For type SIMPLE_DATA, data is required, and is a '.' separated series of terms in the path
+ * If '.' itself needs to appear inside a term and you want to use a different separator, specify '.' as the
+ * first character and the separator as the second character; the rest of the string is then the path.
+ * For example "SIMPLE_DATA:a.b.c" would use into a path with terms "a", "b", "c"
+ * For example "SIMPLE_DATA:.:a.b:c" would use a path with terms "a.b", "c" + *

+ * For type HEADER, data is required, and is the name of the header to extract
+ * For example "HEADER:foo" would use to "foo" header (or null if its not available in the SinkRecord + *

+ *

+ * For type CUSTOM, data is required, and is the name of the class to use, optionally followed by additional
+ * parameters for that custom time source. The specified class must implement the TimestampSource interface and
+ * have a public constructor that takes a String and a ZoneId. For the meaning of the data, see the documentation
+ * of the custom class.
+ * For example "CUSTOM:my.custom.timesource:some more data" would be similar to calling new + * my.custom.timesource("some more data", zoneId) + *

+ * + * + * @return this + */ + public Builder configuration(final String configuration) { + final String[] parts = configuration.split(":", 2); + final String typeName = parts[0]; + try { + this.type = Type.valueOf(typeName.toUpperCase(Locale.ENGLISH)); + } catch (final IllegalArgumentException e) { + throw new IllegalArgumentException("Unknown timestamp source: " + typeName); + } + + this.additionalParameters = parts.length > 1 ? parts[1] : null; + return this; + } - @SuppressWarnings("PMD.ShortMethodName") - public static Type of(final String name) { - for (final Type t : Type.values()) { - if (t.name().equalsIgnoreCase(name)) { - return t; - } + public TimestampSource build() { + switch (type) { + case WALLCLOCK : + if (additionalParameters != null) { + throw new IllegalArgumentException( + "Wallclock timestamp source does not support additionalParameters"); + } + return new WallclockTimestampSource(zoneId); + case EVENT : + if (additionalParameters != null) { + throw new IllegalArgumentException( + "Event timestamp source does not support additionalParameters"); + } + return new EventTimestampSource(zoneId); + case SIMPLE_DATA : + if (additionalParameters == null) { + throw new IllegalArgumentException("Data timestamp source requires additionalParameters"); + } + return new SimpleTimestampSource(zoneId, Type.SIMPLE_DATA, + SimpleValuePath.parse(additionalParameters)); + case HEADER : + if (additionalParameters == null) { + throw new IllegalArgumentException("Header timestamp source requires additionalParameters"); + } + return new SimpleTimestampSource(zoneId, Type.HEADER, + new HeaderValueExtractor(additionalParameters)); + case CUSTOM : + if (additionalParameters == null) { + throw new IllegalArgumentException("Header timestamp source requires additionalParameters"); + } + final String[] parts = additionalParameters.split(":", 2); + final String className = parts[0]; + final String params = parts.length > 1 ? 
parts[1] : null; + try { + final Class clazz = Class.forName(className); + return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class) + .newInstance(params, zoneId); + } catch (final Exception e) { + throw new IllegalArgumentException("Failed to create custom timestamp source", e); + } + default : + throw new IllegalArgumentException(String.format("Unsupported timestamp extractor type: %s", type)); } - throw new IllegalArgumentException(String.format("Unknown timestamp source: %s", name)); } } - final class WallclockTimestampSource implements TimestampSource { - private final ZoneId zoneId; + class SimpleTimestampSource implements TimestampSource { + protected final ZoneId zoneId; + private final Type type; + private final DataExtractor dataExtractor; - protected WallclockTimestampSource(final ZoneId zoneId) { + protected SimpleTimestampSource(final ZoneId zoneId, final Type type, DataExtractor dataExtractor) { this.zoneId = zoneId; + this.type = type; + this.dataExtractor = dataExtractor; } @Override - public ZonedDateTime time(final SinkRecord record) { - return ZonedDateTime.now(zoneId); + public Type type() { + return type; } @Override - public Type type() { - return Type.WALLCLOCK; + public ZonedDateTime time(SinkRecord record) { + return fromRawTime(dataExtractor.extractDataFrom(record)); } - } - final class EventTimestampSource implements TimestampSource { - private final ZoneId zoneId; + protected ZonedDateTime fromRawTime(final Object rawValue) { + if (rawValue == null) { + return null; + } else if (rawValue instanceof Long) { + return withZone((Long) rawValue); + } else if (rawValue instanceof Date) { + return withZone(((Date) rawValue).getTime()); + } else if (rawValue instanceof ZonedDateTime) { + return (ZonedDateTime) rawValue; + } else if (rawValue instanceof Instant) { + return withZone(((Instant) rawValue).toEpochMilli()); + } + return null; + } - protected EventTimestampSource(final ZoneId zoneId) { - this.zoneId = zoneId; + protected ZonedDateTime withZone(final long timestamp) { + return ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zoneId); + } + } + + final class WallclockTimestampSource extends SimpleTimestampSource { + WallclockTimestampSource(final ZoneId zoneId) { + super(zoneId, Type.WALLCLOCK, null); } @Override public ZonedDateTime time(final SinkRecord record) { - return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId); + return ZonedDateTime.now(zoneId); + } + } + + final class EventTimestampSource extends SimpleTimestampSource { + EventTimestampSource(final ZoneId zoneId) { + super(zoneId, Type.EVENT, null); } @Override - public Type type() { - return Type.EVENT; + public ZonedDateTime time(final SinkRecord record) { + return withZone(record.timestamp()); } } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java new file mode 100644 index 000000000..a333f2ae9 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/DataExtractor.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config.extractors; + +import org.apache.kafka.connect.sink.SinkRecord; + +public interface DataExtractor { + + Object extractDataFrom(final SinkRecord record); +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java new file mode 100644 index 000000000..bc23687fd --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config.extractors; + +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.sink.SinkRecord; + +public class HeaderValueExtractor implements DataExtractor { + private final String headerKey; + + public HeaderValueExtractor(final String headerKey) { + this.headerKey = headerKey; + } + + public Object extractDataFrom(final SinkRecord record) { + final Header header = record.headers().lastWithName(headerKey); + return header == null ? null : header.value(); + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java new file mode 100644 index 000000000..ba4103b40 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePath.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config.extractors; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; + +public final class SimpleValuePath implements DataExtractor { + private final String[] terms; + + private SimpleValuePath(final String[] terms) { + this.terms = terms; + } + + /** + * Parse a path definition string into a Path object. The path definition string is a '.' separated series of + * strings, which are the terms in the path If the '.' is something that should be included in the terms, and you + * want to use a different separator, then you can specify a '.' as the first character, and the separator as the + * second character, and then the path is the rest of the string For example "a.b.c" would parse into a path with + * terms "a", "b", "c" For example ".:a.b:c" would parse into a path with terms "a.b", "c" + * + * @return a PathAccess that can access a value in a nested structure + */ + public static SimpleValuePath parse(final String pathDefinition) { + final String pathDescription; + final String pathSeparator; + if (pathDefinition.length() > 1 && pathDefinition.charAt(0) == '.') { + pathDescription = pathDefinition.substring(2); + pathSeparator = pathDefinition.substring(1, 2); + } else { + pathDescription = pathDefinition; + pathSeparator = "."; + } + return new SimpleValuePath(Pattern.compile(pathSeparator, Pattern.LITERAL).split(pathDescription)); + } + + public Object extractDataFrom(final SinkRecord record) { + Object current = record.value(); + + for (final String term : terms) { + if (current == null) { + return null; + } + if (current instanceof Struct) { + final Struct struct = (Struct) current; + final Schema schema = struct.schema(); + final Field field = schema.field(term); + if (field == null) { + return null; + } + current = struct.get(field); + } else if (current instanceof Map) { + current = ((Map) current).get(term); + } else if (current instanceof List) { + try { + current = ((List) current).get(Integer.parseInt(term)); + } catch (NumberFormatException | IndexOutOfBoundsException e) { + return null; + } + } else { + return null; + } + } + return current; + } + + @Override + public String toString() { + return "Path[terms=" + Arrays.toString(terms) + "]"; + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java new file mode 100644 index 000000000..24513b34b --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/ClassValidator.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config.validators; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class ClassValidator implements ConfigDef.Validator { + private final Class baseClass; + + public ClassValidator(final Class baseClass) { + this.baseClass = baseClass; + } + + @Override + public void ensureValid(final String name, final Object value) { + if (value != null && !baseClass.isAssignableFrom((Class) value)) { + throw new ConfigException(name, value, "must be a subclass of " + baseClass.getName()); + } + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/NonNegativeValidator.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/NonNegativeValidator.java new file mode 100644 index 000000000..0d1ac8f59 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/NonNegativeValidator.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config.validators; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class NonNegativeValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + assert value instanceof Integer; + if ((Integer) value < 0) { + throw new ConfigException(name, value, "must be a non-negative integer number"); + } + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java index b7535538e..f50255b30 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimestampSourceValidator.java @@ -26,7 +26,7 @@ public class TimestampSourceValidator implements ConfigDef.Validator { @Override public void ensureValid(final String name, final Object value) { try { - TimestampSource.Type.of(value.toString()); + new TimestampSource.Builder().configuration(value.toString()).build(); } catch (final Exception e) { // NOPMD AvoidCatchingGenericException throw new ConfigException(name, value, e.getMessage()); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/CustomRecordGrouperBuilder.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/CustomRecordGrouperBuilder.java new file mode 100644 index 000000000..480488ca1 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/CustomRecordGrouperBuilder.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import io.aiven.kafka.connect.common.config.Configurable; +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; + +public interface CustomRecordGrouperBuilder extends Configurable { + void setFilenameTemplate(Template filenameTemplate); + void setMaxRecordsPerFile(Integer maxRecordsPerFile); + void setTimestampSource(TimestampSource timestampSource); + void setSchemaBased(boolean schemaBased); + + RecordGrouper build(); +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java index 1e0e6c188..bd3c09c38 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouper.java @@ -41,7 +41,7 @@ public interface RecordGrouper { /** * Get all records associated with files, grouped by the file name. * - * @return map of records assotiated with files + * @return map of records associated with files */ Map> records(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java index a43ea0d29..4e8314233 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactory.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.common.grouper; +import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; @@ -143,6 +144,26 @@ public static String resolveRecordGrouperType(final Template template) { @SuppressWarnings("PMD.CognitiveComplexity") public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) { final Template fileNameTemplate = config.getFilenameTemplate(); + final boolean isSchemaBased = config.getFormatType() == FormatType.PARQUET + || config.getFormatType() == FormatType.AVRO; + if (config.getCustomRecordGrouperBuilder() != null) { + try { + final CustomRecordGrouperBuilder builder = config.getCustomRecordGrouperBuilder() + .getDeclaredConstructor() + .newInstance(); + + builder.configure(config); + builder.setMaxRecordsPerFile(config.getMaxRecordsPerFile()); + builder.setFilenameTemplate(fileNameTemplate); + builder.setSchemaBased(isSchemaBased); + builder.setTimestampSource(config.getFilenameTimestampSource()); + + return builder.build(); + } catch (final NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException e) { + throw new IllegalArgumentException("Failed to create custom record grouper", e); + } + } final String grType = resolveRecordGrouperType(fileNameTemplate); if (KEY_RECORD.equals(grType)) { return new KeyRecordGrouper(fileNameTemplate); @@ -151,13 +172,13 @@ public static RecordGrouper newRecordGrouper(final AivenCommonConfig config) { } else 
{ final Integer maxRecordsPerFile = config.getMaxRecordsPerFile() == 0 ? null : config.getMaxRecordsPerFile(); if (TOPIC_PARTITION_KEY_RECORD.equals(grType)) { - return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO + return isSchemaBased ? new SchemaBasedTopicPartitionKeyRecordGrouper(fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()) : new TopicPartitionKeyRecordGrouper(fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()); } else { - return config.getFormatType() == FormatType.PARQUET || config.getFormatType() == FormatType.AVRO + return isSchemaBased ? new SchemaBasedTopicPartitionRecordGrouper(fileNameTemplate, maxRecordsPerFile, config.getFilenameTimestampSource()) : new TopicPartitionRecordGrouper(fileNameTemplate, maxRecordsPerFile, diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java new file mode 100644 index 000000000..89d5986ba --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.List; +import java.util.Map; + +/** + * The interface for classes that associates {@link SinkRecord}s with files by some criteria. 
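+ * A hypothetical implementation (illustrative only) might group purely by topic and partition:
+ * <pre>
+ * RecordStreamer byPartition = new RecordStreamer() {
+ *     public String getStream(final SinkRecord record) { return record.topic() + "-" + record.kafkaPartition(); }
+ *     public String getFilename(final SinkRecord record) { return getStream(record) + ".data"; }
+ * };
+ * </pre>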
+ */ +public interface RecordStreamer{ + /** + * determine the logical grouping of the record + * + * @param record + * - record to group + */ + String getStream(SinkRecord record); + + /** + * determine the actual filename of the record + * + * @param record + * - record to drive the filename + */ + String getFilename(SinkRecord record); +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/AivenCommonConfigTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/AivenCommonConfigTest.java index 33bef0c6f..a57f9d616 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/config/AivenCommonConfigTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/AivenCommonConfigTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.common.config; +import static io.aiven.kafka.connect.common.config.AivenCommonConfig.addFileConfigGroup; import static io.aiven.kafka.connect.common.config.AivenCommonConfig.addOutputFieldsFormatConfigGroup; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -32,14 +33,7 @@ class AivenCommonConfigTest { private ConfigDef getBaseConfigDefinition() { final ConfigDef definition = new ConfigDef(); addOutputFieldsFormatConfigGroup(definition, OutputFieldType.VALUE); - - definition.define(AivenCommonConfig.FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, - ConfigDef.Importance.MEDIUM, "File name template"); - definition.define(AivenCommonConfig.FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, - CompressionType.NONE.name, ConfigDef.Importance.MEDIUM, "File compression"); - definition.define(AivenCommonConfig.FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, - "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " - + "0 is interpreted as \"unlimited\", which is the default."); + addFileConfigGroup(definition, "File", "Test", 1, CompressionType.NONE); return definition; } @@ -109,4 +103,18 @@ void invalidMaxRecordsForKeyBasedGrouper() { assertThat(config.getFilename()).isEqualTo("{{topic}}-{{partition}}-{{start_offset}}"); assertThat(config.getMaxRecordsPerFile()).isEqualTo(10); } + + @Test + void ensureAdditionalConfigurations() { + final TestConfig config = new TestConfig.Builder().withMinimalProperties() + .withProperty("a.custom.property", "true") + .withProperty("another.custom.property", "42") + .withProperty("undefined.property", "42") + .build(); + + assertThat(config.getBoolean("a.custom.property")).isEqualTo(true); + assertThat(config.getInt("another.custom.property")).isEqualTo(42); + assertThatThrownBy(() -> config.getString("undefined.property")).isInstanceOf(ConfigException.class) + .hasMessage("Unknown configuration 'undefined.property'"); + } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/CustomConfig.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/CustomConfig.java new file mode 100644 index 000000000..6d0d21171 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/CustomConfig.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import org.apache.kafka.common.config.ConfigDef; + +public class CustomConfig implements ExtraConfiguration { + @Override + public void configure(final ConfigDef configDef) { + configDef.define("a.custom.property", ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, + "Custom configuration for autoloading test"); + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/MoreCustomConfig.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/MoreCustomConfig.java new file mode 100644 index 000000000..d6ac49552 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/MoreCustomConfig.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.config; + +import org.apache.kafka.common.config.ConfigDef; + +public class MoreCustomConfig implements ExtraConfiguration { + @Override + public void configure(final ConfigDef configDef) { + configDef.define("another.custom.property", ConfigDef.Type.INT, 0, ConfigDef.Importance.LOW, + "Custom configuration for autoloading test"); + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/TestConfig.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/TestConfig.java new file mode 100644 index 000000000..d483e9208 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/TestConfig.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; + +@SuppressWarnings("PMD.TestClassWithoutTestCases") +public final class TestConfig extends AivenCommonConfig { + private TestConfig(final Map originals) { + super(configDef(), originals); + validate(); + } + + private static ConfigDef configDef() { + final ConfigDef configDef = new ConfigDef(); + addFileConfigGroup(configDef, "File", "Test", 1, null); + addOutputFieldsFormatConfigGroup(configDef, OutputFieldType.VALUE); + AivenCommonConfig.addCommonConfig(configDef); + return configDef; + } + public final static class Builder { + private final Map props; + + public Builder() { + props = new HashMap<>(); + } + public Builder withMinimalProperties() { + return withProperty(FILE_COMPRESSION_TYPE_CONFIG, "none"); + } + public Builder withProperty(final String key, final String value) { + props.put(key, value); + return this; + } + public Builder withoutProperty(final String key) { + props.remove(key); + return this; + } + + public TestConfig build() { + return new TestConfig(props); + } + + } + + void validate() { + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java new file mode 100644 index 000000000..bb6e8bb67 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractorTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config.extractors; + +import java.util.stream.Stream; + +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.sink.SinkRecord; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HeaderValueExtractorTest { + + static SinkRecord record1 = new SinkRecord("topic", 0, null, null, null, null, 0, 0L, TimestampType.CREATE_TIME, + new ConnectHeaders().add("h1", "value1", Schema.STRING_SCHEMA) + .add("h2", "v2", Schema.STRING_SCHEMA) + .add("b1", true, Schema.BOOLEAN_SCHEMA) + .add("b2", false, Schema.BOOLEAN_SCHEMA) + .add("i1", null, Schema.OPTIONAL_INT32_SCHEMA) + .add("i2", 17, Schema.OPTIONAL_INT32_SCHEMA) + .add("i3", 99, Schema.INT32_SCHEMA) + .add("i1", null, Schema.OPTIONAL_INT64_SCHEMA) + .add("l2", 17L, Schema.OPTIONAL_INT64_SCHEMA) + .add("l3", 99L, Schema.INT64_SCHEMA) + .add("dup", "one", Schema.STRING_SCHEMA) + .add("dup", "two", Schema.STRING_SCHEMA)); + + public static Stream testData() { + return Stream.of(Arguments.of(record1, "h1", "value1"), Arguments.of(record1, "h2", "v2"), + Arguments.of(record1, "b1", true), Arguments.of(record1, "b2", false), + Arguments.of(record1, "i1", null), Arguments.of(record1, "i2", 17), Arguments.of(record1, "i3", 99), + Arguments.of(record1, "i1", null), Arguments.of(record1, "l2", 17L), Arguments.of(record1, "l3", 99L), + Arguments.of(record1, "dup", "two"), Arguments.of(record1, "xxxxx", null)); + + } + + @ParameterizedTest + @MethodSource("testData") + void test(SinkRecord record, String headerKey, Object expected) { + var headerValueExtractor = new HeaderValueExtractor(headerKey); + assertEquals(expected, headerValueExtractor.extractDataFrom(record)); + } + +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java new file mode 100644 index 000000000..66d7e30ab --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/extractors/SimpleValuePathTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config.extractors; + +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.connect.sink.SinkRecord; + +import com.fasterxml.jackson.databind.JsonNode; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +class SimpleValuePathTest { + + public static Stream ParseDataProvider() { + return Stream.of(Arguments.of("Path[terms=[a, b, c]]", "a.b.c"), Arguments.of("Path[terms=[a:b:c]]", "a:b:c"), + Arguments.of("Path[terms=[.b.c]]", ".a.b.c"), Arguments.of("Path[terms=[a.b, c]]", ".:a.b:c"), + // with some regex special characters + Arguments.of("Path[terms=[\\a, b, c]]", "\\a.b.c"), Arguments.of("Path[terms=[a.b.c]]", ".\\a.b.c"), + Arguments.of("Path[terms=[a, b, c]]", ".\\a\\b\\c"), + + Arguments.of("Path[terms=[ [a, b, c]]", " [a.b.c"), Arguments.of("Path[terms=[[a.b.c]]", ". [a.b.c"), + Arguments.of("Path[terms=[a, b, c]]", ".[a[b[c"), + + Arguments.of("Path[terms=[]]", "."), Arguments.of("Path[terms=[]]", ""), + Arguments.of("Path[terms=[]]", ".."), Arguments.of("Path[terms=[a]]", "..a")); + } + + @ParameterizedTest + @MethodSource("ParseDataProvider") + void parse(String expected, String toParse) { + assertEquals(expected, SimpleValuePath.parse(toParse).toString()); + } + + static Schema flatSchema = SchemaBuilder.struct() + .field("s1", Schema.OPTIONAL_STRING_SCHEMA) + .field("i1", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + static Schema mapSchema = SchemaBuilder.struct() + .field("m1", + SchemaBuilder + .map(Schema.STRING_SCHEMA, + SchemaBuilder.struct() + .field("s2", Schema.OPTIONAL_STRING_SCHEMA) + .field("i2", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build()) + .optional() + .build()) + .field("m2", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional().build()) + .build(); + static Schema arraySchema = SchemaBuilder.struct() + .field("a1", SchemaBuilder.array(Schema.FLOAT32_SCHEMA).optional().build()) + .field("a2", SchemaBuilder.array(Schema.FLOAT64_SCHEMA).optional().build()) + .field("a3", SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build()) + .build(); + + static SinkRecord toRecord(Schema schema, String json) throws Exception { + try (JsonDeserializer ds = new JsonDeserializer()) { + JsonNode jsonValue = ds.deserialize("xx", json.replace('\'', '"').getBytes(StandardCharsets.UTF_8)); + Method m = JsonConverter.class.getDeclaredMethod("convertToConnect", Schema.class, JsonNode.class); + m.setAccessible(true); + Object value = m.invoke(null, schema, jsonValue); + return new SinkRecord("topic", 0, null, null, schema, value, 0); + } + } + + static Stream extractDataFromDataProvider() throws Exception { + return Stream.of(Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "s1", "hi"), + Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "i1", 42), + Arguments.of(toRecord(flatSchema, "{'s1': 'hi', 'i1': 42}"), "xx", null), + + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m1.k1.i2", 42), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': 
{'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m1.k1.s2", "Hi"), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m1.k1.xx", null), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "mx.k1.i2", null), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m1.k2.s2", "Bi"), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m2.two", 2), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'one':1,'two':2}}"), + "m1.one.xx", null), + Arguments.of(toRecord(mapSchema, + "{'m1': {'k1': {'i2': 42, 's2': 'Hi'},'k2': {'i2': 99, 's2': 'Bi'}},'m2':{'with.dot':1}}"), + ".:m2:with.dot", 1), + + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a1.0", 1F), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a1.3", 9.9f), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a2.0", 9.0), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a2.1", -1.0), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a3.0", "zero"), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a3.-1", null), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a3.10", null), + Arguments.of( + toRecord(arraySchema, "{'a1': [1,2,17.0,9.9], 'a2':[9,-1,3.14], 'a3':['zero','one','two']}"), + "a3.2", "two")); + } + + @ParameterizedTest + @MethodSource("extractDataFromDataProvider") + void extractDataFrom(SinkRecord record, String path, Object expected) { + final SimpleValuePath underTest = SimpleValuePath.parse(path); + assertEquals(expected, underTest.extractDataFrom(record)); + + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java index b2cf68233..8277679dc 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/RecordGrouperFactoryTest.java @@ -18,9 +18,18 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.stream.Stream; + +import io.aiven.kafka.connect.common.config.AivenCommonConfig; +import io.aiven.kafka.connect.common.config.TestConfig; +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.grouper.TestRecordGrouperBuilders.TestRecordGrouper; import io.aiven.kafka.connect.common.templating.Template; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; final class RecordGrouperFactoryTest { @@ -51,4 +60,37 @@ void keyAndTopicPartition() { final String grType = 
RecordGrouperFactory.resolveRecordGrouperType(filenameTemplate); assertThat(RecordGrouperFactory.KEY_TOPIC_PARTITION_RECORD).isEqualTo(grType); } + + public static Stream customGrouperTestData() { + return Stream.of( + Arguments.of(true, "avro", "{{topic}}/{{partition}}/{{start_offset}}", 100, TimestampSource.Type.EVENT), + Arguments.of(true, "parquet", "{{topic}}/{{partition}}/{{start_offset}}", 100, + TimestampSource.Type.EVENT), + Arguments.of(false, "csv", "{{key}}", 1, TimestampSource.Type.WALLCLOCK), + Arguments.of(false, "csv", "{{foo}} {{bat}}", 1, TimestampSource.Type.WALLCLOCK)); + } + @ParameterizedTest + @MethodSource("customGrouperTestData") + void customGrouper(final boolean schemaBased, final String format, final String template, + final int maxRecordsPerFile, final TimestampSource.Type timestampSource) { + + final AivenCommonConfig configuration = new TestConfig.Builder().withMinimalProperties() + .withProperty(AivenCommonConfig.CUSTOM_RECORD_GROUPER_BUILDER, + TestRecordGrouperBuilders.TestRecordGrouperBuilder.class.getName()) + .withProperty(AivenCommonConfig.FORMAT_OUTPUT_TYPE_CONFIG, format) + .withProperty(AivenCommonConfig.FILE_NAME_TEMPLATE_CONFIG, template) + .withProperty(AivenCommonConfig.FILE_MAX_RECORDS, String.valueOf(maxRecordsPerFile)) + .withProperty(AivenCommonConfig.FILE_NAME_TIMESTAMP_SOURCE, timestampSource.name()) + .build(); + + final var grouper = RecordGrouperFactory.newRecordGrouper(configuration); + assertThat(grouper).isInstanceOf(TestRecordGrouper.class); + final TestRecordGrouper testGrouper = (TestRecordGrouper) grouper; + + assertThat(testGrouper.filenameTemplate.originalTemplate()).isEqualTo(template); + assertThat(testGrouper.maxRecordsPerFile).isEqualTo(maxRecordsPerFile); + assertThat(testGrouper.timestampSource.type()).isEqualTo(timestampSource); + assertThat(testGrouper.schemaBased).isEqualTo(schemaBased); + assertThat(testGrouper.config).isSameAs(configuration); + } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java index 4fc2abf2b..3fbfc6cef 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionKeyRecordGrouperTest.java @@ -64,7 +64,7 @@ final class SchemaBasedTopicPartitionKeyRecordGrouperTest { static final SinkRecord KRT1P1R3 = new SinkRecord("topic1", 0, SchemaBuilder.string().optional().version(2).build(), "some_key", SchemaBuilder.string().optional().version(2).build(), null, 1003); - static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK); + static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK); @Test void rotateOnKeySchemaChanged() { diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java index 0bf4c4449..b840b17cd 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/SchemaBasedTopicPartitionRecordGrouperTest.java @@ -64,7 +64,7 @@ final class SchemaBasedTopicPartitionRecordGrouperTest { 
static final SinkRecord KRT1P1R3 = new SinkRecord("topic1", 0, SchemaBuilder.string().optional().version(2).build(), "some_key", SchemaBuilder.string().optional().version(2).build(), null, 1003); - static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK); + static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK); @Test void rotateOnKeySchemaChanged() { diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestRecordGrouperBuilders.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestRecordGrouperBuilders.java new file mode 100644 index 000000000..6bbf4cbf2 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestRecordGrouperBuilders.java @@ -0,0 +1,103 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.connect.sink.SinkRecord; + +import io.aiven.kafka.connect.common.config.AivenCommonConfig; +import io.aiven.kafka.connect.common.config.TimestampSource; +import io.aiven.kafka.connect.common.templating.Template; + +@SuppressWarnings("PMD.TestClassWithoutTestCases") +public class TestRecordGrouperBuilders { + @SuppressWarnings("PMD.TestClassWithoutTestCases") + public static class TestRecordGrouperBuilder implements CustomRecordGrouperBuilder { + private Template filenameTemplate; + private Integer maxRecordsPerFile; + private TimestampSource timestampSource; + private boolean schemaBased; + private AivenCommonConfig config; + + @Override + public void setFilenameTemplate(final Template filenameTemplate) { + this.filenameTemplate = filenameTemplate; + } + + @Override + public void setMaxRecordsPerFile(final Integer maxRecordsPerFile) { + this.maxRecordsPerFile = maxRecordsPerFile; + } + + @Override + public void setTimestampSource(final TimestampSource timestampSource) { + this.timestampSource = timestampSource; + } + + @Override + public void setSchemaBased(final boolean schemaBased) { + this.schemaBased = schemaBased; + } + + @Override + public void configure(final AivenCommonConfig config) { + this.config = config; + + } + + @Override + public RecordGrouper build() { + return new TestRecordGrouper(config, filenameTemplate, maxRecordsPerFile, timestampSource, schemaBased); + } + + } + + public static class TestRecordGrouper implements RecordGrouper { + public final AivenCommonConfig config; + public final Template filenameTemplate; + public final Integer maxRecordsPerFile; + public final TimestampSource timestampSource; + public final boolean schemaBased; + + public TestRecordGrouper(final AivenCommonConfig config, final Template filenameTemplate, + final Integer maxRecordsPerFile, final TimestampSource timestampSource, final boolean schemaBased) { + this.config = config; + this.filenameTemplate = filenameTemplate; + this.maxRecordsPerFile = maxRecordsPerFile; + 
this.timestampSource = timestampSource; + this.schemaBased = schemaBased; + } + + @Override + public void put(final SinkRecord record) { + + } + + @Override + public void clear() { + + } + + @Override + public Map> records() { + return Map.of(); + } + } + +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java new file mode 100644 index 000000000..d5a9f90df --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TestTimestampSource.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.grouper; + +import java.time.ZoneOffset; + +import io.aiven.kafka.connect.common.config.TimestampSource; + +public final class TestTimestampSource { + private TestTimestampSource() { + } + @SuppressWarnings("PMD.ShortMethodName") + public static TimestampSource of(final TimestampSource.Type type) { + return of(type, ZoneOffset.UTC); + } + + @SuppressWarnings("PMD.ShortMethodName") + public static TimestampSource of(final TimestampSource.Type type, final ZoneOffset timeZone) { + return new TimestampSource.Builder().configuration(type.toString()).zoneId(timeZone).build(); + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java index a4462ba11..c98c6f74f 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionKeyRecordGrouperTest.java @@ -84,7 +84,7 @@ final class TopicPartitionKeyRecordGrouperTest { private static final SinkRecord T2P1R3 = new SinkRecord("topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2003, 1_635_547_906_000L, TimestampType.CREATE_TIME); - private static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK); + private static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK); @Test void withoutNecessaryParameters() { @@ -240,13 +240,13 @@ void setZeroPaddingForKafkaPartition() { void addTimeUnitsToTheFileNameUsingWallclockTimestampSource() { final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{key}}-" + "{{start_offset}}-" + "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}"); - final ZonedDateTime timestamp = TimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null); + final ZonedDateTime timestamp = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null); final String expectedTs = timestamp.format(DateTimeFormatter.ofPattern("yyyy")) + timestamp.format(DateTimeFormatter.ofPattern("MM")) + timestamp.format(DateTimeFormatter.ofPattern("dd")); final 
TopicPartitionKeyRecordGrouper grouper = new TopicPartitionKeyRecordGrouper(filenameTemplate, null, - TimestampSource.of(TimestampSource.Type.WALLCLOCK)); + TestTimestampSource.of(TimestampSource.Type.WALLCLOCK)); grouper.put(T1P1R0); grouper.put(T1P1R1); @@ -401,9 +401,9 @@ void rotateKeysYearly() { void rotateDailyWithEventTimestampSource() { final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{key}}-" + "{{start_offset}}-" + "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}"); - final ZonedDateTime timestamp0 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0); - final ZonedDateTime timestamp1 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1); - final ZonedDateTime timestamp2 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2); + final ZonedDateTime timestamp0 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0); + final ZonedDateTime timestamp1 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1); + final ZonedDateTime timestamp2 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2); final String expectedTs0 = timestamp0.format(DateTimeFormatter.ofPattern("yyyy")) + timestamp0.format(DateTimeFormatter.ofPattern("MM")) @@ -416,7 +416,7 @@ void rotateDailyWithEventTimestampSource() { + timestamp2.format(DateTimeFormatter.ofPattern("dd")); final TopicPartitionKeyRecordGrouper grouper = new TopicPartitionKeyRecordGrouper(filenameTemplate, null, - TimestampSource.of(TimestampSource.Type.EVENT)); + TestTimestampSource.of(TimestampSource.Type.EVENT)); grouper.put(T2P1R0); grouper.put(T2P1R1); diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java index 466d222ef..ade89eba6 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/grouper/TopicPartitionRecordGrouperTest.java @@ -84,7 +84,7 @@ final class TopicPartitionRecordGrouperTest { private static final SinkRecord T2P1R3 = new SinkRecord("topic2", 1, Schema.OPTIONAL_STRING_SCHEMA, null, null, null, 2003, 1_635_547_906_000L, TimestampType.CREATE_TIME); - private static final TimestampSource DEFAULT_TS_SOURCE = TimestampSource.of(TimestampSource.Type.WALLCLOCK); + private static final TimestampSource DEFAULT_TS_SOURCE = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK); @Test void withoutNecessaryParameters() { @@ -233,13 +233,13 @@ void setZeroPaddingForKafkaPartition() { void addTimeUnitsToTheFileNameUsingWallclockTimestampSource() { final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{start_offset}}-" + "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}"); - final ZonedDateTime timestamp = TimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null); + final ZonedDateTime timestamp = TestTimestampSource.of(TimestampSource.Type.WALLCLOCK).time(null); final String expectedTs = timestamp.format(DateTimeFormatter.ofPattern("yyyy")) + timestamp.format(DateTimeFormatter.ofPattern("MM")) + timestamp.format(DateTimeFormatter.ofPattern("dd")); final TopicPartitionRecordGrouper grouper = new TopicPartitionRecordGrouper(filenameTemplate, null, - TimestampSource.of(TimestampSource.Type.WALLCLOCK)); + TestTimestampSource.of(TimestampSource.Type.WALLCLOCK)); grouper.put(T1P1R0); grouper.put(T1P1R1); @@ 
-391,9 +391,9 @@ void rotateKeysYearly() { void rotateDailyWithEventTimestampSource() { final Template filenameTemplate = Template.of("{{topic}}-" + "{{partition}}-" + "{{start_offset}}-" + "{{timestamp:unit=yyyy}}" + "{{timestamp:unit=MM}}" + "{{timestamp:unit=dd}}"); - final ZonedDateTime timestamp0 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0); - final ZonedDateTime timestamp1 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1); - final ZonedDateTime timestamp2 = TimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2); + final ZonedDateTime timestamp0 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R0); + final ZonedDateTime timestamp1 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R1); + final ZonedDateTime timestamp2 = TestTimestampSource.of(TimestampSource.Type.EVENT).time(T2P1R2); final String expectedTs0 = timestamp0.format(DateTimeFormatter.ofPattern("yyyy")) + timestamp0.format(DateTimeFormatter.ofPattern("MM")) @@ -406,7 +406,7 @@ void rotateDailyWithEventTimestampSource() { + timestamp2.format(DateTimeFormatter.ofPattern("dd")); final TopicPartitionRecordGrouper grouper = new TopicPartitionRecordGrouper(filenameTemplate, null, - TimestampSource.of(TimestampSource.Type.EVENT)); + TestTimestampSource.of(TimestampSource.Type.EVENT)); grouper.put(T2P1R0); grouper.put(T2P1R1); diff --git a/commons/src/test/resources/META-INF/services/io.aiven.kafka.connect.common.config.ExtraConfiguration b/commons/src/test/resources/META-INF/services/io.aiven.kafka.connect.common.config.ExtraConfiguration new file mode 100644 index 000000000..dcdef7ccc --- /dev/null +++ b/commons/src/test/resources/META-INF/services/io.aiven.kafka.connect.common.config.ExtraConfiguration @@ -0,0 +1,2 @@ +io.aiven.kafka.connect.common.config.CustomConfig +io.aiven.kafka.connect.common.config.MoreCustomConfig diff --git a/gcs-sink-connector/README.md b/gcs-sink-connector/README.md index 778b99920..04fd606ba 100644 --- a/gcs-sink-connector/README.md +++ b/gcs-sink-connector/README.md @@ -147,6 +147,11 @@ flush In this case, there will be two files `k0` (containing value `4`) and `k1` (containing value `3`). +#### Custom grouping + +For more complex case a custom grouping may be defined by defining `file.record.grouper.builder` +where you can supply your own implementation of a builder for a record grouping. + ##### The string representation of a key The connector in this mode uses the following algorithm to create the diff --git a/gcs-sink-connector/build.gradle.kts b/gcs-sink-connector/build.gradle.kts index 2c5744590..444f979f5 100644 --- a/gcs-sink-connector/build.gradle.kts +++ b/gcs-sink-connector/build.gradle.kts @@ -233,26 +233,26 @@ publishing { } } -signing { - sign(publishing.publications["publishMavenJavaArtifact"]) - useGpgCmd() - // Some issue in the plugin: - // GPG outputs already armored signatures. The plugin also does armoring for `asc` files. - // This results in double armored signatures, i.e. garbage. - // Override the signature type provider to use unarmored output for `asc` files, which works well - // with GPG. 
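A note on the new `file.record.grouper.builder` option documented in the README hunk above: the builder contract is the CustomRecordGrouperBuilder interface exercised by TestRecordGrouperBuilders earlier in this patch. Below is a minimal sketch of such a builder, assuming the record grouper's records() method returns Map<String, List<SinkRecord>> (type parameters are not visible in this flattened diff); the package, class name and the one-file-per-topic rule are purely illustrative.

package com.example;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.common.config.AivenCommonConfig;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.grouper.CustomRecordGrouperBuilder;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.templating.Template;

public class TopicOnlyGrouperBuilder implements CustomRecordGrouperBuilder {

    @Override
    public void setFilenameTemplate(final Template filenameTemplate) {
        // file names are derived from the topic only, so the template is ignored here
    }

    @Override
    public void setMaxRecordsPerFile(final Integer maxRecordsPerFile) {
        // ignored for brevity; a real builder would honour file.max.records
    }

    @Override
    public void setTimestampSource(final TimestampSource timestampSource) {
        // not used by this grouper
    }

    @Override
    public void setSchemaBased(final boolean schemaBased) {
        // not used by this grouper
    }

    @Override
    public void configure(final AivenCommonConfig config) {
        // no additional connector settings are read in this sketch
    }

    @Override
    public RecordGrouper build() {
        return new RecordGrouper() {
            private final Map<String, List<SinkRecord>> files = new LinkedHashMap<>();

            @Override
            public void put(final SinkRecord record) {
                // group every record of a topic into a single output file
                files.computeIfAbsent(record.topic(), ignored -> new ArrayList<>()).add(record);
            }

            @Override
            public void clear() {
                files.clear();
            }

            @Override
            public Map<String, List<SinkRecord>> records() {
                return files;
            }
        };
    }
}

With such a class on the plugin classpath it would be selected via `file.record.grouper.builder=com.example.TopicOnlyGrouperBuilder`, the same mechanism the customGrouper test above uses for TestRecordGrouperBuilder.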
- class ASCSignatureProvider : AbstractSignatureTypeProvider() { - val binary = - object : BinarySignatureType() { - override fun getExtension(): String { - return "asc" - } - } - - init { - register(binary) - setDefaultType(binary.extension) - } - } - signatureTypes = ASCSignatureProvider() -} +//signing { +// sign(publishing.publications["publishMavenJavaArtifact"]) +// useGpgCmd() +// // Some issue in the plugin: +// // GPG outputs already armored signatures. The plugin also does armoring for `asc` files. +// // This results in double armored signatures, i.e. garbage. +// // Override the signature type provider to use unarmored output for `asc` files, which works well +// // with GPG. +// class ASCSignatureProvider : AbstractSignatureTypeProvider() { +// val binary = +// object : BinarySignatureType() { +// override fun getExtension(): String { +// return "asc" +// } +// } +// +// init { +// register(binary) +// setDefaultType(binary.extension) +// } +// } +// signatureTypes = ASCSignatureProvider() +//} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java new file mode 100644 index 000000000..4d4999de8 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java @@ -0,0 +1,43 @@ +package io.aiven.kafka.connect.gcs; + +import io.aiven.kafka.connect.common.config.validators.ClassValidator; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; + +import java.util.Map; + +public class AsyncGcsSinkConfig extends GcsSinkConfig{ + + private static final String GROUP_ASYNC = "Async"; + private static final String ASYNC_MAX_RECORD_AGE = "async.file.max.record.age.ms"; + private static final String ASYNC_MAX_OPEN_FILES = "async.file.max.open.files"; + private static final String ASYNC_TASK_CLASS = "async.task.class"; + + public AsyncGcsSinkConfig(Map properties) { + super(configDef(), properties); + } + + public static ConfigDef configDef() { + final ConfigDef configDef = GcsSinkConfig.configDef(); + addAsyncConfig(configDef); + return configDef; + } + private static void addAsyncConfig(final ConfigDef configDef) { + int groupCounter = 0; + configDef.define(ASYNC_MAX_RECORD_AGE, ConfigDef.Type.INT, 60000, ConfigDef.Importance.LOW, + "the maximum time in milliseconds records may be buffered before their file is written asynchronously", GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_MAX_RECORD_AGE); + configDef.define(ASYNC_MAX_OPEN_FILES, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW, + "the maximum number of files that may be open for writing at the same time", GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_MAX_OPEN_FILES); + configDef.define(ASYNC_TASK_CLASS, ConfigDef.Type.CLASS, AsyncGcsSinkTask.class, new ClassValidator(AsyncGcsSinkTask.class), + ConfigDef.Importance.LOW,"the task class", GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_TASK_CLASS); + } + + + public int getAsyncMaxRecordAgeMs() { + return getInt(ASYNC_MAX_RECORD_AGE); + } + + public Class getTaskClass() { + return getClass(ASYNC_TASK_CLASS).asSubclass(AsyncGcsSinkTask.class); + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java new file mode 100644 index 000000000..555375b89 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java @@ -0,0 +1,29 @@ +package io.aiven.kafka.connect.gcs; + +import 
org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; + +import java.util.Map; + +public class AsyncGcsSinkConnector extends GcsSinkConnector { + private AsyncGcsSinkConfig config; + @Override + public Class taskClass() { + return config.getTaskClass(); + } + + protected void setConfig(AsyncGcsSinkConfig config) { + this.config = config ; + super.setConfig(config); + } + + @Override + protected void parseConfig(Map props) { + setConfig(new AsyncGcsSinkConfig(props)); + } + + @Override + public ConfigDef config() { + return AsyncGcsSinkConfig.configDef(); + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java new file mode 100644 index 000000000..a3f3a2b5c --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java @@ -0,0 +1,282 @@ +package io.aiven.kafka.connect.gcs; + +import com.google.cloud.storage.Storage; +import com.google.common.collect.Streams; +import io.aiven.kafka.connect.common.grouper.RecordStreamer; +import io.aiven.kafka.connect.gcs.overload.OverloadHandler; +import io.aiven.kafka.connect.gcs.overload.OverloadHandlerActions; +import io.aiven.kafka.connect.gcs.overload.PauseOrFlushOverloadHandler; +import io.aiven.kafka.connect.gcs.writer.LateWriteStreamWriter; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +public class AsyncGcsSinkTask extends GcsSinkTask { + private static final Logger LOG = LoggerFactory.getLogger(AsyncGcsSinkTask.class); + final static ForkJoinPool exec = ForkJoinPool.commonPool(); + final static ScheduledExecutorService timed = Executors.newSingleThreadScheduledExecutor(); + + private final ConcurrentMap openStreams = new ConcurrentHashMap<>(); + private final Set finishing = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final AtomicReference fatalError = new AtomicReference<>(); + private final Instrumentation instrumentation = new Instrumentation(); + + //config + private AsyncGcsSinkConfig config; + private int maxRecordsPerFile; + private int maxAgeRecordsMs; + private RecordStreamer recordStreamer; + private OverloadHandler overloadHandler; + + // required by Connect + public AsyncGcsSinkTask() { + super(); + } + + // for testing + public AsyncGcsSinkTask(final Map props, final Storage storage) { + super(props, storage); + } + + protected void setConfig(AsyncGcsSinkConfig config) { + this.config = config; + super.setConfig(config); + } + + @Override + protected void parseConfig(Map props) { + setConfig(new AsyncGcsSinkConfig(props)); + } + + @Override + public void start(final Map props) { + super.start(props); + this.maxRecordsPerFile = config.getMaxRecordsPerFile(); + this.maxAgeRecordsMs = config.getAsyncMaxRecordAgeMs(); + if (recordGrouper instanceof RecordStreamer) { + recordStreamer = (RecordStreamer) recordGrouper; + } else { + throw new ConnectException("RecordGrouper must implement RecordStreamer to use the async sink task"); + } + //TODO drive this through 
config + this.overloadHandler = new PauseOrFlushOverloadHandler(new OverloadActions(), instrumentation); + } + + @Override + public void put(Collection records) { + checkForErrors(); + Objects.requireNonNull(records, "records cannot be null"); + + LOG.debug("Processing {} records", records.size()); + + for (final SinkRecord record : records) { + var groupId = recordStreamer.getStream(record); + openStreams.compute(groupId, (group, stream) -> { + if (stream == null) { + // set the value to the new stream, with first element set + return createRecordStream(group, record); + } else { + stream.addRecord(record); + if (stream.recordCount() >= maxRecordsPerFile) { + startAsyncWrite(stream); + //and remove the entry from the map + return null; + } else { + //keep the entry in the map + return stream; + } + } + }); + } + instrumentation.addedRecords(records.size()); + overloadHandler.addedRecords(records.size()); + } + + private void checkForErrors() { + ConnectException error = fatalError.get(); + if (error != null) { + throw error; + } + } + public void fatalError(ConnectException error) { + if (fatalError.compareAndSet(null, error)){ + LOG.error("Fatal error", error); + } else { + LOG.error("Another fatal error - which is suppressed as another fatal error has already been reported", error); + } + throw error; + } + + private AsyncRecordStreamHandler createRecordStream(String group, SinkRecord record) { + return new AsyncRecordStreamHandler(this, group, record, + //TODO drive the early/late write through config + new LateWriteStreamWriter(writeHelper(), recordStreamer.getFilename(record))); + } + + + void nonFullStreamTimeout(AsyncRecordStreamHandler streamTimedOut) { + openStreams.compute(streamTimedOut.groupId(), (group, stream) -> { + if (stream != streamTimedOut) { + // if stream is null, streamTimedOut became full and was removed + // if stream is a different object, streamTimedOut became full and then another record was added to the same groupId + // either way streamTimedOut should have been started + if (!streamTimedOut.finishing()) { + fatalError(new ConnectException("Stream not started, but not in - program error")); + } + return stream; + } else { + //start the async write, and remove it from the map + startAsyncWrite(streamTimedOut); + return null; + } + }); + } + + /** + * Start the async write for the stream. Guaranteed to not be called twice for the same stream + * @implNote guaranteed not to be called concurrently, only called via a + * {@link ConcurrentHashMap#compute(Object, BiFunction) compute} method + * of a {@link ConcurrentHashMap}, which enforces memory barriers. 
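+ * All callers (put, nonFullStreamTimeout, flush and the overload handler's forceStartAsyncWrite) invoke it from inside openStreams.compute and remove the map entry in the same call, so each stream is handed to this method at most once.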
+ */ + private void startAsyncWrite(AsyncRecordStreamHandler stream) { + finishing.add(stream); + instrumentation.startFlush(stream); + overloadHandler.startFlush(stream); + stream.finishWrite(exec) + .whenComplete((result, error) -> { + if (error != null) { + fatalError(new ConnectException("Error writing records", error)); + } + //fatalError always throws, so this is success handling + finishing.remove(stream); + instrumentation.finishedFlush(stream, result); + overloadHandler.finishedFlush(stream, result); + }); + } + + + @Override + public void flush(Map currentOffsets) { + checkForErrors(); + commitPending.set(false); + //we need to hold the 'lock' on the value, for startAsyncWrite to work + openStreams.forEach((k,v) -> openStreams.compute(k, (group, stream) -> { + startAsyncWrite(stream); + return null; + })); + + CompletableFuture[] list = finishing.stream().map(AsyncRecordStreamHandler::asyncFinish).toArray(CompletableFuture[]::new); + try { + CompletableFuture.allOf(list).get(); + } catch (InterruptedException | ExecutionException e) { + throw new ConnectException(e); + } + finishing.clear(); + + } + + @Override + public Map preCommit(Map currentOffsets) { + checkForErrors(); + // we need to check the openStreams and the completing + // in that order to ensure that we don't miss anything + //so check the openStream, and then the finishing, as they may move asynchronously from one to the other! + + final Map earliestStillPending = new HashMap<>(); + Streams.concat(openStreams.values().stream(), + finishing.stream()) + .forEach(pendingStreamHandler -> + earliestStillPending.compute(pendingStreamHandler.topicPartition(), + (ignored, earliestPendingOffset) -> { + if (earliestPendingOffset == null || pendingStreamHandler.firstOffset() < earliestPendingOffset) { + return (int) pendingStreamHandler.firstOffset(); + } else { + return earliestPendingOffset; + } + }) + ); + + return currentOffsets + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + suppliedEntry -> { + var inTransit = earliestStillPending.get(suppliedEntry.getKey()); + if (inTransit == null) { + return suppliedEntry.getValue(); + } else { + return new OffsetAndMetadata(inTransit, suppliedEntry.getValue().metadata()); + } + })); + } + + + private final AtomicBoolean commitPending = new AtomicBoolean(false); + public void requestCommitNow() { + context.requestCommit(); + } + public void requestCommitSoon() { + if (commitPending.compareAndSet(false, true)) { + //TODO config + timed.schedule(this::requestCommitNow, 100, TimeUnit.MILLISECONDS); + } + } + + public RecordStreamer recordStreamer() { + return recordStreamer; + } + + public long maxAgeRecordsMs() { + return maxAgeRecordsMs; + } + + private class OverloadActions implements OverloadHandlerActions { + @Override + public List snapshotOpenStreams() { + return openStreams.values().stream().map(StreamSnapshotMetadata::new).collect(Collectors.toList()); + } + + @Override + public void forceStartAsyncWrite(StreamSnapshotMetadata streamSnapshotMetadata) { + openStreams.compute(streamSnapshotMetadata.groupId, (group, stream) -> { + //we dont really care if the stream is null. that means that the openStreams has been updated + //and its already enqueued for writing, e.g. 
because a timer has expired + if (stream != null) { + startAsyncWrite(stream); + } + return null; + }); + } + } + //TODO should be a record + public static class StreamSnapshotMetadata { + public final long createTime; + public final int recordCount; + public final long firstOffset; + public final String groupId; + public final TopicPartition file; + public final boolean finishing; + + public StreamSnapshotMetadata(AsyncRecordStreamHandler stream) { + this.createTime = stream.createTime(); + this.recordCount = stream.recordCount(); + this.firstOffset = stream.firstOffset(); + this.groupId = stream.groupId(); + this.file = stream.topicPartition(); + this.finishing = stream.finishing(); + } + + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java new file mode 100644 index 000000000..7090f0b90 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java @@ -0,0 +1,94 @@ +package io.aiven.kafka.connect.gcs; + +import io.aiven.kafka.connect.gcs.writer.StreamWriter; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public final class AsyncRecordStreamHandler { + + private final AsyncGcsSinkTask asyncGcsSinkTask; + private final String file; + private final String groupId; + private final CompletableFuture asyncFinish; + private final Future timeout; + private final long firstOffset; + private final StreamWriter streamWriter; + private final TopicPartition topicPartition; + private final long createTime; + + private boolean finishing = false; + private int recordCount = 0; + + public AsyncRecordStreamHandler(AsyncGcsSinkTask asyncGcsSinkTask, final String groupId, + SinkRecord firstElement, StreamWriter streamWriter) { + this.asyncGcsSinkTask = asyncGcsSinkTask; + this.groupId = groupId; + this.topicPartition = new TopicPartition(firstElement.topic(), firstElement.kafkaPartition()); + this.firstOffset = firstElement.kafkaOffset(); + this.streamWriter = streamWriter; + this.file = asyncGcsSinkTask.recordStreamer().getFilename(firstElement); + this.timeout = AsyncGcsSinkTask.timed.schedule(() -> asyncGcsSinkTask.nonFullStreamTimeout(this), asyncGcsSinkTask.maxAgeRecordsMs(), TimeUnit.MILLISECONDS); + this.asyncFinish = new CompletableFuture<>(); + this.createTime = System.currentTimeMillis(); + } + + public int recordCount() { + return recordCount; + } + + void addRecord(SinkRecord record) { + if (record.kafkaPartition() != topicPartition.partition() || !record.topic().equals(topicPartition.topic())) { + throw new ConnectException("Records from different partitions or topics are not allowed in the same group"); + } + recordCount ++; + streamWriter.addRecord(record); + } + + boolean finishing() { + return finishing; + } + + /** + * requested that the file is written (or that writes complete), and the file is closed, flushed etc + * This should be non-blocking, and when the write completes the asyncFinish should be completed. 
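+ * With the LateWriteStreamWriter currently created by AsyncGcsSinkTask.createRecordStream this is the point at which the buffered records are serialized and uploaded; an early-write strategy would only need to close the already open output here.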
+ * This method should only be called once + * + * @return + */ + CompletableFuture finishWrite(ForkJoinPool exec){ + if (finishing) { + throw new IllegalStateException("finishWrite called twice"); + } + finishing = true; + timeout.cancel(false); + asyncFinish.completeAsync(streamWriter::finishWriteAsync, exec); + return asyncFinish; + } + + + public long firstOffset() { + return firstOffset; + } + + public String groupId() { + return groupId; + } + + public TopicPartition topicPartition() { + return topicPartition; + } + + public CompletableFuture asyncFinish() { + return asyncFinish; + } + + public long createTime() { + return createTime(); + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java new file mode 100644 index 000000000..bc6d4a613 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java @@ -0,0 +1,25 @@ +package io.aiven.kafka.connect.gcs; + +public class AsyncWriteResult { + private final int recordCount; + private final long writeDurationNs; + private final long closeDurationNs; + + public AsyncWriteResult(int recordCount, long writeDurationNs, long closeDurationNs) { + this.recordCount = recordCount; + this.writeDurationNs = writeDurationNs; + this.closeDurationNs = closeDurationNs; + } + + public int recordCount() { + return recordCount; + } + + public long writeDurationNs() { + return writeDurationNs; + } + + public long closeDurationNs() { + return closeDurationNs; + } +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java index 5ac3a26ba..38805c34a 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java @@ -16,29 +16,26 @@ package io.aiven.kafka.connect.gcs; -import java.time.ZoneId; -import java.time.ZoneOffset; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.api.gax.tracing.ApiTracerFactory; +import io.aiven.kafka.connect.common.config.validators.ClassValidator; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.types.Password; import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.common.config.FixedSetRecommender; import io.aiven.kafka.connect.common.config.OutputField; import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; import io.aiven.kafka.connect.common.config.OutputFieldType; -import io.aiven.kafka.connect.common.config.TimestampSource; -import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.OAuth2Credentials; @@ -47,7 +44,7 @@ import org.slf4j.LoggerFactory; import org.threeten.bp.Duration; -public final class GcsSinkConfig extends AivenCommonConfig { +public class GcsSinkConfig extends AivenCommonConfig { private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConfig.class); private 
static final String USER_AGENT_HEADER_FORMAT = "Google GCS Sink/%s (GPN: Aiven;)"; public static final String USER_AGENT_HEADER_VALUE = String.format(USER_AGENT_HEADER_FORMAT, Version.VERSION); @@ -59,6 +56,7 @@ public final class GcsSinkConfig extends AivenCommonConfig { public static final String GCS_BUCKET_NAME_CONFIG = "gcs.bucket.name"; public static final String GCS_OBJECT_CONTENT_ENCODING_CONFIG = "gcs.object.content.encoding"; public static final String GCS_USER_AGENT = "gcs.user.agent"; + public static final String GCS_METRICS = "gcs.metrics.class"; private static final String GROUP_FILE = "File"; public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix"; public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; @@ -104,9 +102,9 @@ public static ConfigDef configDef() { addGcsConfigGroup(configDef); addFileConfigGroup(configDef); addOutputFieldsFormatConfigGroup(configDef, OutputFieldType.VALUE); - addKafkaBackoffPolicy(configDef); addGcsRetryPolicies(configDef); addUserAgentConfig(configDef); + AivenCommonConfig.addCommonConfig(configDef); return configDef; } @@ -152,6 +150,16 @@ private static void addGcsConfigGroup(final ConfigDef configDef) { // never // used GCS_BUCKET_NAME_CONFIG); + configDef.define(GCS_METRICS, ConfigDef.Type.CLASS, + null, new ClassValidator(ApiTracerFactory.class), + ConfigDef.Importance.LOW, + "class for GCS metrics. Default is not to attache metrics", + GROUP_GCS, gcsGroupCounter++, ConfigDef.Width.NONE, // NOPMD + // retryPolicyGroupCounter + // updated value never + // used + GCS_METRICS); + } private static void addGcsRetryPolicies(final ConfigDef configDef) { @@ -200,9 +208,9 @@ public void ensureValid(final String name, final Object value) { "Retry total timeout in milliseconds. The default value is " + GCS_RETRY_BACKOFF_TOTAL_TIMEOUT_MS_DEFAULT, GROUP_GCS_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, ConfigDef.Width.NONE, // NOPMD - // retryPolicyGroupCounter - // updated value never - // used + // retryPolicyGroupCounter + // updated value never + // used GCS_RETRY_BACKOFF_TOTAL_TIMEOUT_MS_CONFIG); } @@ -225,83 +233,17 @@ public void ensureValid(final String name, final Object value) { }, ConfigDef.Importance.MEDIUM, "The prefix to be added to the name of each file put on GCS.", GROUP_FILE, fileGroupCounter++, ConfigDef.Width.NONE, FILE_NAME_PREFIX_CONFIG); - configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, - new FilenameTemplateValidator(FILE_NAME_TEMPLATE_CONFIG), ConfigDef.Importance.MEDIUM, - "The template for file names on GCS. " - + "Supports `{{ variable }}` placeholders for substituting variables. " - + "Currently supported variables are `topic`, `partition`, and `start_offset` " - + "(the offset of the first record in the file). 
" - + "Only some combinations of variables are valid, which currently are:\n" - + "- `topic`, `partition`, `start_offset`.", - GROUP_FILE, fileGroupCounter++, ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG); - - final String supportedCompressionTypes = CompressionType.names() - .stream() - .map(f -> "'" + f + "'") - .collect(Collectors.joining(", ")); - configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, CompressionType.NONE.name, - new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - assert value instanceof String; - final String valueStr = (String) value; - if (!CompressionType.names().contains(valueStr)) { - throw new ConfigException(FILE_COMPRESSION_TYPE_CONFIG, valueStr, - "supported values are: " + supportedCompressionTypes); - } - } - }, ConfigDef.Importance.MEDIUM, - "The compression type used for files put on GCS. " + "The supported values are: " - + supportedCompressionTypes + ".", - GROUP_FILE, fileGroupCounter++, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, - FixedSetRecommender.ofSupportedValues(CompressionType.names())); - - configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - assert value instanceof Integer; - if ((Integer) value < 0) { - throw new ConfigException(FILE_MAX_RECORDS, value, "must be a non-negative integer number"); - } - } - }, ConfigDef.Importance.MEDIUM, - "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " - + "0 is interpreted as \"unlimited\", which is the default.", - GROUP_FILE, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_MAX_RECORDS); - - configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(), - new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - try { - ZoneId.of(value.toString()); - } catch (final Exception e) { // NOPMD broad exception catched - throw new ConfigException(FILE_NAME_TIMESTAMP_TIMEZONE, value, e.getMessage()); - } - } - }, ConfigDef.Importance.LOW, - "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " - + "Use standard shot and long names. Default is UTC", - GROUP_FILE, fileGroupCounter++, ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE); + AivenCommonConfig.addFileConfigGroup(configDef, GROUP_FILE, "GCS", fileGroupCounter, CompressionType.NONE); - configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(), - new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - try { - TimestampSource.Type.of(value.toString()); - } catch (final Exception e) { // NOPMD broad exception catched - throw new ConfigException(FILE_NAME_TIMESTAMP_SOURCE, value, e.getMessage()); - } - } - }, ConfigDef.Importance.LOW, "Specifies the the timestamp variable source. 
Default is wall-clock.", - GROUP_FILE, fileGroupCounter, ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE); + } + protected GcsSinkConfig(ConfigDef configDef, Map properties) { + super(configDef, handleDeprecatedYyyyUppercase(properties)); + validate(); } public GcsSinkConfig(final Map properties) { - super(configDef(), handleDeprecatedYyyyUppercase(properties)); - validate(); + this(configDef(), properties); } static Map handleDeprecatedYyyyUppercase(final Map properties) { @@ -438,4 +380,15 @@ public String getGcsEndpoint() { public String getUserAgent() { return getString(GCS_USER_AGENT); } + + public ApiTracerFactory getApiTracerFactory() { + if (getClass(GCS_METRICS) == null) { + return null; + } + try { + return getClass(GCS_METRICS).asSubclass(ApiTracerFactory.class).getDeclaredConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } } diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java index f4d664524..ad5466bbe 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java @@ -30,7 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class GcsSinkConnector extends SinkConnector { +public class GcsSinkConnector extends SinkConnector { private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class); private Map configProps; @@ -51,10 +51,18 @@ public void start(final Map props) { Objects.requireNonNull(props, "props cannot be null"); this.configProps = Collections.unmodifiableMap(props); - this.config = new GcsSinkConfig(props); + parseConfig(props); LOG.info("Starting connector {}", config.getConnectorName()); } + protected void setConfig(GcsSinkConfig config) { + this.config = config ; + } + + protected void parseConfig(Map props) { + setConfig(new GcsSinkConfig(props)); + } + @Override public Class taskClass() { return GcsSinkTask.class; diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java index eda879e4a..6bb10a65e 100644 --- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java @@ -17,10 +17,8 @@ package io.aiven.kafka.connect.gcs; import java.nio.channels.Channels; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; +import java.util.stream.Stream; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -30,25 +28,25 @@ import io.aiven.kafka.connect.common.grouper.RecordGrouper; import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; -import io.aiven.kafka.connect.common.output.OutputWriter; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.FixedHeaderProvider; -import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class GcsSinkTask extends SinkTask { +public class GcsSinkTask extends SinkTask { private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class); private static final 
String USER_AGENT_HEADER_KEY = "user-agent"; - private RecordGrouper recordGrouper; + protected RecordGrouper recordGrouper; private GcsSinkConfig config; private Storage storage; + private WriteHelper writeHelper; + // required by Connect public GcsSinkTask() { @@ -58,10 +56,11 @@ public GcsSinkTask() { // for testing public GcsSinkTask(final Map props, final Storage storage) { super(); + Objects.requireNonNull(props, "props cannot be null"); Objects.requireNonNull(storage, "storage cannot be null"); - this.config = new GcsSinkConfig(props); + parseConfig(props); this.storage = storage; initRest(); } @@ -70,8 +69,8 @@ public GcsSinkTask(final Map props, final Storage storage) { public void start(final Map props) { Objects.requireNonNull(props, "props cannot be null"); - this.config = new GcsSinkConfig(props); - this.storage = StorageOptions.newBuilder() + parseConfig(props); + StorageOptions.Builder builder = StorageOptions.newBuilder() .setHost(config.getGcsEndpoint()) .setCredentials(config.getCredentials()) .setHeaderProvider(FixedHeaderProvider.create(USER_AGENT_HEADER_KEY, config.getUserAgent())) @@ -81,13 +80,25 @@ public void start(final Map props) { .setRetryDelayMultiplier(config.getGcsRetryBackoffDelayMultiplier()) .setTotalTimeout(config.getGcsRetryBackoffTotalTimeout()) .setMaxAttempts(config.getGcsRetryBackoffMaxAttempts()) - .build()) - .build() + .build()); + if (config.getApiTracerFactory() != null) { + builder.setApiTracerFactory(config.getApiTracerFactory()); + } + this.storage = builder.build() .getService(); initRest(); if (Objects.nonNull(config.getKafkaRetryBackoffMs())) { context.timeout(config.getKafkaRetryBackoffMs()); } + this.writeHelper = new WriteHelper(storage, config); + } + + protected void setConfig(GcsSinkConfig gcsSinkConfig) { + this.config = gcsSinkConfig; + } + + protected void parseConfig(Map props) { + setConfig(new GcsSinkConfig(props)); } private void initRest() { @@ -111,29 +122,18 @@ public void put(final Collection records) { @Override public void flush(final Map currentOffsets) { try { - recordGrouper.records().forEach(this::flushFile); + final Stream>> stream; + if (config.isWriteParallel()) { + stream = recordGrouper.records().entrySet().stream().parallel(); + } else { + stream = recordGrouper.records().entrySet().stream(); + } + stream.forEach(entry -> writeHelper.flushFile(entry.getKey(), entry.getValue())); } finally { recordGrouper.clear(); } } - private void flushFile(final String filename, final List records) { - final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename) - .setContentEncoding(config.getObjectContentEncoding()) - .build(); - try (var out = Channels.newOutputStream(storage.writer(blob)); - var writer = OutputWriter.builder() - .withExternalProperties(config.originalsStrings()) - .withOutputFields(config.getOutputFields()) - .withCompressionType(config.getCompressionType()) - .withEnvelopeEnabled(config.envelopeEnabled()) - .build(out, config.getFormatType())) { - writer.writeRecords(records); - } catch (final Exception e) { // NOPMD broad exception catched - throw new ConnectException(e); - } - } - @Override public void stop() { // Nothing to do. 
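The per-file upload logic removed from GcsSinkTask above now lives in the WriteHelper class introduced below, so that the synchronous flush path and the asynchronous stream writers can share it. A rough usage sketch follows, assuming an already-built Storage client and GcsSinkConfig exactly as prepared in GcsSinkTask.start(); the wrapper class, method and file name are illustrative only.

package io.aiven.kafka.connect.gcs;

import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;

import com.google.cloud.storage.Storage;

final class WriteHelperUsageSketch {
    // writes all records for one output object and closes it; failures surface as ConnectException
    static void writeOneFile(final Storage storage, final GcsSinkConfig config, final List<SinkRecord> records) {
        final WriteHelper helper = new WriteHelper(storage, config);
        helper.flushFile("topic0-0-0000000000000", records);
    }
}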
@@ -143,4 +143,8 @@ public void stop() { public String version() { return Version.VERSION; } + protected WriteHelper writeHelper() { + return writeHelper; + } + } diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java new file mode 100644 index 000000000..7e80bd3f2 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java @@ -0,0 +1,77 @@ +package io.aiven.kafka.connect.gcs; + +import java.util.concurrent.atomic.AtomicLong; + +public class Instrumentation { + private final AtomicLong recordsTotalReceived = new AtomicLong(); + private final AtomicLong recordsTotalCompleted = new AtomicLong(); + + // received, but not started flush + private final AtomicLong recordsPending = new AtomicLong(); + //in the process of flushing + private final AtomicLong recordsWriting = new AtomicLong(); + + private final AtomicLong filesStarted = new AtomicLong(); + private final AtomicLong filesCompleted = new AtomicLong(); + + private final AtomicLong writeDurationNs = new AtomicLong(); + private final AtomicLong closeDurationNs = new AtomicLong(); + + void addedRecords(int recordCount) { + recordsTotalReceived.addAndGet(recordCount); + recordsPending.addAndGet(recordCount); + } + + void finishedFlush(AsyncRecordStreamHandler stream, AsyncWriteResult result) { + filesCompleted.incrementAndGet(); + + recordsTotalCompleted.addAndGet(stream.recordCount()); + + recordsWriting.addAndGet(-stream.recordCount()); + + writeDurationNs.addAndGet(result.writeDurationNs()); + closeDurationNs.addAndGet(result.closeDurationNs()); + } + + void startFlush(AsyncRecordStreamHandler stream) { + filesStarted.incrementAndGet(); + recordsPending.addAndGet(-stream.recordCount()); + recordsWriting.addAndGet(stream.recordCount()); + } + + public long recordsTotalReceived() { + return recordsTotalReceived.get(); + } + public long recordsTotalCompleted() { + return recordsTotalCompleted.get(); + } + + public long recordsPending() { + return recordsPending.get(); + } + public long recordsBuffered() { + return recordsPending.get() + recordsWriting.get(); + } + + public long recordsWriting() { + return recordsWriting.get(); + } + + + public long filesStarted() { + return filesStarted.get(); + } + + public long filesCompleted() { + return filesCompleted.get(); + } + + public long writeDurationNs() { + return writeDurationNs.get(); + } + + public long closeDurationNs() { + return closeDurationNs.get(); + } + +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java new file mode 100644 index 000000000..054181655 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java @@ -0,0 +1,45 @@ +package io.aiven.kafka.connect.gcs; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import io.aiven.kafka.connect.common.output.OutputWriter; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.nio.channels.Channels; +import java.util.List; + +public class WriteHelper { + private final Storage storage; + private final GcsSinkConfig config; + + public WriteHelper(Storage storage, GcsSinkConfig config) { + this.storage = storage; + this.config = config; + } + + public OutputWriter openFile(final String filename) { + final BlobInfo blob = 
BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename) + .setContentEncoding(config.getObjectContentEncoding()) + .build(); + try { + var out = Channels.newOutputStream(storage.writer(blob)); + return OutputWriter.builder() + .withExternalProperties(config.originalsStrings()) + .withOutputFields(config.getOutputFields()) + .withCompressionType(config.getCompressionType()) + .withEnvelopeEnabled(config.envelopeEnabled()) + .build(out, config.getFormatType()); + } catch (final Exception e) { // NOPMD broad exception catched + throw new ConnectException(e); + } + } + public void flushFile(final String filename, final List records) { + try (var writer = openFile(filename)) { + writer.writeRecords(records); + } catch (final Exception e) { // NOPMD broad exception catched + throw new ConnectException(e); + } + } + +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java new file mode 100644 index 000000000..f1f2c2ca5 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java @@ -0,0 +1,15 @@ +package io.aiven.kafka.connect.gcs.overload; + +import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask; +import io.aiven.kafka.connect.gcs.AsyncRecordStreamHandler; +import io.aiven.kafka.connect.gcs.AsyncWriteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; + +public interface OverloadHandler { + void addedRecords(int recordCount); + void startFlush(AsyncRecordStreamHandler stream); + void finishedFlush(AsyncRecordStreamHandler stream, AsyncWriteResult result); +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java new file mode 100644 index 000000000..0db5574b5 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java @@ -0,0 +1,25 @@ +package io.aiven.kafka.connect.gcs.overload; + +import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask.StreamSnapshotMetadata; +import java.util.List; + +/** + * The limited actions that the overload handler can take + */ +public interface OverloadHandlerActions { + /** + * Get the list of open streams. Note - the list is a snapshot and may not be up to date. Streams may be removed + * asynchronously as they time out. 
No new streams will be added, or records added to existing streams by background threads + * + * @return the list of open streams + */ + List snapshotOpenStreams(); + + /** + * force a stream to start writing asynchronously or synchronously (depending on the writer implementation) + * Typically used to retrieve memory pressure + * + * @param streamSnapshotMetadata the stream to force write + */ + void forceStartAsyncWrite(StreamSnapshotMetadata streamSnapshotMetadata); +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java new file mode 100644 index 000000000..cd8b1d57d --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java @@ -0,0 +1,121 @@ +package io.aiven.kafka.connect.gcs.overload; + +import io.aiven.kafka.connect.gcs.AsyncRecordStreamHandler; +import io.aiven.kafka.connect.gcs.AsyncWriteResult; +import io.aiven.kafka.connect.gcs.Instrumentation; +import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask.StreamSnapshotMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class PauseOrFlushOverloadHandler implements OverloadHandler { + private static final Logger LOG = LoggerFactory.getLogger(PauseOrFlushOverloadHandler.class); + private final static Comparator bySize = + Comparator.comparingLong(a -> a.recordCount).reversed();; + + //TODO - make these configurable + private final int softThreshold = 1000; + //TODO - make these configurable + private final int panic = 2000; + private final Instrumentation instrumentation; + private final OverloadHandlerActions actions; + private final Lock writingLock = new ReentrantLock() ; + private final Condition writingCompleted = writingLock.newCondition(); + + public PauseOrFlushOverloadHandler(OverloadHandlerActions actions, Instrumentation instrumentation) { + this.actions = actions; + this.instrumentation = instrumentation; + if (panic <= 0 || softThreshold <= 0 || panic < softThreshold) { + throw new IllegalArgumentException("Panic ("+panic+") and soft ("+softThreshold+") thresholds must be positive and panic must be greater than soft"); + } + } + + @Override + public void addedRecords(int recordCount) { + final long pending = instrumentation.recordsPending(); + final long writing = instrumentation.recordsWriting(); + + if (pending + writing > softThreshold) { + if (pending + writing > panic) { + startHardFlush(); + } else if (pending > softThreshold) { + startSoftFlush(); + } + } + } + + /** + * ensure that there are sufficient records finishing to bring the pending count down below the panic threshold + */ + private void startHardFlush() { + LOG.warn("Starting hard flush, openPending: {}, panic :{}", instrumentation.recordsPending(), panic); + + //wait for enough of the flushes to complete to bring us down below the panic threshold + writingLock.lock(); + try { + while (instrumentation.recordsPending() + instrumentation.recordsWriting() > panic) { + try { + var woken = writingCompleted.await(10, TimeUnit.SECONDS); + LOG.info("During hard flush after sleep, woken: {}, openPending: {}, panic :{}", woken, instrumentation.recordsPending(), panic); + } catch (InterruptedException e) { + 
LOG.info("Interrupted while waiting for hard flush to complete", e); + Thread.currentThread().interrupt(); + // TODO - not sure if we should return or retry + // if we retry then we need to interrupt at the end if we had one interrupt here + // for the moment we return, and assume that we will probably get a callback if + // we are still overloaded + return; + } + } + } finally { + writingLock.unlock(); + } + LOG.info("Finished hard flush, openPending: {}, panic :{}", instrumentation.recordsPending(), panic); + } + + /** + * ensure that there are sufficient records finishing to bring the pending count down below the soft threshold + */ + private void startSoftFlush() { + var open = actions.snapshotOpenStreams(); + //recalculate the pending count, and base all decisions on the snapshot - it makes life easier to reason about + //but in reality records that time out may be removed for the underlying collection + + //we have openPending as a heap variable so we can modify it in the lambda + final long[] openPending = {open.stream().mapToLong(r -> r.recordCount).sum()}; + if (openPending[0] > softThreshold) { + LOG.warn("Starting soft flush, openPending: {}, softThreshold :{}", openPending[0], softThreshold); + open.sort(bySize); + open + .stream() + .takeWhile(r -> openPending[0] > softThreshold) + .forEach(stream -> { + LOG.info("Starting early async write for stream: {}, recordCount :{}", stream.groupId, stream.recordCount); + actions.forceStartAsyncWrite(stream); + }); + LOG.info("finished soft flush, openPending: {}, softThreshold :{}", openPending[0], softThreshold); + } + } + + @Override + public void startFlush(AsyncRecordStreamHandler stream) { + LOG.info("startFlush {} records, total pending: {}", stream.recordCount(), instrumentation.recordsPending()); + } + + @Override + public void finishedFlush(AsyncRecordStreamHandler stream, AsyncWriteResult result) { + LOG.info("finishedFlush {} records, total pending: {}", stream.recordCount(), instrumentation.recordsPending()); + writingLock.lock(); + try { + writingCompleted.signalAll(); + } finally { + writingLock.unlock(); + } + } + +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java new file mode 100644 index 000000000..1fb8287cc --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java @@ -0,0 +1,44 @@ +package io.aiven.kafka.connect.gcs.writer; + +import io.aiven.kafka.connect.common.output.OutputWriter; +import io.aiven.kafka.connect.gcs.AsyncWriteResult; +import io.aiven.kafka.connect.gcs.WriteHelper; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.io.IOException; + +public class EarlyWriteStreamWriter extends StreamWriter { + private final OutputWriter writer; + private int recordCount; + private long durationNs; + + public EarlyWriteStreamWriter(WriteHelper writeHelper, String filename) { + this.writer = writeHelper.openFile(filename); + } + + @Override + public void addRecord(SinkRecord record) { + final var start = System.nanoTime(); + try { + writer.writeRecord(record); + } catch (IOException e) { + throw new ConnectException(e); + } + durationNs += System.nanoTime() - start; + recordCount++; + } + + @Override + public AsyncWriteResult finishWriteAsync() { + final var start = System.nanoTime(); + try { + writer.close(); + } catch (IOException e) { 
+ throw new ConnectException(e); + } + return new AsyncWriteResult(recordCount, durationNs, System.nanoTime() - start); + } + + +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java new file mode 100644 index 000000000..e6ea5cf47 --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java @@ -0,0 +1,35 @@ +package io.aiven.kafka.connect.gcs.writer; + +import io.aiven.kafka.connect.gcs.AsyncWriteResult; +import io.aiven.kafka.connect.gcs.WriteHelper; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.ArrayList; +import java.util.List; + +public class LateWriteStreamWriter extends StreamWriter { + private final List<SinkRecord> records; + private final WriteHelper writeHelper; + private final String filename; + + public LateWriteStreamWriter(WriteHelper writeHelper, String filename) { + this.writeHelper = writeHelper; + this.filename = filename; + this.records = new ArrayList<>(); + } + + @Override + public void addRecord(SinkRecord record) { + records.add(record); + } + + @Override + public AsyncWriteResult finishWriteAsync() { + final var start = System.nanoTime(); + writeHelper.flushFile(filename, records); + return new AsyncWriteResult(records.size(), 0, System.nanoTime() - start); + } + + +} diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java new file mode 100644 index 000000000..a8263dacb --- /dev/null +++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java @@ -0,0 +1,11 @@ +package io.aiven.kafka.connect.gcs.writer; + +import io.aiven.kafka.connect.gcs.AsyncWriteResult; +import org.apache.kafka.connect.sink.SinkRecord; + +public abstract class StreamWriter { + + public abstract void addRecord(SinkRecord record); + + public abstract AsyncWriteResult finishWriteAsync(); +} diff --git a/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java b/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java index 074da3644..954185364 100644 --- a/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java +++ b/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkConfigTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -75,10 +74,12 @@ void incorrectFilenameTemplates(final String template) { .filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name())) .findFirst() .get(); - assertFalse(configValue.errorMessages().isEmpty()); + assertTrue(configValue.errorMessages().isEmpty()); final var throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); - assertTrue(throwable.getMessage().startsWith("Invalid value ")); + assertTrue(throwable.getMessage() + .startsWith("Invalid value " + template + + " for configuration file.name.template: unsupported set of 
template variables")); } @Test @@ -516,7 +517,7 @@ void emptyFilenameTemplate() { final var expectedErrorMessage = "Invalid value for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -531,7 +532,7 @@ void filenameTemplateUnknownVariable() { + "for configuration file.name.template: unsupported set of template variables, " + "supported sets are: " + TEMPLATE_VARIABLES; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -545,7 +546,7 @@ void filenameTemplateNoTopic() { final var expectedErrorMessage = "Invalid value {{ partition }}{{ start_offset }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -561,7 +562,7 @@ void wrongVariableParameterValue() { + "supported sets are: " + "partition:padding=true|false,start_offset:padding=true|false,timestamp:unit=yyyy|MM|dd|HH"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -576,7 +577,7 @@ void variableWithoutRequiredParameterValue() { + "for configuration file.name.template: " + "parameter unit is required for the the variable timestamp, " + "supported values are: yyyy|MM|dd|HH"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -590,7 +591,7 @@ void wrongVariableWithoutParameter() { final var expectedErrorMessage = "Invalid value {{start_offset:}}-{{partition}}-{{topic}} " + "for configuration file.name.template: " + "Wrong variable with parameter definition"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -605,7 +606,7 @@ void noVariableWithParameter() { + "for configuration file.name.template: " + "Variable name hasn't been set for template: {{:padding=true}}-{{partition}}-{{topic}}"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, 
"file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -620,7 +621,7 @@ void wrongVariableWithoutParameterValue() { + "for configuration file.name.template: " + "Parameter value for variable `start_offset` and parameter `padding` has not been set"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -635,7 +636,7 @@ void wrongVariableWithoutParameterName() { + "for configuration file.name.template: " + "Parameter name for variable `start_offset` has not been set"; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -649,7 +650,7 @@ void filenameTemplateNoPartition() { final var expectedErrorMessage = "Invalid value {{ topic }}{{ start_offset }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); @@ -663,7 +664,7 @@ void filenameTemplateNoStartOffset() { final var expectedErrorMessage = "Invalid value {{ topic }}{{ partition }} for configuration file.name.template: " + "unsupported set of template variables, supported sets are: " + TEMPLATE_VARIABLES; - expectErrorMessageForConfigurationInConfigDefValidation(properties, "file.name.template", expectedErrorMessage); + assertConfigDefValidationPasses(properties); final Throwable throwable = assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); assertEquals(expectedErrorMessage, throwable.getMessage()); diff --git a/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java b/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java index b577b1456..f6cfc8770 100644 --- a/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java +++ b/gcs-sink-connector/src/test/java/io/aiven/kafka/connect/gcs/config/GcsSinkCredentialsConfigTest.java @@ -17,7 +17,6 @@ package io.aiven.kafka.connect.gcs.config; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -64,10 +63,11 @@ void incorrectFilenameTemplates(final String template) { .filter(x -> GcsSinkConfig.FILE_NAME_TEMPLATE_CONFIG.equals(x.name())) .findFirst() .get(); - assertFalse(configValue.errorMessages().isEmpty()); + assertTrue(configValue.errorMessages().isEmpty()); final var throwable = 
assertThrows(ConfigException.class, () -> new GcsSinkConfig(properties)); - assertTrue(throwable.getMessage().startsWith("Invalid value ")); + assertTrue(throwable.getMessage() + .startsWith("Invalid value " + template + " for configuration file.name.template:")); } @Test diff --git a/s3-sink-connector/README.md b/s3-sink-connector/README.md index d07feec1a..24199f852 100644 --- a/s3-sink-connector/README.md +++ b/s3-sink-connector/README.md @@ -202,6 +202,10 @@ flush In this case, there will be two files `k0` (containing value `4`) and `k1` (containing value `3`). +#### Custom grouping + +For more complex cases, a custom grouping may be defined by setting `file.record.grouper.builder`, +where you can supply your own implementation of a record grouper builder. ##### The string representation of a key diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 2a266e830..0b2780437 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Stream; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; @@ -115,7 +116,13 @@ public void put(final Collection<SinkRecord> records) { @Override public void flush(final Map<TopicPartition, OffsetAndMetadata> offsets) { try { - recordGrouper.records().forEach(this::flushFile); + final Stream<Map.Entry<String, List<SinkRecord>>> stream; + if (config.isWriteParallel()) { + stream = recordGrouper.records().entrySet().stream().parallel(); + } else { + stream = recordGrouper.records().entrySet().stream(); + } + stream.forEach(entry -> flushFile(entry.getKey(), entry.getValue())); } finally { recordGrouper.clear(); } diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index 7f428e584..993743a42 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -33,13 +33,11 @@ import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.common.config.FixedSetRecommender; import io.aiven.kafka.connect.common.config.OutputField; import io.aiven.kafka.connect.common.config.OutputFieldEncodingType; import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; -import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator; import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; @@ -187,8 +185,8 @@ public static ConfigDef configDef() { addOutputFieldsFormatConfigGroup(configDef, null); addDeprecatedTimestampConfig(configDef); addDeprecatedConfiguration(configDef); - addKafkaBackoffPolicy(configDef); addS3RetryPolicies(configDef); + AivenCommonConfig.addCommonConfig(configDef); return configDef; } @@ -318,59 +316,7 @@ private static void addAwsStsConfigGroup(final ConfigDef configDef) { } private 
static void addFileConfigGroup(final ConfigDef configDef) { - int fileGroupCounter = 0; - - configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, - new FilenameTemplateValidator(FILE_NAME_TEMPLATE_CONFIG), ConfigDef.Importance.MEDIUM, - "The template for file names on S3. " - + "Supports `{{ variable }}` placeholders for substituting variables. " - + "Currently supported variables are `topic`, `partition`, and `start_offset` " - + "(the offset of the first record in the file). " - + "Only some combinations of variables are valid, which currently are:\n" - + "- `topic`, `partition`, `start_offset`." - + "There is also `key` only variable {{key}} for grouping by keys", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG); - - final String supportedCompressionTypes = CompressionType.names() - .stream() - .map(f -> "'" + f + "'") - .collect(Collectors.joining(", ")); - - configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, null, new FileCompressionTypeValidator(), - ConfigDef.Importance.MEDIUM, - "The compression type used for files put on S3. " + "The supported values are: " - + supportedCompressionTypes + ".", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG, - FixedSetRecommender.ofSupportedValues(CompressionType.names())); - - configDef.define(FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, new ConfigDef.Validator() { - @Override - public void ensureValid(final String name, final Object value) { - assert value instanceof Integer; - if ((Integer) value < 0) { - throw new ConfigException(FILE_MAX_RECORDS, value, "must be a non-negative integer number"); - } - } - }, ConfigDef.Importance.MEDIUM, - "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " - + "0 is interpreted as \"unlimited\", which is the default.", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.SHORT, FILE_MAX_RECORDS); - - configDef.define(FILE_NAME_TIMESTAMP_TIMEZONE, ConfigDef.Type.STRING, ZoneOffset.UTC.toString(), - new TimeZoneValidator(), ConfigDef.Importance.LOW, - "Specifies the timezone in which the dates and time for the timestamp variable will be treated. " - + "Use standard shot and long names. Default is UTC", - GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment - ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_TIMEZONE); - - configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(), - new TimestampSourceValidator(), ConfigDef.Importance.LOW, - "Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD - // UnusedAssignment - ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE); + AivenCommonConfig.addFileConfigGroup(configDef, GROUP_FILE, "S3", 0, null); } private static void addDeprecatedTimestampConfig(final ConfigDef configDef) { @@ -617,7 +563,10 @@ public ZoneId getTimezone() { } public TimestampSource getTimestampSource() { - return TimestampSource.of(getTimezone(), TimestampSource.Type.of(getString(TIMESTAMP_SOURCE))); + return new TimestampSource.Builder() + .configuration(getString(TIMESTAMP_SOURCE)) + .zoneId(getTimezone()) + .build(); } public AwsStsRole getStsRole() {
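
As an illustration of the custom grouping extension point added in the README change above, here is a minimal sketch of what a key-only grouper plugged in via `file.record.grouper.builder` could look like. The exact `CustomRecordGrouperBuilder` contract is not shown in this patch, so the builder method names used here (`setFilenameTemplate`, `build`) and the three-method `RecordGrouper` shape (`put`, `clear`, `records`) are assumptions for illustration only; the configuration keys `file.record.grouper.builder` and `file.write.parallel` are the ones introduced by this change.

// Hypothetical sketch only: groups records purely by key, producing one output file per key.
// The CustomRecordGrouperBuilder contract is assumed here; adapt the hooks to the real interface.
package com.example.grouper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.templating.Template;

public class KeyOnlyGrouperBuilder /* would implement CustomRecordGrouperBuilder (assumed) */ {

    // Kept only to show where the parsed file.name.template would arrive (assumed hook).
    private Template filenameTemplate;

    public void setFilenameTemplate(final Template filenameTemplate) {
        this.filenameTemplate = filenameTemplate;
    }

    // Assumed factory method producing the grouper the sink task will use.
    public RecordGrouper build() {
        return new RecordGrouper() {
            private final Map<String, List<SinkRecord>> files = new HashMap<>();

            @Override
            public void put(final SinkRecord record) {
                // The file name is simply the record key; records with null keys share one file.
                final String key = record.key() == null ? "null" : record.key().toString();
                files.computeIfAbsent(key, ignored -> new ArrayList<>()).add(record);
            }

            @Override
            public void clear() {
                files.clear();
            }

            @Override
            public Map<String, List<SinkRecord>> records() {
                return files;
            }
        };
    }
}

With such a class on the plugin classpath, the connector configuration could reference it as `file.record.grouper.builder=com.example.grouper.KeyOnlyGrouperBuilder` (a hypothetical class name), and `file.write.parallel=true` would let the task flush the resulting per-key files concurrently, matching the new `S3SinkTask.flush` behaviour above.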