diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java
new file mode 100644
index 000000000..89d5986ba
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/grouper/RecordStreamer.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.grouper;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+
+/**
+ * The interface for classes that associate {@link SinkRecord}s with files by some criteria.
+ */
+public interface RecordStreamer {
+    /**
+     * Determine the logical grouping of the record.
+     *
+     * @param record
+     *            record to group
+     * @return the identifier of the stream (group) the record belongs to
+     */
+    String getStream(SinkRecord record);
+
+    /**
+     * Determine the actual filename for the record.
+     *
+     * @param record
+     *            record to drive the filename
+     * @return the filename the record should be written to
+     */
+    String getFilename(SinkRecord record);
+}
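
For orientation, the sketch below shows what a RecordStreamer implementation could look like: it groups records by topic-partition and derives the filename from the offset of the record that opens the file. The class is hypothetical and not part of this change; a real implementation would normally also implement RecordGrouper, since the async task below requires its grouper to implement both interfaces.

    // Hypothetical sketch only, not part of this change.
    package io.aiven.kafka.connect.common.grouper;

    import org.apache.kafka.connect.sink.SinkRecord;

    public class TopicPartitionRecordStreamer implements RecordStreamer {
        @Override
        public String getStream(final SinkRecord record) {
            // one logical stream per topic-partition
            return record.topic() + "-" + record.kafkaPartition();
        }

        @Override
        public String getFilename(final SinkRecord record) {
            // file named after the stream plus the offset of the record that opens it
            return getStream(record) + "-" + record.kafkaOffset();
        }
    }
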
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java
new file mode 100644
index 000000000..4d4999de8
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConfig.java
@@ -0,0 +1,43 @@
+package io.aiven.kafka.connect.gcs;
+
+import io.aiven.kafka.connect.common.config.validators.ClassValidator;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.Map;
+
+public class AsyncGcsSinkConfig extends GcsSinkConfig {
+
+    private static final String GROUP_ASYNC = "Async";
+    private static final String ASYNC_MAX_RECORD_AGE = "async.file.max.record.age.ms";
+    private static final String ASYNC_MAX_OPEN_FILES = "async.file.max.open.files";
+    private static final String ASYNC_TASK_CLASS = "async.task.class";
+
+    public AsyncGcsSinkConfig(final Map<String, String> properties) {
+        super(configDef(), properties);
+    }
+
+    public static ConfigDef configDef() {
+        final ConfigDef configDef = GcsSinkConfig.configDef();
+        addAsyncConfig(configDef);
+        return configDef;
+    }
+
+    private static void addAsyncConfig(final ConfigDef configDef) {
+        int groupCounter = 0;
+        configDef.define(ASYNC_MAX_RECORD_AGE, ConfigDef.Type.INT, 60000, ConfigDef.Importance.LOW,
+                "the maximum time in milliseconds a record may wait in an open file before the file is written asynchronously",
+                GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_MAX_RECORD_AGE);
+        configDef.define(ASYNC_MAX_OPEN_FILES, ConfigDef.Type.INT, 100, ConfigDef.Importance.LOW,
+                "the maximum number of files that may be open (buffering records) at the same time",
+                GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_MAX_OPEN_FILES);
+        configDef.define(ASYNC_TASK_CLASS, ConfigDef.Type.CLASS, AsyncGcsSinkTask.class, new ClassValidator(AsyncGcsSinkTask.class),
+                ConfigDef.Importance.LOW, "the task class", GROUP_ASYNC, groupCounter++, ConfigDef.Width.NONE, ASYNC_TASK_CLASS);
+    }
+
+    public int getAsyncMaxRecordAgeMs() {
+        return getInt(ASYNC_MAX_RECORD_AGE);
+    }
+
+    public Class<? extends Task> getTaskClass() {
+        // must look up the task class key, not the record age key
+        return getClass(ASYNC_TASK_CLASS).asSubclass(AsyncGcsSinkTask.class);
+    }
+}
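
A usage sketch of the new settings: the async.* keys are the ones defined above, while the two non-async keys (gcs.bucket.name, file.name.template) are quoted from memory of the existing connector and the values are purely illustrative.

    // Illustrative only; non-async keys and all values are assumptions, not part of this change.
    package io.aiven.kafka.connect.gcs;

    import java.util.Map;

    public class AsyncConfigExample {
        public static void main(final String[] args) {
            final Map<String, String> props = Map.of(
                    "gcs.bucket.name", "my-bucket",
                    "file.name.template", "{{topic}}-{{partition}}-{{start_offset}}",
                    "async.file.max.record.age.ms", "30000",   // flush a file at most 30 s after its first record
                    "async.file.max.open.files", "50");

            final AsyncGcsSinkConfig config = new AsyncGcsSinkConfig(props);
            System.out.println("max record age ms: " + config.getAsyncMaxRecordAgeMs());
            System.out.println("task class: " + config.getTaskClass().getName());
        }
    }
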
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java
new file mode 100644
index 000000000..555375b89
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkConnector.java
@@ -0,0 +1,29 @@
+package io.aiven.kafka.connect.gcs;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.Map;
+
+public class AsyncGcsSinkConnector extends GcsSinkConnector {
+    private AsyncGcsSinkConfig config;
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return config.getTaskClass();
+    }
+
+    protected void setConfig(final AsyncGcsSinkConfig config) {
+        this.config = config;
+        super.setConfig(config);
+    }
+
+    @Override
+    protected void parseConfig(final Map<String, String> props) {
+        setConfig(new AsyncGcsSinkConfig(props));
+    }
+
+    @Override
+    public ConfigDef config() {
+        return AsyncGcsSinkConfig.configDef();
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java
new file mode 100644
index 000000000..a3f3a2b5c
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncGcsSinkTask.java
@@ -0,0 +1,282 @@
+package io.aiven.kafka.connect.gcs;
+
+import com.google.cloud.storage.Storage;
+import com.google.common.collect.Streams;
+import io.aiven.kafka.connect.common.grouper.RecordStreamer;
+import io.aiven.kafka.connect.gcs.overload.OverloadHandler;
+import io.aiven.kafka.connect.gcs.overload.OverloadHandlerActions;
+import io.aiven.kafka.connect.gcs.overload.PauseOrFlushOverloadHandler;
+import io.aiven.kafka.connect.gcs.writer.LateWriteStreamWriter;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+public class AsyncGcsSinkTask extends GcsSinkTask {
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncGcsSinkTask.class);
+    static final ForkJoinPool exec = ForkJoinPool.commonPool();
+    static final ScheduledExecutorService timed = Executors.newSingleThreadScheduledExecutor();
+
+    private final ConcurrentMap<String, AsyncRecordStreamHandler> openStreams = new ConcurrentHashMap<>();
+    private final Set<AsyncRecordStreamHandler> finishing = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final AtomicReference<ConnectException> fatalError = new AtomicReference<>();
+    private final Instrumentation instrumentation = new Instrumentation();
+
+    // config
+    private AsyncGcsSinkConfig config;
+    private int maxRecordsPerFile;
+    private int maxAgeRecordsMs;
+    private RecordStreamer recordStreamer;
+    private OverloadHandler overloadHandler;
+
+    // required by Connect
+    public AsyncGcsSinkTask() {
+        super();
+    }
+
+    // for testing
+    public AsyncGcsSinkTask(final Map<String, String> props, final Storage storage) {
+        super(props, storage);
+    }
+
+    protected void setConfig(final AsyncGcsSinkConfig config) {
+        this.config = config;
+        super.setConfig(config);
+    }
+
+    @Override
+    protected void parseConfig(final Map<String, String> props) {
+        setConfig(new AsyncGcsSinkConfig(props));
+    }
+
+    @Override
+    public void start(final Map<String, String> props) {
+        super.start(props);
+        this.maxRecordsPerFile = config.getMaxRecordsPerFile();
+        this.maxAgeRecordsMs = config.getAsyncMaxRecordAgeMs();
+        if (recordGrouper instanceof RecordStreamer) {
+            recordStreamer = (RecordStreamer) recordGrouper;
+        } else {
+            throw new ConnectException("RecordGrouper must implement RecordStreamer to use the async sink task");
+        }
+        // TODO drive this through config
+        this.overloadHandler = new PauseOrFlushOverloadHandler(new OverloadActions(), instrumentation);
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        checkForErrors();
+        Objects.requireNonNull(records, "records cannot be null");
+
+        LOG.debug("Processing {} records", records.size());
+
+        for (final SinkRecord record : records) {
+            var groupId = recordStreamer.getStream(record);
+            openStreams.compute(groupId, (group, stream) -> {
+                if (stream == null) {
+                    // map the group to a new stream, which already contains the first record
+                    return createRecordStream(group, record);
+                } else {
+                    stream.addRecord(record);
+                    if (stream.recordCount() >= maxRecordsPerFile) {
+                        startAsyncWrite(stream);
+                        // and remove the entry from the map
+                        return null;
+                    } else {
+                        // keep the entry in the map
+                        return stream;
+                    }
+                }
+            });
+        }
+        instrumentation.addedRecords(records.size());
+        overloadHandler.addedRecords(records.size());
+    }
+
+    private void checkForErrors() {
+        final ConnectException error = fatalError.get();
+        if (error != null) {
+            throw error;
+        }
+    }
+
+    public void fatalError(final ConnectException error) {
+        if (fatalError.compareAndSet(null, error)) {
+            LOG.error("Fatal error", error);
+        } else {
+            LOG.error("Another fatal error - suppressed because a fatal error has already been reported", error);
+        }
+        throw error;
+    }
+
+    private AsyncRecordStreamHandler createRecordStream(final String group, final SinkRecord record) {
+        return new AsyncRecordStreamHandler(this, group, record,
+                // TODO drive the early/late write through config
+                new LateWriteStreamWriter(writeHelper(), recordStreamer.getFilename(record)));
+    }
+
+    void nonFullStreamTimeout(final AsyncRecordStreamHandler streamTimedOut) {
+        openStreams.compute(streamTimedOut.groupId(), (group, stream) -> {
+            if (stream != streamTimedOut) {
+                // if stream is null, streamTimedOut became full and was removed
+                // if stream is a different object, streamTimedOut became full and another record was then added to
+                // the same groupId - either way streamTimedOut should already have been started
+                if (!streamTimedOut.finishing()) {
+                    fatalError(new ConnectException(
+                            "Timed-out stream is no longer open but has not started finishing - program error"));
+                }
+                return stream;
+            } else {
+                // start the async write, and remove it from the map
+                startAsyncWrite(streamTimedOut);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Start the async write for the stream. Guaranteed not to be called twice for the same stream.
+     *
+     * @implNote guaranteed not to be called concurrently, as it is only called via a
+     *           {@link ConcurrentHashMap#compute(Object, BiFunction) compute} method of a {@link ConcurrentHashMap},
+     *           which enforces memory barriers.
+     */
+    private void startAsyncWrite(final AsyncRecordStreamHandler stream) {
+        finishing.add(stream);
+        instrumentation.startFlush(stream);
+        overloadHandler.startFlush(stream);
+        stream.finishWrite(exec)
+                .whenComplete((result, error) -> {
+                    if (error != null) {
+                        fatalError(new ConnectException("Error writing records", error));
+                    }
+                    // fatalError always throws, so this is success handling
+                    finishing.remove(stream);
+                    instrumentation.finishedFlush(stream, result);
+                    overloadHandler.finishedFlush(stream, result);
+                });
+    }
+
+    @Override
+    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        checkForErrors();
+        commitPending.set(false);
+        // we need to hold the 'lock' on the value for startAsyncWrite to work; the stream may already have been
+        // removed by a concurrent timeout, hence the null check
+        openStreams.forEach((k, v) -> openStreams.compute(k, (group, stream) -> {
+            if (stream != null) {
+                startAsyncWrite(stream);
+            }
+            return null;
+        }));
+
+        final CompletableFuture<?>[] list = finishing.stream()
+                .map(AsyncRecordStreamHandler::asyncFinish)
+                .toArray(CompletableFuture[]::new);
+        try {
+            CompletableFuture.allOf(list).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new ConnectException(e);
+        }
+        finishing.clear();
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> preCommit(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        checkForErrors();
+        // check the open streams first and the finishing streams second, in that order, so that we don't miss a
+        // stream that moves asynchronously from one collection to the other
+
+        final Map<TopicPartition, Long> earliestStillPending = new HashMap<>();
+        Streams.concat(openStreams.values().stream(),
+                        finishing.stream())
+                .forEach(pendingStreamHandler ->
+                        earliestStillPending.compute(pendingStreamHandler.topicPartition(),
+                                (ignored, earliestPendingOffset) -> {
+                                    if (earliestPendingOffset == null
+                                            || pendingStreamHandler.firstOffset() < earliestPendingOffset) {
+                                        return pendingStreamHandler.firstOffset();
+                                    } else {
+                                        return earliestPendingOffset;
+                                    }
+                                })
+                );
+
+        return currentOffsets
+                .entrySet()
+                .stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                suppliedEntry -> {
+                                    var inTransit = earliestStillPending.get(suppliedEntry.getKey());
+                                    if (inTransit == null) {
+                                        return suppliedEntry.getValue();
+                                    } else {
+                                        return new OffsetAndMetadata(inTransit, suppliedEntry.getValue().metadata());
+                                    }
+                                }));
+    }
+
+    private final AtomicBoolean commitPending = new AtomicBoolean(false);
+
+    public void requestCommitNow() {
+        context.requestCommit();
+    }
+
+    public void requestCommitSoon() {
+        if (commitPending.compareAndSet(false, true)) {
+            // TODO config
+            timed.schedule(this::requestCommitNow, 100, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    public RecordStreamer recordStreamer() {
+        return recordStreamer;
+    }
+
+    public long maxAgeRecordsMs() {
+        return maxAgeRecordsMs;
+    }
+
+    private class OverloadActions implements OverloadHandlerActions {
+        @Override
+        public List<StreamSnapshotMetadata> snapshotOpenStreams() {
+            return openStreams.values().stream().map(StreamSnapshotMetadata::new).collect(Collectors.toList());
+        }
+
+        @Override
+        public void forceStartAsyncWrite(final StreamSnapshotMetadata streamSnapshotMetadata) {
+            openStreams.compute(streamSnapshotMetadata.groupId, (group, stream) -> {
+                // we don't really care if the stream is null: that means openStreams has been updated and the
+                // stream is already enqueued for writing, e.g. because a timer has expired
+                if (stream != null) {
+                    startAsyncWrite(stream);
+                }
+                return null;
+            });
+        }
+    }
+
+    // TODO should be a record
+    public static class StreamSnapshotMetadata {
+        public final long createTime;
+        public final int recordCount;
+        public final long firstOffset;
+        public final String groupId;
+        public final TopicPartition file;
+        public final boolean finishing;
+
+        public StreamSnapshotMetadata(final AsyncRecordStreamHandler stream) {
+            this.createTime = stream.createTime();
+            this.recordCount = stream.recordCount();
+            this.firstOffset = stream.firstOffset();
+            this.groupId = stream.groupId();
+            this.file = stream.topicPartition();
+            this.finishing = stream.finishing();
+        }
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java
new file mode 100644
index 000000000..7090f0b90
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncRecordStreamHandler.java
@@ -0,0 +1,94 @@
+package io.aiven.kafka.connect.gcs;
+
+import io.aiven.kafka.connect.gcs.writer.StreamWriter;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public final class AsyncRecordStreamHandler {
+
+    private final AsyncGcsSinkTask asyncGcsSinkTask;
+    private final String file;
+    private final String groupId;
+    private final CompletableFuture<AsyncWriteResult> asyncFinish;
+    private final Future<?> timeout;
+    private final long firstOffset;
+    private final StreamWriter streamWriter;
+    private final TopicPartition topicPartition;
+    private final long createTime;
+
+    private boolean finishing = false;
+    private int recordCount = 0;
+
+    public AsyncRecordStreamHandler(final AsyncGcsSinkTask asyncGcsSinkTask, final String groupId,
+            final SinkRecord firstElement, final StreamWriter streamWriter) {
+        this.asyncGcsSinkTask = asyncGcsSinkTask;
+        this.groupId = groupId;
+        this.topicPartition = new TopicPartition(firstElement.topic(), firstElement.kafkaPartition());
+        this.firstOffset = firstElement.kafkaOffset();
+        this.streamWriter = streamWriter;
+        this.file = asyncGcsSinkTask.recordStreamer().getFilename(firstElement);
+        this.timeout = AsyncGcsSinkTask.timed.schedule(() -> asyncGcsSinkTask.nonFullStreamTimeout(this),
+                asyncGcsSinkTask.maxAgeRecordsMs(), TimeUnit.MILLISECONDS);
+        this.asyncFinish = new CompletableFuture<>();
+        this.createTime = System.currentTimeMillis();
+        // the stream is created for its first record, so add it straight away
+        addRecord(firstElement);
+    }
+
+    public int recordCount() {
+        return recordCount;
+    }
+
+    void addRecord(final SinkRecord record) {
+        if (record.kafkaPartition() != topicPartition.partition() || !record.topic().equals(topicPartition.topic())) {
+            throw new ConnectException("Records from different partitions or topics are not allowed in the same group");
+        }
+        recordCount++;
+        streamWriter.addRecord(record);
+    }
+
+    boolean finishing() {
+        return finishing;
+    }
+
+    /**
+     * Request that the file is written (or that any outstanding writes complete) and that the file is flushed and
+     * closed. This should be non-blocking; when the write completes, {@code asyncFinish} is completed. This method
+     * must only be called once.
+     *
+     * @return a future that completes when the write has finished
+     */
+    CompletableFuture<AsyncWriteResult> finishWrite(final ForkJoinPool exec) {
+        if (finishing) {
+            throw new IllegalStateException("finishWrite called twice");
+        }
+        finishing = true;
+        timeout.cancel(false);
+        asyncFinish.completeAsync(streamWriter::finishWriteAsync, exec);
+        return asyncFinish;
+    }
+
+    public long firstOffset() {
+        return firstOffset;
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public CompletableFuture<AsyncWriteResult> asyncFinish() {
+        return asyncFinish;
+    }
+
+    public long createTime() {
+        return createTime;
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java
new file mode 100644
index 000000000..bc6d4a613
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/AsyncWriteResult.java
@@ -0,0 +1,25 @@
+package io.aiven.kafka.connect.gcs;
+
+public class AsyncWriteResult {
+    private final int recordCount;
+    private final long writeDurationNs;
+    private final long closeDurationNs;
+
+    public AsyncWriteResult(final int recordCount, final long writeDurationNs, final long closeDurationNs) {
+        this.recordCount = recordCount;
+        this.writeDurationNs = writeDurationNs;
+        this.closeDurationNs = closeDurationNs;
+    }
+
+    public int recordCount() {
+        return recordCount;
+    }
+
+    public long writeDurationNs() {
+        return writeDurationNs;
+    }
+
+    public long closeDurationNs() {
+        return closeDurationNs;
+    }
+}
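
The preCommit() override above never advances an offset past data that is still buffered or in flight: for each partition it reports the first offset of the earliest pending stream, and falls back to the offsets Connect supplied when nothing is pending for that partition. The snippet below replays that rule outside the task with invented topic and offset values, just to make the clamping visible.

    // Standalone illustration of the preCommit clamping rule; topic name and offsets are made up.
    package io.aiven.kafka.connect.gcs;

    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class PreCommitExample {
        public static void main(final String[] args) {
            final TopicPartition tp = new TopicPartition("logs", 0);

            // Connect has handed the task everything up to offset 130 ...
            final OffsetAndMetadata handedOver = new OffsetAndMetadata(130L);
            // ... but the earliest record still sitting in an open or finishing stream is 100.
            final long earliestStillPending = 100L;

            // So the offset reported as safe to commit is clamped back to 100.
            final OffsetAndMetadata safeToCommit = earliestStillPending < handedOver.offset()
                    ? new OffsetAndMetadata(earliestStillPending, handedOver.metadata())
                    : handedOver;

            System.out.println(Map.of(tp, safeToCommit));
        }
    }
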
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
index 94ac254fd..38805c34a 100644
--- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConfig.java
@@ -44,7 +44,7 @@
 import org.slf4j.LoggerFactory;
 import org.threeten.bp.Duration;
 
-public final class GcsSinkConfig extends AivenCommonConfig {
+public class GcsSinkConfig extends AivenCommonConfig {
     private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConfig.class);
     private static final String USER_AGENT_HEADER_FORMAT = "Google GCS Sink/%s (GPN: Aiven;)";
     public static final String USER_AGENT_HEADER_VALUE = String.format(USER_AGENT_HEADER_FORMAT, Version.VERSION);
@@ -237,11 +237,15 @@ public void ensureValid(final String name, final Object value) {
 
     }
 
-    public GcsSinkConfig(final Map<String, String> properties) {
-        super(configDef(), handleDeprecatedYyyyUppercase(properties));
+    protected GcsSinkConfig(final ConfigDef configDef, final Map<String, String> properties) {
+        super(configDef, handleDeprecatedYyyyUppercase(properties));
         validate();
     }
 
+    public GcsSinkConfig(final Map<String, String> properties) {
+        this(configDef(), properties);
+    }
+
     static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, String> properties) {
         if (properties.containsKey(FILE_NAME_TEMPLATE_CONFIG)) {
             final var result = new HashMap<>(properties);
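
The switch from a single public constructor to a protected (ConfigDef, properties) constructor is what lets subclasses contribute extra settings while reusing the existing deprecation handling and validation. The real subclass in this change is AsyncGcsSinkConfig above; the class below is a purely hypothetical second subclass (key name and class name invented) showing the minimum a subclass has to do.

    // Hypothetical subclass, for illustration of the extension point only.
    package io.aiven.kafka.connect.gcs;

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;

    public class ExampleExtendedGcsSinkConfig extends GcsSinkConfig {
        private static final String EXAMPLE_FLAG = "example.flag";   // invented key

        public ExampleExtendedGcsSinkConfig(final Map<String, String> properties) {
            super(configDef(), properties);   // still runs handleDeprecatedYyyyUppercase() and validate()
        }

        public static ConfigDef configDef() {
            final ConfigDef configDef = GcsSinkConfig.configDef();
            configDef.define(EXAMPLE_FLAG, ConfigDef.Type.BOOLEAN, false,
                    ConfigDef.Importance.LOW, "an invented example setting");
            return configDef;
        }

        public boolean exampleFlag() {
            return getBoolean(EXAMPLE_FLAG);
        }
    }
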
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java
index f4d664524..ad5466bbe 100644
--- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkConnector.java
@@ -30,7 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class GcsSinkConnector extends SinkConnector {
+public class GcsSinkConnector extends SinkConnector {
     private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class);
 
     private Map<String, String> configProps;
@@ -51,10 +51,18 @@ public void start(final Map<String, String> props) {
         Objects.requireNonNull(props, "props cannot be null");
         this.configProps = Collections.unmodifiableMap(props);
 
-        this.config = new GcsSinkConfig(props);
+        parseConfig(props);
         LOG.info("Starting connector {}", config.getConnectorName());
     }
 
+    protected void setConfig(final GcsSinkConfig config) {
+        this.config = config;
+    }
+
+    protected void parseConfig(final Map<String, String> props) {
+        setConfig(new GcsSinkConfig(props));
+    }
+
     @Override
     public Class<? extends Task> taskClass() {
         return GcsSinkTask.class;
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
index 716c731d1..6bb10a65e 100644
--- a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/GcsSinkTask.java
@@ -16,12 +16,8 @@
 
 package io.aiven.kafka.connect.gcs;
 
-import java.lang.reflect.InvocationTargetException;
 import java.nio.channels.Channels;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Stream;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -32,25 +28,25 @@
 import io.aiven.kafka.connect.common.grouper.RecordGrouper;
 import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
-import io.aiven.kafka.connect.common.output.OutputWriter;
 
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.api.gax.rpc.FixedHeaderProvider;
-import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class GcsSinkTask extends SinkTask {
+public class GcsSinkTask extends SinkTask {
     private static final Logger LOG = LoggerFactory.getLogger(GcsSinkConnector.class);
 
     private static final String USER_AGENT_HEADER_KEY = "user-agent";
 
-    private RecordGrouper recordGrouper;
+    protected RecordGrouper recordGrouper;
     private GcsSinkConfig config;
     private Storage storage;
 
+    private WriteHelper writeHelper;
+
     // required by Connect
     public GcsSinkTask() {
@@ -60,10 +56,11 @@ public GcsSinkTask() {
 
     // for testing
     public GcsSinkTask(final Map<String, String> props, final Storage storage) {
         super();
+        Objects.requireNonNull(props, "props cannot be null");
         Objects.requireNonNull(storage, "storage cannot be null");
 
-        this.config = new GcsSinkConfig(props);
+        parseConfig(props);
         this.storage = storage;
         initRest();
     }
@@ -72,7 +69,7 @@ public GcsSinkTask(final Map<String, String> props, final Storage storage) {
     public void start(final Map<String, String> props) {
         Objects.requireNonNull(props, "props cannot be null");
 
-        this.config = new GcsSinkConfig(props);
+        parseConfig(props);
         StorageOptions.Builder builder = StorageOptions.newBuilder()
                 .setHost(config.getGcsEndpoint())
                 .setCredentials(config.getCredentials())
@@ -93,6 +90,15 @@ public void start(final Map<String, String> props) {
         if (Objects.nonNull(config.getKafkaRetryBackoffMs())) {
             context.timeout(config.getKafkaRetryBackoffMs());
         }
+        this.writeHelper = new WriteHelper(storage, config);
+    }
+
+    protected void setConfig(final GcsSinkConfig gcsSinkConfig) {
+        this.config = gcsSinkConfig;
+    }
+
+    protected void parseConfig(final Map<String, String> props) {
+        setConfig(new GcsSinkConfig(props));
     }
 
     private void initRest() {
@@ -122,29 +128,12 @@ public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
             } else {
                 stream = recordGrouper.records().entrySet().stream();
             }
-            stream.forEach(entry -> flushFile(entry.getKey(), entry.getValue()));
+            stream.forEach(entry -> writeHelper.flushFile(entry.getKey(), entry.getValue()));
         } finally {
             recordGrouper.clear();
         }
     }
 
-    private void flushFile(final String filename, final List<SinkRecord> records) {
-        final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
-                .setContentEncoding(config.getObjectContentEncoding())
-                .build();
-        try (var out = Channels.newOutputStream(storage.writer(blob));
-                var writer = OutputWriter.builder()
-                        .withExternalProperties(config.originalsStrings())
-                        .withOutputFields(config.getOutputFields())
-                        .withCompressionType(config.getCompressionType())
-                        .withEnvelopeEnabled(config.envelopeEnabled())
-                        .build(out, config.getFormatType())) {
-            writer.writeRecords(records);
-        } catch (final Exception e) { // NOPMD broad exception catched
-            throw new ConnectException(e);
-        }
-    }
-
     @Override
     public void stop() {
         // Nothing to do.
@@ -154,4 +143,8 @@ public void stop() {
     public String version() {
         return Version.VERSION;
     }
+
+    protected WriteHelper writeHelper() {
+        return writeHelper;
+    }
 }
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java
new file mode 100644
index 000000000..7e80bd3f2
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/Instrumentation.java
@@ -0,0 +1,77 @@
+package io.aiven.kafka.connect.gcs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class Instrumentation {
+    private final AtomicLong recordsTotalReceived = new AtomicLong();
+    private final AtomicLong recordsTotalCompleted = new AtomicLong();
+
+    // received, but flush not yet started
+    private final AtomicLong recordsPending = new AtomicLong();
+    // in the process of flushing
+    private final AtomicLong recordsWriting = new AtomicLong();
+
+    private final AtomicLong filesStarted = new AtomicLong();
+    private final AtomicLong filesCompleted = new AtomicLong();
+
+    private final AtomicLong writeDurationNs = new AtomicLong();
+    private final AtomicLong closeDurationNs = new AtomicLong();
+
+    void addedRecords(final int recordCount) {
+        recordsTotalReceived.addAndGet(recordCount);
+        recordsPending.addAndGet(recordCount);
+    }
+
+    void startFlush(final AsyncRecordStreamHandler stream) {
+        filesStarted.incrementAndGet();
+        recordsPending.addAndGet(-stream.recordCount());
+        recordsWriting.addAndGet(stream.recordCount());
+    }
+
+    void finishedFlush(final AsyncRecordStreamHandler stream, final AsyncWriteResult result) {
+        filesCompleted.incrementAndGet();
+
+        recordsTotalCompleted.addAndGet(stream.recordCount());
+        recordsWriting.addAndGet(-stream.recordCount());
+
+        writeDurationNs.addAndGet(result.writeDurationNs());
+        closeDurationNs.addAndGet(result.closeDurationNs());
+    }
+
+    public long recordsTotalReceived() {
+        return recordsTotalReceived.get();
+    }
+
+    public long recordsTotalCompleted() {
+        return recordsTotalCompleted.get();
+    }
+
+    public long recordsPending() {
+        return recordsPending.get();
+    }
+
+    public long recordsBuffered() {
+        return recordsPending.get() + recordsWriting.get();
+    }
+
+    public long recordsWriting() {
+        return recordsWriting.get();
+    }
+
+    public long filesStarted() {
+        return filesStarted.get();
+    }
+
+    public long filesCompleted() {
+        return filesCompleted.get();
+    }
+
+    public long writeDurationNs() {
+        return writeDurationNs.get();
+    }
+
+    public long closeDurationNs() {
+        return closeDurationNs.get();
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java
new file mode 100644
index 000000000..054181655
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/WriteHelper.java
@@ -0,0 +1,45 @@
+package io.aiven.kafka.connect.gcs;
+
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.Storage;
+import io.aiven.kafka.connect.common.output.OutputWriter;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.nio.channels.Channels;
+import java.util.List;
+
+public class WriteHelper {
+    private final Storage storage;
+    private final GcsSinkConfig config;
+
+    public WriteHelper(final Storage storage, final GcsSinkConfig config) {
+        this.storage = storage;
+        this.config = config;
+    }
+
+    public OutputWriter openFile(final String filename) {
+        final BlobInfo blob = BlobInfo.newBuilder(config.getBucketName(), config.getPrefix() + filename)
+                .setContentEncoding(config.getObjectContentEncoding())
+                .build();
+        try {
+            var out = Channels.newOutputStream(storage.writer(blob));
+            return OutputWriter.builder()
+                    .withExternalProperties(config.originalsStrings())
+                    .withOutputFields(config.getOutputFields())
+                    .withCompressionType(config.getCompressionType())
+                    .withEnvelopeEnabled(config.envelopeEnabled())
+                    .build(out, config.getFormatType());
+        } catch (final Exception e) { // NOPMD broad exception caught
+            throw new ConnectException(e);
+        }
+    }
+
+    public void flushFile(final String filename, final List<SinkRecord> records) {
+        try (var writer = openFile(filename)) {
+            writer.writeRecords(records);
+        } catch (final Exception e) { // NOPMD broad exception caught
+            throw new ConnectException(e);
+        }
+    }
+}
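
WriteHelper now exposes two paths: the original whole-file path (flushFile) and a record-at-a-time path (openFile), which the writer classes later in this change wrap. The fragment below is a usage sketch only; the filename is invented and it relies on OutputWriter offering a per-record writeRecord call and being closeable, as the code in this change already assumes.

    // Usage sketch, not part of the change; assumes an already constructed WriteHelper.
    package io.aiven.kafka.connect.gcs;

    import java.util.List;

    import org.apache.kafka.connect.sink.SinkRecord;

    import io.aiven.kafka.connect.common.output.OutputWriter;

    public class WriteHelperUsageSketch {
        // Whole-file path: buffer records elsewhere, then write and close in one call.
        static void lateWrite(final WriteHelper writeHelper, final List<SinkRecord> records) {
            writeHelper.flushFile("topic-0-0000000000.gz", records);
        }

        // Streaming path: open once, write records as they arrive, close at the end.
        static void earlyWrite(final WriteHelper writeHelper, final List<SinkRecord> records) throws Exception {
            try (OutputWriter writer = writeHelper.openFile("topic-0-0000000000.gz")) {
                for (final SinkRecord record : records) {
                    writer.writeRecord(record);
                }
            }
        }
    }
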
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java
new file mode 100644
index 000000000..f1f2c2ca5
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandler.java
@@ -0,0 +1,15 @@
+package io.aiven.kafka.connect.gcs.overload;
+
+import io.aiven.kafka.connect.gcs.AsyncRecordStreamHandler;
+import io.aiven.kafka.connect.gcs.AsyncWriteResult;
+
+public interface OverloadHandler {
+    void addedRecords(int recordCount);
+
+    void startFlush(AsyncRecordStreamHandler stream);
+
+    void finishedFlush(AsyncRecordStreamHandler stream, AsyncWriteResult result);
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java
new file mode 100644
index 000000000..0db5574b5
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/OverloadHandlerActions.java
@@ -0,0 +1,25 @@
+package io.aiven.kafka.connect.gcs.overload;
+
+import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask.StreamSnapshotMetadata;
+
+import java.util.List;
+
+/**
+ * The limited set of actions that an overload handler may take.
+ */
+public interface OverloadHandlerActions {
+    /**
+     * Get the list of open streams. Note - the list is a snapshot and may not be up to date: streams may be removed
+     * asynchronously as they time out, but no new streams will be added, nor records added to existing streams, by
+     * background threads.
+     *
+     * @return the list of open streams
+     */
+    List<StreamSnapshotMetadata> snapshotOpenStreams();
+
+    /**
+     * Force a stream to start writing, asynchronously or synchronously (depending on the writer implementation).
+     * Typically used to relieve memory pressure.
+     *
+     * @param streamSnapshotMetadata the stream to force write
+     */
+    void forceStartAsyncWrite(StreamSnapshotMetadata streamSnapshotMetadata);
+}
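
These two interfaces keep the overload policy pluggable: a handler only sees counters and stream snapshots, and its only lever is asking the task to start a write early. As an illustration of how small a policy can be, here is a hypothetical handler that force-flushes the largest open stream whenever the buffered-record count crosses a fixed limit; the class name and limit are invented, and the real policy in this change is PauseOrFlushOverloadHandler below.

    // Hypothetical policy, for illustration only.
    package io.aiven.kafka.connect.gcs.overload;

    import java.util.Comparator;

    import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask.StreamSnapshotMetadata;
    import io.aiven.kafka.connect.gcs.AsyncRecordStreamHandler;
    import io.aiven.kafka.connect.gcs.AsyncWriteResult;
    import io.aiven.kafka.connect.gcs.Instrumentation;

    public class FlushLargestOverloadHandler implements OverloadHandler {
        private static final int LIMIT = 10_000; // invented threshold

        private final OverloadHandlerActions actions;
        private final Instrumentation instrumentation;

        public FlushLargestOverloadHandler(final OverloadHandlerActions actions, final Instrumentation instrumentation) {
            this.actions = actions;
            this.instrumentation = instrumentation;
        }

        @Override
        public void addedRecords(final int recordCount) {
            if (instrumentation.recordsBuffered() > LIMIT) {
                // push the biggest open file out early to free memory
                actions.snapshotOpenStreams().stream()
                        .max(Comparator.comparingInt(snapshot -> snapshot.recordCount))
                        .ifPresent(actions::forceStartAsyncWrite);
            }
        }

        @Override
        public void startFlush(final AsyncRecordStreamHandler stream) {
            // no bookkeeping needed for this simple policy
        }

        @Override
        public void finishedFlush(final AsyncRecordStreamHandler stream, final AsyncWriteResult result) {
            // no bookkeeping needed for this simple policy
        }
    }
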
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java
new file mode 100644
index 000000000..cd8b1d57d
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/overload/PauseOrFlushOverloadHandler.java
@@ -0,0 +1,121 @@
+package io.aiven.kafka.connect.gcs.overload;
+
+import io.aiven.kafka.connect.gcs.AsyncGcsSinkTask.StreamSnapshotMetadata;
+import io.aiven.kafka.connect.gcs.AsyncRecordStreamHandler;
+import io.aiven.kafka.connect.gcs.AsyncWriteResult;
+import io.aiven.kafka.connect.gcs.Instrumentation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PauseOrFlushOverloadHandler implements OverloadHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(PauseOrFlushOverloadHandler.class);
+    private static final Comparator<StreamSnapshotMetadata> bySize =
+            Comparator.comparingLong((StreamSnapshotMetadata a) -> a.recordCount).reversed();
+
+    // TODO - make these configurable
+    private final int softThreshold = 1000;
+    // TODO - make these configurable
+    private final int panic = 2000;
+    private final Instrumentation instrumentation;
+    private final OverloadHandlerActions actions;
+    private final Lock writingLock = new ReentrantLock();
+    private final Condition writingCompleted = writingLock.newCondition();
+
+    public PauseOrFlushOverloadHandler(final OverloadHandlerActions actions, final Instrumentation instrumentation) {
+        this.actions = actions;
+        this.instrumentation = instrumentation;
+        if (panic <= 0 || softThreshold <= 0 || panic < softThreshold) {
+            throw new IllegalArgumentException("Panic (" + panic + ") and soft (" + softThreshold
+                    + ") thresholds must be positive, and panic must not be less than soft");
+        }
+    }
+
+    @Override
+    public void addedRecords(final int recordCount) {
+        final long pending = instrumentation.recordsPending();
+        final long writing = instrumentation.recordsWriting();
+
+        if (pending + writing > softThreshold) {
+            if (pending + writing > panic) {
+                startHardFlush();
+            } else if (pending > softThreshold) {
+                startSoftFlush();
+            }
+        }
+    }
+
+    /**
+     * Ensure that enough records are finishing to bring the pending count down below the panic threshold.
+     */
+    private void startHardFlush() {
+        LOG.warn("Starting hard flush, openPending: {}, panic: {}", instrumentation.recordsPending(), panic);
+
+        // wait for enough of the flushes to complete to bring us down below the panic threshold
+        writingLock.lock();
+        try {
+            while (instrumentation.recordsPending() + instrumentation.recordsWriting() > panic) {
+                try {
+                    var woken = writingCompleted.await(10, TimeUnit.SECONDS);
+                    LOG.info("During hard flush after sleep, woken: {}, openPending: {}, panic: {}",
+                            woken, instrumentation.recordsPending(), panic);
+                } catch (InterruptedException e) {
+                    LOG.info("Interrupted while waiting for hard flush to complete", e);
+                    Thread.currentThread().interrupt();
+                    // TODO - not sure if we should return or retry
+                    // if we retry then we need to re-interrupt at the end if we were interrupted here
+                    // for the moment we return, and assume that we will probably get a callback if
+                    // we are still overloaded
+                    return;
+                }
+            }
+        } finally {
+            writingLock.unlock();
+        }
+        LOG.info("Finished hard flush, openPending: {}, panic: {}", instrumentation.recordsPending(), panic);
+    }
+
+    /**
+     * Ensure that enough records are finishing to bring the pending count down below the soft threshold.
+     */
+    private void startSoftFlush() {
+        var open = actions.snapshotOpenStreams();
+        // recalculate the pending count, and base all decisions on the snapshot - it makes it easier to reason about,
+        // although in reality streams that time out may still be removed from the underlying collection
+
+        // openPending is held in a single-element array so it can be updated from the lambda below
+        final long[] openPending = {open.stream().mapToLong(r -> r.recordCount).sum()};
+        if (openPending[0] > softThreshold) {
+            LOG.warn("Starting soft flush, openPending: {}, softThreshold: {}", openPending[0], softThreshold);
+            open.sort(bySize);
+            open.stream()
+                    .takeWhile(r -> openPending[0] > softThreshold)
+                    .forEach(stream -> {
+                        LOG.info("Starting early async write for stream: {}, recordCount: {}",
+                                stream.groupId, stream.recordCount);
+                        actions.forceStartAsyncWrite(stream);
+                        openPending[0] -= stream.recordCount;
+                    });
+            LOG.info("Finished soft flush, openPending: {}, softThreshold: {}", openPending[0], softThreshold);
+        }
+    }
+
+    @Override
+    public void startFlush(final AsyncRecordStreamHandler stream) {
+        LOG.info("startFlush {} records, total pending: {}", stream.recordCount(), instrumentation.recordsPending());
+    }
+
+    @Override
+    public void finishedFlush(final AsyncRecordStreamHandler stream, final AsyncWriteResult result) {
+        LOG.info("finishedFlush {} records, total pending: {}", stream.recordCount(), instrumentation.recordsPending());
+        writingLock.lock();
+        try {
+            writingCompleted.signalAll();
+        } finally {
+            writingLock.unlock();
+        }
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java
new file mode 100644
index 000000000..1fb8287cc
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/EarlyWriteStreamWriter.java
@@ -0,0 +1,44 @@
+package io.aiven.kafka.connect.gcs.writer;
+
+import io.aiven.kafka.connect.common.output.OutputWriter;
+import io.aiven.kafka.connect.gcs.AsyncWriteResult;
+import io.aiven.kafka.connect.gcs.WriteHelper;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.io.IOException;
+
+public class EarlyWriteStreamWriter extends StreamWriter {
+    private final OutputWriter writer;
+    private int recordCount;
+    private long durationNs;
+
+    public EarlyWriteStreamWriter(final WriteHelper writeHelper, final String filename) {
+        this.writer = writeHelper.openFile(filename);
+    }
+
+    @Override
+    public void addRecord(final SinkRecord record) {
+        final var start = System.nanoTime();
+        try {
+            writer.writeRecord(record);
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+        durationNs += System.nanoTime() - start;
+        recordCount++;
+    }
+
+    @Override
+    public AsyncWriteResult finishWriteAsync() {
+        final var start = System.nanoTime();
+        try {
+            writer.close();
+        } catch (IOException e) {
+            throw new ConnectException(e);
+        }
+        return new AsyncWriteResult(recordCount, durationNs, System.nanoTime() - start);
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java
new file mode 100644
index 000000000..e6ea5cf47
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/LateWriteStreamWriter.java
@@ -0,0 +1,35 @@
+package io.aiven.kafka.connect.gcs.writer;
+
+import io.aiven.kafka.connect.gcs.AsyncWriteResult;
+import io.aiven.kafka.connect.gcs.WriteHelper;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LateWriteStreamWriter extends StreamWriter {
+    private final List<SinkRecord> records;
+    private final WriteHelper writeHelper;
+    private final String filename;
+
+    public LateWriteStreamWriter(final WriteHelper writeHelper, final String filename) {
+        this.writeHelper = writeHelper;
+        this.filename = filename;
+        this.records = new ArrayList<>();
+    }
+
+    @Override
+    public void addRecord(final SinkRecord record) {
+        records.add(record);
+    }
+
+    @Override
+    public AsyncWriteResult finishWriteAsync() {
+        final var start = System.nanoTime();
+        writeHelper.flushFile(filename, records);
+        // the whole write happens here, so report the elapsed nanoseconds as the close duration
+        return new AsyncWriteResult(records.size(), 0, System.nanoTime() - start);
+    }
+}
diff --git a/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java
new file mode 100644
index 000000000..a8263dacb
--- /dev/null
+++ b/gcs-sink-connector/src/main/java/io/aiven/kafka/connect/gcs/writer/StreamWriter.java
@@ -0,0 +1,11 @@
+package io.aiven.kafka.connect.gcs.writer;
+
+import io.aiven.kafka.connect.gcs.AsyncWriteResult;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public abstract class StreamWriter {
+
+    public abstract void addRecord(SinkRecord record);
+
+    public abstract AsyncWriteResult finishWriteAsync();
+}
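
createRecordStream() in AsyncGcsSinkTask currently hard-codes LateWriteStreamWriter (see the TODO there). One way the early/late choice could eventually be driven by configuration is sketched below; the factory class and the "async.write.mode" key are invented and not part of this change. The trade-off it would expose: early write streams each record to GCS as it arrives, keeping memory flat but holding the upload open longer, while late write buffers in memory and uploads the whole file at the end.

    // Hypothetical factory, for illustration only; "async.write.mode" is an invented key.
    package io.aiven.kafka.connect.gcs.writer;

    import io.aiven.kafka.connect.gcs.WriteHelper;

    public final class StreamWriterFactory {
        private StreamWriterFactory() {
        }

        public static StreamWriter create(final String writeMode, final WriteHelper writeHelper, final String filename) {
            if ("early".equalsIgnoreCase(writeMode)) {
                // stream records into GCS as they arrive
                return new EarlyWriteStreamWriter(writeHelper, filename);
            }
            // default: buffer in memory and upload the whole file at the end, as createRecordStream() does today
            return new LateWriteStreamWriter(writeHelper, filename);
        }
    }
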