diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..e0f7e4c --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,5 @@ +#!groovy +@Library('jenkins-pipeline') import com.github.jcustenborder.jenkins.pipeline.KafkaConnectPipeline + +def pipe = new KafkaConnectPipeline() +pipe.execute() \ No newline at end of file diff --git a/README.md b/README.md index ef1d1cb..7c27db7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,19 @@ -# kafka-connect-transform-archive -Kafka Connect transform to assist with archiving to S3. +# Introduction + +Kafka Connect transform to assist with archiving to S3. + +## Kafka 0.11.x and newer + +```properties + + +``` + +## Kafka 0.10.x and older + +```properties + + + +``` + diff --git a/config/archive.properties b/config/archive.properties new file mode 100644 index 0000000..e69de29 diff --git a/docs/transformations.rst b/docs/transformations.rst new file mode 100644 index 0000000..0c86d2a --- /dev/null +++ b/docs/transformations.rst @@ -0,0 +1,21 @@ +================= +Archive Transform +================= + +The Archive transformation is used to help preserve all of the data for a message when archived to S3. + +.. toctree:: + :maxdepth: 1 + :caption: Transformations: + :hidden: + :glob: + + transformations/* + + +.. 
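toctree::
+    :maxdepth: 0
+    :caption: Schemas:
+    :hidden:
+
+    schemas
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..bec67e8
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.github.jcustenborder.kafka.connect</groupId>
+        <artifactId>kafka-connect-parent</artifactId>
+        <version>1.0.0</version>
+    </parent>
+    <artifactId>kafka-connect-transform-archive</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
+    <name>kafka-connect-transform-archive</name>
+    <url>https://github.com/jcustenborder/kafka-connect-transform-archive</url>
+    <inceptionYear>2017</inceptionYear>
+    <licenses>
+        <license>
+            <name>Apache License 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+    <developers>
+        <developer>
+            <name>Jeremy Custenborder</name>
+            <email>jeremy@confluent.io</email>
+            <url>https://github.com/jcustenborder</url>
+            <roles>
+                <role>maintainer</role>
+            </roles>
+        </developer>
+    </developers>
+    <scm>
+        <connection>scm:git:https://github.com/jcustenborder/kafka-connect-transform-archive.git</connection>
+        <developerConnection>scm:git:git@github.com:jcustenborder/kafka-connect-transform-archive.git</developerConnection>
+        <url>https://github.com/jcustenborder/kafka-connect-transform-archive</url>
+    </scm>
+    <issueManagement>
+        <system>github</system>
+        <url>https://github.com/jcustenborder/kafka-connect-transform-archive/issues</url>
+    </issueManagement>
+    <dependencies>
+        <dependency>
+            <groupId>com.github.jcustenborder</groupId>
+            <artifactId>cef-parser</artifactId>
+            <version>[0.0.1.7,0.0.1.2000)</version>
+        </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>0.9.10</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.jcustenborder.kafka.connect</groupId>
+            <artifactId>connect-utils-testing-data</artifactId>
+            <version>[0.2.33,0.2.1000)</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
new file mode 100644
index 0000000..c50be13
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/archive/Archive.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.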
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.archive; + +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Map; + +@Description("The Archive transformation is used to help preserve all of the data for a message when archived to S3.") +@DocumentationNote("This transform works by copying the key, value, topic, and timestamp to new record where this is all " + + "contained in the value of the message. 
This will allow connectors like Confluent's S3 connector to properly archive " + + "the record.") +public class Archive> implements Transformation { + @Override + public R apply(R r) { + final Schema schema = SchemaBuilder.struct() + .name("com.github.jcustenborder.kafka.connect.archive.Storage") + .field("key", r.keySchema()) + .field("value", r.valueSchema()) + .field("topic", Schema.STRING_SCHEMA) + .field("timestamp", Schema.INT64_SCHEMA); + Struct value = new Struct(schema) + .put("key", r.key()) + .put("value", r.value()) + .put("topic", r.topic()) + .put("timestamp", r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp()); + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +}