diff --git a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java
index d6f154bafd..7c2c63df5d 100644
--- a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java
+++ b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.astraea.common.backup;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -77,8 +78,10 @@ private static Record readRecord(InputStream inputStream) {
           .serializedValueSize(outerRecord.getValue().size())
           .build();
     } catch (IOException e) {
-      throw new SerializationException(e);
+      // swallow the exception until the importer can read metadata at the end of the file.
+      if (!(e instanceof InvalidProtocolBufferException)) throw new SerializationException(e);
     }
+    return null;
   }
 
   private InputStream fs;
@@ -104,7 +107,7 @@ public RecordReaderBuilder buffered(int size) {
 
   public RecordReader build() {
     var version = ByteUtils.readShort(fs);
-    if (version == 0) return V0.apply(fs);
+    if (version == 0 || version == 1) return V0.apply(fs);
     throw new IllegalArgumentException("unsupported version: " + version);
   }
 
diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriter.java b/common/src/main/java/org/astraea/common/backup/RecordWriter.java
index ea8597bd14..ee56c11b37 100644
--- a/common/src/main/java/org/astraea/common/backup/RecordWriter.java
+++ b/common/src/main/java/org/astraea/common/backup/RecordWriter.java
@@ -19,6 +19,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
+import org.astraea.common.Configuration;
 import org.astraea.common.DataSize;
 import org.astraea.common.Utils;
 import org.astraea.common.consumer.Record;
@@ -52,6 +53,10 @@ static RecordWriterBuilder builder(File file) {
   }
 
   static RecordWriterBuilder builder(OutputStream outputStream) {
-    return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream);
+    return new RecordWriterBuilder((short) 0, outputStream);
+  }
+
+  static RecordWriterBuilder builder(OutputStream outputStream, Configuration configuration) {
+    return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream, configuration);
   }
 }
diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
index 7e5070c0e9..6401d6f52f 100644
--- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
+++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
@@ -19,12 +19,15 @@
 import com.google.protobuf.ByteString;
 import java.io.BufferedOutputStream;
 import java.io.OutputStream;
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.zip.GZIPOutputStream;
 import org.astraea.common.ByteUtils;
+import org.astraea.common.Configuration;
 import org.astraea.common.DataSize;
 import org.astraea.common.Utils;
 import org.astraea.common.consumer.Record;
@@ -32,6 +35,32 @@
 
 public class RecordWriterBuilder {
 
+  static RecordOuterClass.Record record2Builder(Record record) {
+    return Utils.packException(
+        () ->
+            RecordOuterClass.Record.newBuilder()
+                .setTopic(record.topic())
+                .setPartition(record.partition())
+                .setOffset(record.offset())
+                .setTimestamp(record.timestamp())
+                .setKey(record.key() == null ? ByteString.EMPTY : ByteString.copyFrom(record.key()))
+                .setValue(
+                    record.value() == null ? ByteString.EMPTY : ByteString.copyFrom(record.value()))
+                .addAllHeaders(
+                    record.headers().stream()
+                        .map(
+                            header ->
+                                RecordOuterClass.Record.Header.newBuilder()
+                                    .setKey(header.key())
+                                    .setValue(
+                                        header.value() == null
+                                            ? ByteString.EMPTY
+                                            : ByteString.copyFrom(header.value()))
+                                    .build())
+                        .toList())
+                .build());
+  }
+
   private static final Function<OutputStream, RecordWriter> V0 =
       outputStream ->
           new RecordWriter() {
@@ -44,33 +73,7 @@ public class RecordWriterBuilder {
             public void append(Record record) {
               Utils.packException(
                   () -> {
-                    var recordBuilder =
-                        RecordOuterClass.Record.newBuilder()
-                            .setTopic(record.topic())
-                            .setPartition(record.partition())
-                            .setOffset(record.offset())
-                            .setTimestamp(record.timestamp())
-                            .setKey(
-                                record.key() == null
-                                    ? ByteString.EMPTY
-                                    : ByteString.copyFrom(record.key()))
-                            .setValue(
-                                record.value() == null
-                                    ? ByteString.EMPTY
-                                    : ByteString.copyFrom(record.value()))
-                            .addAllHeaders(
-                                record.headers().stream()
-                                    .map(
-                                        header ->
-                                            RecordOuterClass.Record.Header.newBuilder()
-                                                .setKey(header.key())
-                                                .setValue(
-                                                    header.value() == null
-                                                        ? ByteString.EMPTY
-                                                        : ByteString.copyFrom(header.value()))
-                                                .build())
-                                    .toList())
-                            .build();
+                    var recordBuilder = record2Builder(record);
                     recordBuilder.writeDelimitedTo(outputStream);
                     this.size.add(recordBuilder.getSerializedSize());
                   });
@@ -108,28 +111,124 @@ public void close() {
             }
           };
 
-  public static final short LATEST_VERSION = (short) 0;
+  private static final BiFunction<Configuration, OutputStream, RecordWriter> V1 =
+      (configuration, outputStream) ->
+          new RecordWriter() {
+            private final AtomicInteger count = new AtomicInteger();
+            private final LongAdder size = new LongAdder();
+            private final AtomicLong latestAppendTimestamp = new AtomicLong();
+            private final String connectorName;
+            private final Long interval;
+            private final String compressionType;
+            private OutputStream targetOutputStream;
+
+            // instance initializer block
+            {
+              this.connectorName = configuration.requireString("connector.name");
+              this.interval =
+                  configuration
+                      .string("roll.duration")
+                      .map(Utils::toDuration)
+                      .orElse(Duration.ofSeconds(3))
+                      .toMillis();
+              this.compressionType = configuration.string("compression.type").orElse("none");
+
+              switch (this.compressionType) {
+                case "gzip" -> Utils.packException(
+                    () -> this.targetOutputStream = new GZIPOutputStream(outputStream));
+                case "none" -> this.targetOutputStream = outputStream;
+                default -> throw new IllegalArgumentException(
+                    String.format("compression type '%s' is not supported", this.compressionType));
+              }
+            }
+
+            byte[] extendString(String input, int length) {
+              byte[] original = input.getBytes();
+              byte[] result = new byte[length];
+              System.arraycopy(original, 0, result, 0, original.length);
+              for (int i = original.length; i < result.length; i++) {
+                result[i] = (byte) ' ';
+              }
+              return result;
+            }
+
+            void appendMetadata() {
+              Utils.packException(
+                  () -> {
+                    if (this.compressionType.equals("gzip")) {
+                      ((GZIPOutputStream) targetOutputStream).finish();
+                    }
+
+                    // 277 bytes total for the whole metadata.
+                    outputStream.write(
+                        this.extendString(this.connectorName, 255)); // 255 bytes for the connector name
+                    outputStream.write(ByteUtils.toBytes(this.count())); // 4 bytes for the count
+                    outputStream.write(
+                        ByteUtils.toBytes(this.interval)); // 8 bytes for the millis of roll.duration
+                    outputStream.write(
+                        this.extendString(
+                            this.compressionType, 10)); // 10 bytes for the compression type name.
+                  });
+            }
+
+            @Override
+            public void append(Record record) {
+              Utils.packException(
+                  () -> {
+                    var recordBuilder = record2Builder(record);
+                    recordBuilder.writeDelimitedTo(this.targetOutputStream);
+                    this.size.add(recordBuilder.getSerializedSize());
+                  });
+              count.incrementAndGet();
+              this.latestAppendTimestamp.set(System.currentTimeMillis());
+            }
+
+            @Override
+            public DataSize size() {
+              return DataSize.Byte.of(size.sum());
+            }
+
+            @Override
+            public int count() {
+              return count.get();
+            }
+
+            @Override
+            public void flush() {
+              Utils.packException(outputStream::flush);
+            }
+
+            @Override
+            public long latestAppendTimestamp() {
+              return this.latestAppendTimestamp.get();
+            }
+
+            @Override
+            public void close() {
+              Utils.packException(
+                  () -> {
+                    appendMetadata();
+                    outputStream.flush();
+                    outputStream.close();
+                  });
+            }
+          };
+
+  public static final short LATEST_VERSION = (short) 1;
 
   private final short version;
   private OutputStream fs;
+  private Configuration configuration;
 
   RecordWriterBuilder(short version, OutputStream outputStream) {
     this.version = version;
     this.fs = outputStream;
   }
 
-  public RecordWriterBuilder compression(String type) {
-    switch (type) {
-      case "gzip":
-        this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs));
-        break;
-      case "none":
-        // do nothing.
-        break;
-      default:
-        throw new IllegalArgumentException("unsupported compression type: " + type);
-    }
-    return this;
+  RecordWriterBuilder(short version, OutputStream outputStream, Configuration configuration) {
+    this.version = version;
+    this.fs = outputStream;
+    this.configuration = configuration;
   }
 
   public RecordWriterBuilder buffered() {
@@ -148,6 +247,9 @@ public RecordWriter build() {
           if (version == 0) {
             fs.write(ByteUtils.toBytes(version));
             return V0.apply(fs);
+          } else if (version == 1) {
+            fs.write(ByteUtils.toBytes(version));
+            return V1.apply(this.configuration, fs);
           }
           throw new IllegalArgumentException("unsupported version: " + version);
         });
diff --git a/connector/src/main/java/org/astraea/connector/SinkTaskContext.java b/connector/src/main/java/org/astraea/connector/SinkTaskContext.java
index fc2c069c57..363ad3c66b 100644
--- a/connector/src/main/java/org/astraea/connector/SinkTaskContext.java
+++ b/connector/src/main/java/org/astraea/connector/SinkTaskContext.java
@@ -37,6 +37,11 @@ public void pause(Collection<TopicPartition> partitions) {}
 
         @Override
         public void requestCommit() {}
+
+        @Override
+        public Map<String, String> configs() {
+          return null;
+        }
       };
 
   /**
@@ -67,6 +72,8 @@ public void requestCommit() {}
    */
   void requestCommit();
 
+  Map<String, String> configs();
+
   static SinkTaskContext of(org.apache.kafka.connect.sink.SinkTaskContext context) {
     return new SinkTaskContext() {
       @Override
@@ -94,6 +101,12 @@ public void pause(Collection<TopicPartition> partitions) {
       public void requestCommit() {
         context.requestCommit();
       }
+
+      @Override
+      public Map<String, String> configs() {
+        return context.configs();
+      }
+      ;
     };
   }
 }
diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java
index 2080d8dc29..d6258aecfd 100644
--- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java
+++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.astraea.common.Configuration;
@@ -195,6 +196,8 @@ public static class Task extends SinkTask {
     long interval;
     String compressionType;
 
+    Configuration configuration;
+
     // a map of <topic, <partition, offset>>
     private final Map<String, Map<Integer, Long>> offsetForTopicPartition = new HashMap<>();
 
@@ -205,12 +208,24 @@ public static class Task extends SinkTask {
 
     private SinkTaskContext taskContext;
 
-    RecordWriter createRecordWriter(TopicPartition tp, long offset) {
-      var fileName = String.valueOf(offset);
+    // overload used by tests
+    RecordWriter createRecordWriter(Record record, Configuration configuration) {
+      var fileName = String.valueOf(record.offset());
       return RecordWriter.builder(
               fs.write(
-                  String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName)))
-          .compression(this.compressionType)
+                  String.join(
+                      "/", path, record.topic(), String.valueOf(record.partition()), fileName)),
+              configuration)
+          .build();
+    }
+
+    RecordWriter createRecordWriter(Record record) {
+      var fileName = String.valueOf(record.offset());
+      return RecordWriter.builder(
+              fs.write(
+                  String.join(
+                      "/", path, record.topic(), String.valueOf(record.partition()), fileName)),
+              this.configuration)
           .build();
     }
 
@@ -249,8 +264,7 @@ void writeRecords(HashMap<TopicPartition, RecordWriter> writers) {
       records.forEach(
          record -> {
            var writer =
-                writers.computeIfAbsent(
-                    record.topicPartition(), tp -> createRecordWriter(tp, record.offset()));
+                writers.computeIfAbsent(record.topicPartition(), tp -> createRecordWriter(record));
            writer.append(record);
            if (writer.size().greaterThan(size)) {
              writers.remove(record.topicPartition()).close();
@@ -317,6 +331,13 @@ protected void init(Configuration configuration, SinkTaskContext context) {
             .orElse(COMPRESSION_TYPE_DEFAULT)
             .toLowerCase();
 
+    var originalConfiguration =
+        configuration.raw().entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    originalConfiguration.computeIfAbsent(
+        "connector.name", k -> this.taskContext.configs().get("name"));
+    this.configuration = new Configuration(originalConfiguration);
+
     // fetches key-value pairs from the configuration's variable matching the regular expression
     // '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based
     // on the
diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
index 6f03beba1f..5f1fd179ab 100644
--- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
+++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
@@ -20,6 +20,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +40,7 @@
 import org.astraea.common.connector.Value;
 import org.astraea.common.consumer.Record;
 import org.astraea.common.producer.Producer;
+import org.astraea.connector.SinkTaskContext;
 import org.astraea.fs.FileSystem;
 import org.astraea.it.FtpServer;
 import org.astraea.it.HdfsServer;
@@ -52,6 +54,26 @@ public class ExporterTest {
 
   private static final Service SERVICE =
       Service.builder().numberOfWorkers(1).numberOfBrokers(1).build();
 
+  private static final SinkTaskContext context =
+      new SinkTaskContext() {
+        @Override
+        public void offset(Map<TopicPartition, Long> offsets) {}
+
+        @Override
+        public void offset(TopicPartition topicPartition, long offset) {}
+
+        @Override
+        public void pause(Collection<TopicPartition> partitions) {}
+
+        @Override
+        public void requestCommit() {}
+
+        @Override
+        public Map<String, String> configs() {
+          return Map.of("name", "test");
+        }
+      };
+
   @AfterAll
   static void closeService() {
     SERVICE.close();
@@ -191,7 +213,7 @@ void testFtpSinkTask() {
 
     var fs = FileSystem.of("ftp", new Configuration(configs));
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var records =
         List.of(
@@ -278,7 +300,7 @@ void testFtpSinkTaskIntervalWith1File() {
 
     var fs = FileSystem.of("ftp", new Configuration(configs));
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var records1 =
         Record.builder()
@@ -353,7 +375,7 @@ void testFtpSinkTaskIntervalWith2Writers() {
 
     var fs = FileSystem.of("ftp", new Configuration(configs));
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var record1 =
         Record.builder()
@@ -458,7 +480,7 @@ void testHdfsSinkTask() {
             "fs.hdfs.override.dfs.client.use.datanode.hostname",
             "true");
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var records =
         List.of(
@@ -557,7 +579,7 @@ void testHdfsSinkTaskIntervalWith1File() {
             "roll.duration",
             "300ms");
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var records1 =
         Record.builder()
@@ -630,7 +652,7 @@ void testHdfsSinkTaskIntervalWith2Writers() {
             "roll.duration",
             "100ms");
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var record1 =
         Record.builder()
@@ -751,8 +773,11 @@ void testCreateRecordWriter() {
     var topicName = Utils.randomString(10);
     var tp = TopicPartition.of(topicName, 0);
     long offset = 123;
+    var testRecord = Record.builder().topic(topicName).topicPartition(tp).offset(offset).build();
     var configs =
         Map.of(
+            "connector.name",
+            "test",
             "fs.schema",
             "hdfs",
             "topics",
@@ -779,7 +804,7 @@ void testCreateRecordWriter() {
     task.interval = 1000;
     task.compressionType = "none";
 
-    RecordWriter recordWriter = task.createRecordWriter(tp, offset);
+    RecordWriter recordWriter = task.createRecordWriter(testRecord, new Configuration(configs));
 
     Assertions.assertNotNull(recordWriter);
 
@@ -819,6 +844,8 @@ void testWriteRecords() {
     var path = "/test";
     var configs =
         Map.of(
+            "connector.name",
+            "test",
             "fs.schema",
             "hdfs",
             "topics",
@@ -843,6 +870,7 @@ void testWriteRecords() {
     var task = new Exporter.Task();
     task.fs = FileSystem.of("hdfs", new Configuration(configs));
     task.compressionType = "none";
+    task.configuration = new Configuration(configs);
     task.size = DataSize.of("100MB");
     task.bufferSize.reset();
     task.recordsQueue.add(
@@ -889,7 +917,7 @@ void testIsValid() {
 
     var task = new Exporter.Task();
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var record1 =
         Record.builder()
@@ -981,7 +1009,7 @@ void testCompression() {
             "compression.type",
             "gzip");
 
-    task.start(configs);
+    task.init(new Configuration(configs), context);
 
     var records =
         List.of(
@@ -1012,7 +1040,8 @@ void testCompression() {
     var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0"));
 
     Assertions.assertArrayEquals(
-        new byte[] {(byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(2)));
+        new byte[] {(byte) 0x0, (byte) 0x1, (byte) 0x1f, (byte) 0x8b},
+        Utils.packException(() -> input.readNBytes(4)));
     }
   }
 }
diff --git a/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java b/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java
index e27e4bec67..ce172c1476 100644
--- a/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java
+++ b/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java
@@ -207,6 +207,11 @@ public void write(int b) throws IOException {
             outputStream.write(b);
           }
 
+          @Override
+          public void write(byte b[]) throws IOException {
+            outputStream.write(b);
+          }
+
           @Override
           public void write(byte b[], int off, int len) throws IOException {
             outputStream.write(b, off, len);
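
The patch carries no usage notes, so the following is a minimal, hypothetical sketch (not part of the patch) of how the new RecordWriter.builder(OutputStream, Configuration) entry point might be exercised. The output path and topic are invented, the org.astraea.common.admin.TopicPartition import is an assumption, and the configuration keys mirror the ones read by the V1 writer's instance initializer: connector.name is required, while roll.duration (default 3s) and compression.type (default "none") are optional.

import java.io.FileOutputStream;
import java.util.Map;
import org.astraea.common.Configuration;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.backup.RecordWriter;
import org.astraea.common.consumer.Record;

public class V1WriterSketch {
  public static void main(String[] args) throws Exception {
    // keys assumed from the V1 instance initializer shown in the patch
    var configuration =
        new Configuration(
            Map.of(
                "connector.name", "demo-exporter",
                "roll.duration", "3s",
                "compression.type", "gzip"));
    try (var out = new FileOutputStream("/tmp/demo-backup")) {
      // builder(OutputStream, Configuration) selects LATEST_VERSION, i.e. the V1 format;
      // build() writes the two version bytes before any (compressed) record data.
      var writer = RecordWriter.builder(out, configuration).build();
      writer.append(
          Record.builder()
              .topic("demo")
              .topicPartition(TopicPartition.of("demo", 0))
              .offset(0)
              .build());
      // close() finishes the gzip stream (if any) and then appends the
      // uncompressed metadata trailer before closing the underlying stream.
      writer.close();
    }
  }
}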
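On the read side, V0's reader is reused for V1 files: readRecord now swallows InvalidProtocolBufferException and returns null once it runs into the trailer. Below is a hedged sketch of how an importer might parse that fixed-size trailer directly. The layout (255-byte space-padded connector name, 4-byte count, 8-byte roll.duration millis, 10-byte space-padded compression type, 277 bytes total) follows appendMetadata() in the patch; the file path is hypothetical, and big-endian encoding is assumed for ByteUtils.toBytes, matching java.nio.ByteBuffer's default order.

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class V1TrailerSketch {
  // 255 (connector name) + 4 (record count) + 8 (roll.duration millis) + 10 (compression type)
  private static final int TRAILER_SIZE = 255 + 4 + 8 + 10; // 277 bytes

  public static void main(String[] args) throws Exception {
    try (var file = new RandomAccessFile("/tmp/demo-backup", "r")) {
      // the trailer is written uncompressed at the very end of the file
      file.seek(file.length() - TRAILER_SIZE);
      var trailer = new byte[TRAILER_SIZE];
      file.readFully(trailer);
      var buffer = ByteBuffer.wrap(trailer);

      var nameBytes = new byte[255];
      buffer.get(nameBytes);
      // extendString pads with trailing spaces, so trim() recovers the value
      var connectorName = new String(nameBytes, StandardCharsets.UTF_8).trim();
      var count = buffer.getInt();
      var rollDurationMs = buffer.getLong();
      var typeBytes = new byte[10];
      buffer.get(typeBytes);
      var compressionType = new String(typeBytes, StandardCharsets.UTF_8).trim();

      System.out.printf(
          "connector=%s, records=%d, roll=%dms, compression=%s%n",
          connectorName, count, rollDurationMs, compressionType);
    }
  }
}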