From 639435df45fd93fcb8ba5743ce42fd8f597f6bd1 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sun, 16 Jul 2023 16:42:06 +0800 Subject: [PATCH 1/4] recordWriter will append metadata before closing. During the creation of a new recordWriter, the exporter task will transmit both the configuration and the first record for writing metadata. A new method has been added to the SinkContext for retrieving the connector name to be used within the metadata. --- .../astraea/common/backup/RecordWriter.java | 7 ++ .../common/backup/RecordWriterBuilder.java | 107 ++++++++++++++---- .../astraea/connector/SinkTaskContext.java | 13 +++ .../astraea/connector/backup/Exporter.java | 35 +++++- .../connector/backup/ExporterTest.java | 33 +++++- .../org/astraea/fs/ftp/FtpFileSystem.java | 5 + 6 files changed, 167 insertions(+), 33 deletions(-) diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriter.java b/common/src/main/java/org/astraea/common/backup/RecordWriter.java index ea8597bd14..22bfc42172 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriter.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriter.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; +import org.astraea.common.Configuration; import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.consumer.Record; @@ -54,4 +55,10 @@ static RecordWriterBuilder builder(File file) { static RecordWriterBuilder builder(OutputStream outputStream) { return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream); } + + static RecordWriterBuilder builder( + OutputStream outputStream, Configuration configuration, Record record) { + return new RecordWriterBuilder( + RecordWriterBuilder.LATEST_VERSION, outputStream, configuration, record); + } } diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index 7e5070c0e9..4278de745a 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -19,26 +19,90 @@ import com.google.protobuf.ByteString; import java.io.BufferedOutputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; -import java.util.function.Function; import java.util.zip.GZIPOutputStream; import org.astraea.common.ByteUtils; +import org.astraea.common.Configuration; import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.consumer.Record; +import org.astraea.common.function.Bi3Function; import org.astraea.common.generated.RecordOuterClass; public class RecordWriterBuilder { - private static final Function V0 = - outputStream -> + private static final Bi3Function V0 = + (configuration, record, outputStream) -> new RecordWriter() { private final AtomicInteger count = new AtomicInteger(); private final LongAdder size = new LongAdder(); - private final AtomicLong latestAppendTimestamp = new AtomicLong(); + private final String connectorName; + private final Long interval; + private final String compressionType; + private OutputStream targetOutputStream; + + // instance initializer block + { + this.connectorName = configuration.requireString("connector.name"); + this.interval = + configuration + .string("roll.duration") + .map(Utils::toDuration) + .orElse(Duration.ofSeconds(3)) + .toMillis(); + this.compressionType = configuration.string("compression.type").orElse("none"); + + switch (this.compressionType) { + case "gzip" -> Utils.packException( + () -> this.targetOutputStream = new GZIPOutputStream(outputStream)); + case "none" -> this.targetOutputStream = outputStream; + default -> throw new IllegalArgumentException( + String.format("compression type '%s' is not supported", this.compressionType)); + } + } + + byte[] extendString(String input, int length) { + byte[] original = input.getBytes(); + byte[] result = new byte[length]; + System.arraycopy(original, 0, result, 0, original.length); + for (int i = original.length; i < result.length; i++) { + result[i] = (byte) ' '; + } + return result; + } + + void appendMetadata() { + Utils.packException( + () -> { + if (this.compressionType.equals("gzip")) { + ((GZIPOutputStream) targetOutputStream).finish(); + } + + // 552 Bytes total for whole metadata. + outputStream.write( + this.extendString( + this.connectorName, 255)); // 255 Bytes for this connector name + outputStream.write( + this.extendString(record.topic(), 255)); // 255 Bytes for topic name + // 42 Bytes + outputStream.write( + ByteUtils.toBytes(record.partition())); // 4 Bytes for partition + outputStream.write( + ByteUtils.toBytes(record.offset())); // 8 bytes for 1st record offset + outputStream.write( + ByteUtils.toBytes(record.timestamp())); // 8 bytes for 1st record timestamp + outputStream.write(ByteUtils.toBytes(this.count())); // 4 Bytes for count + outputStream.write( + ByteUtils.toBytes(this.interval)); // 8 Bytes for mills of roll.duration + outputStream.write( + this.extendString( + this.compressionType, 10)); // 10 Bytes for compression type name. + }); + } @Override public void append(Record record) { @@ -71,18 +135,13 @@ public void append(Record record) { .build()) .toList()) .build(); - recordBuilder.writeDelimitedTo(outputStream); + recordBuilder.writeDelimitedTo(this.targetOutputStream); this.size.add(recordBuilder.getSerializedSize()); }); count.incrementAndGet(); this.latestAppendTimestamp.set(System.currentTimeMillis()); } - @Override - public long latestAppendTimestamp() { - return this.latestAppendTimestamp.get(); - } - @Override public DataSize size() { return DataSize.Byte.of(size.sum()); @@ -98,10 +157,16 @@ public void flush() { Utils.packException(outputStream::flush); } + @Override + public long latestAppendTimestamp() { + return this.latestAppendTimestamp.get(); + } + @Override public void close() { Utils.packException( () -> { + appendMetadata(); outputStream.flush(); outputStream.close(); }); @@ -112,24 +177,20 @@ public void close() { private final short version; private OutputStream fs; + private Configuration configuration; + private Record record; RecordWriterBuilder(short version, OutputStream outputStream) { this.version = version; this.fs = outputStream; } - public RecordWriterBuilder compression(String type) { - switch (type) { - case "gzip": - this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs)); - break; - case "none": - // do nothing. - break; - default: - throw new IllegalArgumentException("unsupported compression type: " + type); - } - return this; + RecordWriterBuilder( + short version, OutputStream outputStream, Configuration configuration, Record record) { + this.version = version; + this.fs = outputStream; + this.configuration = configuration; + this.record = record; } public RecordWriterBuilder buffered() { @@ -147,7 +208,7 @@ public RecordWriter build() { () -> { if (version == 0) { fs.write(ByteUtils.toBytes(version)); - return V0.apply(fs); + return V0.apply(this.configuration, this.record, fs); } throw new IllegalArgumentException("unsupported version: " + version); }); diff --git a/connector/src/main/java/org/astraea/connector/SinkTaskContext.java b/connector/src/main/java/org/astraea/connector/SinkTaskContext.java index fc2c069c57..363ad3c66b 100644 --- a/connector/src/main/java/org/astraea/connector/SinkTaskContext.java +++ b/connector/src/main/java/org/astraea/connector/SinkTaskContext.java @@ -37,6 +37,11 @@ public void pause(Collection partitions) {} @Override public void requestCommit() {} + + @Override + public Map configs() { + return null; + } }; /** @@ -67,6 +72,8 @@ public void requestCommit() {} */ void requestCommit(); + Map configs(); + static SinkTaskContext of(org.apache.kafka.connect.sink.SinkTaskContext context) { return new SinkTaskContext() { @Override @@ -94,6 +101,12 @@ public void pause(Collection partitions) { public void requestCommit() { context.requestCommit(); } + + @Override + public Map configs() { + return context.configs(); + } + ; }; } } diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 855320ec1a..d2f2eda3ce 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.common.Configuration; @@ -195,6 +196,8 @@ public static class Task extends SinkTask { long interval; String compressionType; + Configuration configuration; + // a map of > private final Map> offsetForTopicPartition = new HashMap<>(); @@ -205,12 +208,26 @@ public static class Task extends SinkTask { private SinkTaskContext taskContext; - RecordWriter createRecordWriter(TopicPartition tp, long offset) { - var fileName = String.valueOf(offset); + // create for test + RecordWriter createRecordWriter(Record record, Configuration configuration) { + var fileName = String.valueOf(record.offset()); return RecordWriter.builder( fs.write( - String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName))) - .compression(this.compressionType) + String.join( + "/", path, record.topic(), String.valueOf(record.partition()), fileName)), + configuration, + record) + .build(); + } + + RecordWriter createRecordWriter(Record record) { + var fileName = String.valueOf(record.offset()); + return RecordWriter.builder( + fs.write( + String.join( + "/", path, record.topic(), String.valueOf(record.partition()), fileName)), + this.configuration, + record) .build(); } @@ -249,8 +266,7 @@ void writeRecords(HashMap writers) { records.forEach( record -> { var writer = - writers.computeIfAbsent( - record.topicPartition(), tp -> createRecordWriter(tp, record.offset())); + writers.computeIfAbsent(record.topicPartition(), tp -> createRecordWriter(record)); writer.append(record); if (writer.size().greaterThan(size)) { writers.remove(record.topicPartition()).close(); @@ -317,6 +333,13 @@ protected void init(Configuration configuration, SinkTaskContext context) { .orElse(COMPRESSION_TYPE_DEFAULT) .toLowerCase(); + var originalConfiguration = + configuration.raw().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + originalConfiguration.computeIfAbsent( + "connector.name", k -> this.taskContext.configs().get("name")); + this.configuration = new Configuration(originalConfiguration); + // fetches key-value pairs from the configuration's variable matching the regular expression // '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based // on the diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 28e8d14de3..cdfac22967 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.astraea.common.connector.Value; import org.astraea.common.consumer.Record; import org.astraea.common.producer.Producer; +import org.astraea.connector.SinkTaskContext; import org.astraea.fs.FileSystem; import org.astraea.it.FtpServer; import org.astraea.it.HdfsServer; @@ -52,6 +54,26 @@ public class ExporterTest { private static final Service SERVICE = Service.builder().numberOfWorkers(1).numberOfBrokers(1).build(); + private static final SinkTaskContext context = + new SinkTaskContext() { + @Override + public void offset(Map offsets) {} + + @Override + public void offset(TopicPartition topicPartition, long offset) {} + + @Override + public void pause(Collection partitions) {} + + @Override + public void requestCommit() {} + + @Override + public Map configs() { + return Map.of("name", "test"); + } + }; + @AfterAll static void closeService() { SERVICE.close(); @@ -191,7 +213,7 @@ void testFtpSinkTask() { var fs = FileSystem.of("ftp", new Configuration(configs)); - task.start(configs); + task.init(new Configuration(configs), context); var records = List.of( @@ -751,8 +773,11 @@ void testCreateRecordWriter() { var topicName = Utils.randomString(10); var tp = TopicPartition.of(topicName, 0); long offset = 123; + var testRecord = Record.builder().topic(topicName).topicPartition(tp).offset(offset).build(); var configs = Map.of( + "name", + "test", "fs.schema", "hdfs", "topics", @@ -779,7 +804,7 @@ void testCreateRecordWriter() { task.interval = 1000; task.compressionType = "none"; - RecordWriter recordWriter = task.createRecordWriter(tp, offset); + RecordWriter recordWriter = task.createRecordWriter(testRecord, new Configuration(configs)); Assertions.assertNotNull(recordWriter); @@ -981,7 +1006,7 @@ void testCompression() { "compression.type", "gzip"); - task.start(configs); + task.init(new Configuration(configs), context); var records = List.of( @@ -1012,7 +1037,7 @@ void testCompression() { var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0")); Assertions.assertArrayEquals( - new byte[] {(byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(2))); + new byte[] {(byte) 0x0, (byte) 0x0, (byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(4))); } } } diff --git a/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java b/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java index e27e4bec67..ce172c1476 100644 --- a/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java +++ b/fs/src/main/java/org/astraea/fs/ftp/FtpFileSystem.java @@ -207,6 +207,11 @@ public void write(int b) throws IOException { outputStream.write(b); } + @Override + public void write(byte b[]) throws IOException { + outputStream.write(b); + } + @Override public void write(byte b[], int off, int len) throws IOException { outputStream.write(b, off, len); From b033d941f9ccb3ee2cbf73cbcbf6fc5753476ca2 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sun, 16 Jul 2023 22:38:12 +0800 Subject: [PATCH 2/4] a new version of recordWriter v1, which contains metadata at the end of a file. The reason for creating a new version instead of replacing version 0 is because the importer is not yet able to handle files with metadata information. --- .../common/backup/RecordReaderBuilder.java | 7 +- .../astraea/common/backup/RecordWriter.java | 2 +- .../common/backup/RecordWriterBuilder.java | 115 +++++++++++++----- .../connector/backup/ExporterTest.java | 20 +-- 4 files changed, 103 insertions(+), 41 deletions(-) diff --git a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java index d6f154bafd..65e8aa29a6 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java @@ -16,6 +16,7 @@ */ package org.astraea.common.backup; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -77,8 +78,10 @@ private static Record readRecord(InputStream inputStream) { .serializedValueSize(outerRecord.getValue().size()) .build(); } catch (IOException e) { - throw new SerializationException(e); + // swallow the exception until the importer can read metadata at the end of the file. + if (!(e instanceof InvalidProtocolBufferException)) throw new SerializationException(e); } + return null; } private InputStream fs; @@ -104,7 +107,7 @@ public RecordReaderBuilder buffered(int size) { public RecordReader build() { var version = ByteUtils.readShort(fs); - if (version == 0) return V0.apply(fs); + if (version == 0 || version == 1 ) return V0.apply(fs); throw new IllegalArgumentException("unsupported version: " + version); } diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriter.java b/common/src/main/java/org/astraea/common/backup/RecordWriter.java index 22bfc42172..20aa69cfa4 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriter.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriter.java @@ -53,7 +53,7 @@ static RecordWriterBuilder builder(File file) { } static RecordWriterBuilder builder(OutputStream outputStream) { - return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream); + return new RecordWriterBuilder((short) 0, outputStream); } static RecordWriterBuilder builder( diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index 4278de745a..f225cc2f5d 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -20,9 +20,11 @@ import java.io.BufferedOutputStream; import java.io.OutputStream; import java.time.Duration; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; import java.util.zip.GZIPOutputStream; import org.astraea.common.ByteUtils; import org.astraea.common.Configuration; @@ -34,7 +36,83 @@ public class RecordWriterBuilder { - private static final Bi3Function V0 = + static RecordOuterClass.Record record2Builder(Record record) { + return Utils.packException( + () -> + RecordOuterClass.Record.newBuilder() + .setTopic(record.topic()) + .setPartition(record.partition()) + .setOffset(record.offset()) + .setTimestamp(record.timestamp()) + .setKey(record.key() == null ? ByteString.EMPTY : ByteString.copyFrom(record.key())) + .setValue( + record.value() == null ? ByteString.EMPTY : ByteString.copyFrom(record.value())) + .addAllHeaders( + record.headers().stream() + .map( + header -> + RecordOuterClass.Record.Header.newBuilder() + .setKey(header.key()) + .setValue( + header.value() == null + ? ByteString.EMPTY + : ByteString.copyFrom(header.value())) + .build()) + .toList()) + .build()); + } + + private static final Function V0 = + outputStream -> + new RecordWriter() { + private final AtomicInteger count = new AtomicInteger(); + private final LongAdder size = new LongAdder(); + + private final AtomicLong latestAppendTimestamp = new AtomicLong(); + + @Override + public void append(Record record) { + Utils.packException( + () -> { + var recordBuilder = record2Builder(record); + recordBuilder.writeDelimitedTo(outputStream); + this.size.add(recordBuilder.getSerializedSize()); + }); + count.incrementAndGet(); + this.latestAppendTimestamp.set(System.currentTimeMillis()); + } + + @Override + public long latestAppendTimestamp() { + return this.latestAppendTimestamp.get(); + } + + @Override + public DataSize size() { + return DataSize.Byte.of(size.sum()); + } + + @Override + public int count() { + return count.get(); + } + + @Override + public void flush() { + Utils.packException(outputStream::flush); + } + + @Override + public void close() { + Utils.packException( + () -> { + outputStream.flush(); + outputStream.close(); + }); + } + }; + + private static final Bi3Function V1 = (configuration, record, outputStream) -> new RecordWriter() { private final AtomicInteger count = new AtomicInteger(); @@ -108,33 +186,7 @@ void appendMetadata() { public void append(Record record) { Utils.packException( () -> { - var recordBuilder = - RecordOuterClass.Record.newBuilder() - .setTopic(record.topic()) - .setPartition(record.partition()) - .setOffset(record.offset()) - .setTimestamp(record.timestamp()) - .setKey( - record.key() == null - ? ByteString.EMPTY - : ByteString.copyFrom(record.key())) - .setValue( - record.value() == null - ? ByteString.EMPTY - : ByteString.copyFrom(record.value())) - .addAllHeaders( - record.headers().stream() - .map( - header -> - RecordOuterClass.Record.Header.newBuilder() - .setKey(header.key()) - .setValue( - header.value() == null - ? ByteString.EMPTY - : ByteString.copyFrom(header.value())) - .build()) - .toList()) - .build(); + var recordBuilder = record2Builder(record); recordBuilder.writeDelimitedTo(this.targetOutputStream); this.size.add(recordBuilder.getSerializedSize()); }); @@ -173,7 +225,7 @@ public void close() { } }; - public static final short LATEST_VERSION = (short) 0; + public static final short LATEST_VERSION = (short) 1; private final short version; private OutputStream fs; @@ -208,7 +260,10 @@ public RecordWriter build() { () -> { if (version == 0) { fs.write(ByteUtils.toBytes(version)); - return V0.apply(this.configuration, this.record, fs); + return V0.apply(fs); + } else if (version == 1) { + fs.write(ByteUtils.toBytes(version)); + return V1.apply(this.configuration, this.record, fs); } throw new IllegalArgumentException("unsupported version: " + version); }); diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index cdfac22967..2eaa223b1c 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -300,7 +300,7 @@ void testFtpSinkTaskIntervalWith1File() { var fs = FileSystem.of("ftp", new Configuration(configs)); - task.start(configs); + task.init(new Configuration(configs), context); var records1 = Record.builder() @@ -375,7 +375,7 @@ void testFtpSinkTaskIntervalWith2Writers() { var fs = FileSystem.of("ftp", new Configuration(configs)); - task.start(configs); + task.init(new Configuration(configs), context); var record1 = Record.builder() @@ -480,7 +480,7 @@ void testHdfsSinkTask() { "fs.hdfs.override.dfs.client.use.datanode.hostname", "true"); - task.start(configs); + task.init(new Configuration(configs), context); var records = List.of( @@ -579,7 +579,7 @@ void testHdfsSinkTaskIntervalWith1File() { "roll.duration", "300ms"); - task.start(configs); + task.init(new Configuration(configs), context); var records1 = Record.builder() @@ -652,7 +652,7 @@ void testHdfsSinkTaskIntervalWith2Writers() { "roll.duration", "100ms"); - task.start(configs); + task.init(new Configuration(configs), context); var record1 = Record.builder() @@ -776,7 +776,7 @@ void testCreateRecordWriter() { var testRecord = Record.builder().topic(topicName).topicPartition(tp).offset(offset).build(); var configs = Map.of( - "name", + "connector.name", "test", "fs.schema", "hdfs", @@ -844,6 +844,8 @@ void testWriteRecords() { var path = "/test"; var configs = Map.of( + "connector.name", + "test", "fs.schema", "hdfs", "topics", @@ -868,6 +870,7 @@ void testWriteRecords() { var task = new Exporter.Task(); task.fs = FileSystem.of("hdfs", new Configuration(configs)); task.compressionType = "none"; + task.configuration = new Configuration(configs); task.size = DataSize.of("100MB"); task.bufferSize.reset(); task.recordsQueue.add( @@ -914,7 +917,7 @@ void testIsValid() { var task = new Exporter.Task(); - task.start(configs); + task.init(new Configuration(configs), context); var record1 = Record.builder() @@ -1037,7 +1040,8 @@ void testCompression() { var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0")); Assertions.assertArrayEquals( - new byte[] {(byte) 0x0, (byte) 0x0, (byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(4))); + new byte[] {(byte) 0x0, (byte) 0x1, (byte) 0x1f, (byte) 0x8b}, + Utils.packException(() -> input.readNBytes(4))); } } } From b1e983f6cce7fcb442b06054d3c532e5668a2133 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Mon, 17 Jul 2023 11:29:08 +0800 Subject: [PATCH 3/4] Fix RecordReader to not error out when receiving V1 files While it can accept V1 files, it was only processing the initial data and not the following metadata --- .../java/org/astraea/common/backup/RecordReaderBuilder.java | 2 +- .../java/org/astraea/common/backup/RecordWriterBuilder.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java index 65e8aa29a6..7c2c63df5d 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java @@ -107,7 +107,7 @@ public RecordReaderBuilder buffered(int size) { public RecordReader build() { var version = ByteUtils.readShort(fs); - if (version == 0 || version == 1 ) return V0.apply(fs); + if (version == 0 || version == 1) return V0.apply(fs); throw new IllegalArgumentException("unsupported version: " + version); } diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index f225cc2f5d..02cab13548 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -20,7 +20,6 @@ import java.io.BufferedOutputStream; import java.io.OutputStream; import java.time.Duration; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; From c865e3695feeb257dab7bde57d9951a5194bc174 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sun, 23 Jul 2023 16:00:37 +0800 Subject: [PATCH 4/4] Remove topic/partition/1st record info metadata Removing these data from metadata to give writers more flexibility. This allows writers to write data without restrictions, allowing the exporter to decide how they want to handle the data. --- .../astraea/common/backup/RecordWriter.java | 6 ++--- .../common/backup/RecordWriterBuilder.java | 25 +++++-------------- .../astraea/connector/backup/Exporter.java | 6 ++--- 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriter.java b/common/src/main/java/org/astraea/common/backup/RecordWriter.java index 20aa69cfa4..ee56c11b37 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriter.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriter.java @@ -56,9 +56,7 @@ static RecordWriterBuilder builder(OutputStream outputStream) { return new RecordWriterBuilder((short) 0, outputStream); } - static RecordWriterBuilder builder( - OutputStream outputStream, Configuration configuration, Record record) { - return new RecordWriterBuilder( - RecordWriterBuilder.LATEST_VERSION, outputStream, configuration, record); + static RecordWriterBuilder builder(OutputStream outputStream, Configuration configuration) { + return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream, configuration); } } diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index 02cab13548..6401d6f52f 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.zip.GZIPOutputStream; import org.astraea.common.ByteUtils; @@ -30,7 +31,6 @@ import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.consumer.Record; -import org.astraea.common.function.Bi3Function; import org.astraea.common.generated.RecordOuterClass; public class RecordWriterBuilder { @@ -111,8 +111,8 @@ public void close() { } }; - private static final Bi3Function V1 = - (configuration, record, outputStream) -> + private static final BiFunction V1 = + (configuration, outputStream) -> new RecordWriter() { private final AtomicInteger count = new AtomicInteger(); private final LongAdder size = new LongAdder(); @@ -161,17 +161,7 @@ void appendMetadata() { // 552 Bytes total for whole metadata. outputStream.write( - this.extendString( - this.connectorName, 255)); // 255 Bytes for this connector name - outputStream.write( - this.extendString(record.topic(), 255)); // 255 Bytes for topic name - // 42 Bytes - outputStream.write( - ByteUtils.toBytes(record.partition())); // 4 Bytes for partition - outputStream.write( - ByteUtils.toBytes(record.offset())); // 8 bytes for 1st record offset - outputStream.write( - ByteUtils.toBytes(record.timestamp())); // 8 bytes for 1st record timestamp + this.extendString(this.connectorName, 255)); // 255 Bytes for this connector outputStream.write(ByteUtils.toBytes(this.count())); // 4 Bytes for count outputStream.write( ByteUtils.toBytes(this.interval)); // 8 Bytes for mills of roll.duration @@ -229,19 +219,16 @@ public void close() { private final short version; private OutputStream fs; private Configuration configuration; - private Record record; RecordWriterBuilder(short version, OutputStream outputStream) { this.version = version; this.fs = outputStream; } - RecordWriterBuilder( - short version, OutputStream outputStream, Configuration configuration, Record record) { + RecordWriterBuilder(short version, OutputStream outputStream, Configuration configuration) { this.version = version; this.fs = outputStream; this.configuration = configuration; - this.record = record; } public RecordWriterBuilder buffered() { @@ -262,7 +249,7 @@ public RecordWriter build() { return V0.apply(fs); } else if (version == 1) { fs.write(ByteUtils.toBytes(version)); - return V1.apply(this.configuration, this.record, fs); + return V1.apply(this.configuration, fs); } throw new IllegalArgumentException("unsupported version: " + version); }); diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index d03bfb783d..d6258aecfd 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -215,8 +215,7 @@ RecordWriter createRecordWriter(Record record, Configuration configuration) { fs.write( String.join( "/", path, record.topic(), String.valueOf(record.partition()), fileName)), - configuration, - record) + configuration) .build(); } @@ -226,8 +225,7 @@ RecordWriter createRecordWriter(Record record) { fs.write( String.join( "/", path, record.topic(), String.valueOf(record.partition()), fileName)), - this.configuration, - record) + this.configuration) .build(); }