[EXPORTER] Add metadata in record writer (#1832)
* RecordWriter will append metadata before closing.

When a new RecordWriter is created, the exporter task passes both the configuration and the first record so that metadata can be written. A new method has been added to SinkTaskContext for retrieving the connector name to be used in the metadata.
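
A minimal sketch of that hand-off, mirroring the Exporter#init change further down (the helper class and method names are illustrative, not part of the commit):

import java.util.HashMap;
import org.astraea.common.Configuration;
import org.astraea.connector.SinkTaskContext;

class ConnectorNameSketch {
  // Build the configuration handed to RecordWriter.builder(stream, configuration).
  // The diff below reads the connector name from the "name" key of the task configs,
  // which the new SinkTaskContext#configs() method exposes.
  static Configuration writerConfiguration(Configuration taskConfiguration, SinkTaskContext context) {
    var raw = new HashMap<>(taskConfiguration.raw());
    raw.computeIfAbsent("connector.name", k -> context.configs().get("name"));
    return new Configuration(raw);
  }
}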

* Add a new RecordWriter version, v1, which stores metadata at the end of the file.

A new version is introduced rather than replacing version 0 because the importer cannot yet handle files that carry metadata.
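
A minimal usage sketch of the v1 writer and the resulting file layout (the configuration values are illustrative; per the diff, connector.name is required while roll.duration and compression.type are optional):

import java.io.FileOutputStream;
import java.util.Map;
import org.astraea.common.Configuration;
import org.astraea.common.backup.RecordWriter;

class WriterV1Sketch {
  public static void main(String[] args) throws Exception {
    var configuration =
        new Configuration(
            Map.of(
                "connector.name", "my-exporter", // required: stored in the metadata
                "roll.duration", "3s",           // optional: stored as milliseconds, default 3s
                "compression.type", "none"));    // optional: "none" (default) or "gzip"
    try (var out = new FileOutputStream("/tmp/0")) {
      var writer = RecordWriter.builder(out, configuration).build();
      // Resulting layout of a v1 file:
      //   2 bytes   version header (1)
      //   N bytes   length-delimited protobuf records, gzip-compressed if requested
      //   277 bytes uncompressed metadata trailer, appended by close()
      // writer.append(record); // one call per sink record
      writer.close();
    }
  }
}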

* Fix RecordReader to not error out when receiving V1 files

The reader now accepts V1 files, but it only processes the record data and skips the trailing metadata.
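
The trailer itself stays unread for now. A hypothetical sketch (not part of this commit) of how a future importer could recover it, assuming ByteUtils encodes integers big-endian and noting that the trailer is written to the raw stream, so it sits uncompressed at the end of the file even when the records are gzipped:

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class MetadataTrailerSketch {
  // Layout written by the v1 writer: 255-byte space-padded connector name,
  // 4-byte record count, 8-byte roll interval in ms, 10-byte space-padded
  // compression type; 277 bytes in total.
  static void readTrailer(String path) throws Exception {
    try (var file = new RandomAccessFile(path, "r")) {
      var trailer = new byte[255 + 4 + 8 + 10];
      file.seek(file.length() - trailer.length);
      file.readFully(trailer);
      var buffer = ByteBuffer.wrap(trailer);
      var name = new byte[255];
      buffer.get(name);
      var count = buffer.getInt();
      var interval = buffer.getLong();
      var compression = new byte[10];
      buffer.get(compression);
      System.out.printf(
          "connector=%s records=%d roll=%dms compression=%s%n",
          new String(name, StandardCharsets.UTF_8).trim(),
          count,
          interval,
          new String(compression, StandardCharsets.UTF_8).trim());
    }
  }
}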

* Remove topic/partition/first-record info from metadata

Removing these fields from the metadata gives writers more flexibility: they can write data without restrictions, and the exporter decides how to handle the data.
Haser0305 authored Jul 26, 2023
1 parent e48ef42 commit 4140b68
Showing 7 changed files with 237 additions and 59 deletions.
common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java
@@ -16,6 +16,7 @@
*/
package org.astraea.common.backup;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -77,8 +78,10 @@ private static Record<byte[], byte[]> readRecord(InputStream inputStream) {
.serializedValueSize(outerRecord.getValue().size())
.build();
} catch (IOException e) {
throw new SerializationException(e);
// swallow the exception until the importer can read metadata at the end of the file.
if (!(e instanceof InvalidProtocolBufferException)) throw new SerializationException(e);
}
return null;
}

private InputStream fs;
Expand All @@ -104,7 +107,7 @@ public RecordReaderBuilder buffered(int size) {

public RecordReader build() {
var version = ByteUtils.readShort(fs);
if (version == 0) return V0.apply(fs);
if (version == 0 || version == 1) return V0.apply(fs);

throw new IllegalArgumentException("unsupported version: " + version);
}
common/src/main/java/org/astraea/common/backup/RecordWriter.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Record;
@@ -52,6 +53,10 @@ static RecordWriterBuilder builder(File file) {
}

static RecordWriterBuilder builder(OutputStream outputStream) {
return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream);
return new RecordWriterBuilder((short) 0, outputStream);
}

static RecordWriterBuilder builder(OutputStream outputStream, Configuration configuration) {
return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream, configuration);
}
}
182 changes: 142 additions & 40 deletions common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
@@ -19,19 +19,48 @@
import com.google.protobuf.ByteString;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.zip.GZIPOutputStream;
import org.astraea.common.ByteUtils;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Record;
import org.astraea.common.generated.RecordOuterClass;

public class RecordWriterBuilder {

static RecordOuterClass.Record record2Builder(Record<byte[], byte[]> record) {
return Utils.packException(
() ->
RecordOuterClass.Record.newBuilder()
.setTopic(record.topic())
.setPartition(record.partition())
.setOffset(record.offset())
.setTimestamp(record.timestamp())
.setKey(record.key() == null ? ByteString.EMPTY : ByteString.copyFrom(record.key()))
.setValue(
record.value() == null ? ByteString.EMPTY : ByteString.copyFrom(record.value()))
.addAllHeaders(
record.headers().stream()
.map(
header ->
RecordOuterClass.Record.Header.newBuilder()
.setKey(header.key())
.setValue(
header.value() == null
? ByteString.EMPTY
: ByteString.copyFrom(header.value()))
.build())
.toList())
.build());
}

private static final Function<OutputStream, RecordWriter> V0 =
outputStream ->
new RecordWriter() {
@@ -44,33 +73,7 @@ public class RecordWriterBuilder {
public void append(Record<byte[], byte[]> record) {
Utils.packException(
() -> {
var recordBuilder =
RecordOuterClass.Record.newBuilder()
.setTopic(record.topic())
.setPartition(record.partition())
.setOffset(record.offset())
.setTimestamp(record.timestamp())
.setKey(
record.key() == null
? ByteString.EMPTY
: ByteString.copyFrom(record.key()))
.setValue(
record.value() == null
? ByteString.EMPTY
: ByteString.copyFrom(record.value()))
.addAllHeaders(
record.headers().stream()
.map(
header ->
RecordOuterClass.Record.Header.newBuilder()
.setKey(header.key())
.setValue(
header.value() == null
? ByteString.EMPTY
: ByteString.copyFrom(header.value()))
.build())
.toList())
.build();
var recordBuilder = record2Builder(record);
recordBuilder.writeDelimitedTo(outputStream);
this.size.add(recordBuilder.getSerializedSize());
});
@@ -108,28 +111,124 @@ public void close() {
}
};

public static final short LATEST_VERSION = (short) 0;
private static final BiFunction<Configuration, OutputStream, RecordWriter> V1 =
(configuration, outputStream) ->
new RecordWriter() {
private final AtomicInteger count = new AtomicInteger();
private final LongAdder size = new LongAdder();
private final AtomicLong latestAppendTimestamp = new AtomicLong();
private final String connectorName;
private final Long interval;
private final String compressionType;
private OutputStream targetOutputStream;

// instance initializer block
{
this.connectorName = configuration.requireString("connector.name");
this.interval =
configuration
.string("roll.duration")
.map(Utils::toDuration)
.orElse(Duration.ofSeconds(3))
.toMillis();
this.compressionType = configuration.string("compression.type").orElse("none");

switch (this.compressionType) {
case "gzip" -> Utils.packException(
() -> this.targetOutputStream = new GZIPOutputStream(outputStream));
case "none" -> this.targetOutputStream = outputStream;
default -> throw new IllegalArgumentException(
String.format("compression type '%s' is not supported", this.compressionType));
}
}

byte[] extendString(String input, int length) {
byte[] original = input.getBytes();
byte[] result = new byte[length];
System.arraycopy(original, 0, result, 0, original.length);
for (int i = original.length; i < result.length; i++) {
result[i] = (byte) ' ';
}
return result;
}

void appendMetadata() {
Utils.packException(
() -> {
if (this.compressionType.equals("gzip")) {
((GZIPOutputStream) targetOutputStream).finish();
}

// 277 Bytes total for whole metadata.
outputStream.write(
this.extendString(this.connectorName, 255)); // 255 Bytes for this connector
outputStream.write(ByteUtils.toBytes(this.count())); // 4 Bytes for count
outputStream.write(
ByteUtils.toBytes(this.interval)); // 8 Bytes for mills of roll.duration
outputStream.write(
this.extendString(
this.compressionType, 10)); // 10 Bytes for compression type name.
});
}

@Override
public void append(Record<byte[], byte[]> record) {
Utils.packException(
() -> {
var recordBuilder = record2Builder(record);
recordBuilder.writeDelimitedTo(this.targetOutputStream);
this.size.add(recordBuilder.getSerializedSize());
});
count.incrementAndGet();
this.latestAppendTimestamp.set(System.currentTimeMillis());
}

@Override
public DataSize size() {
return DataSize.Byte.of(size.sum());
}

@Override
public int count() {
return count.get();
}

@Override
public void flush() {
Utils.packException(outputStream::flush);
}

@Override
public long latestAppendTimestamp() {
return this.latestAppendTimestamp.get();
}

@Override
public void close() {
Utils.packException(
() -> {
appendMetadata();
outputStream.flush();
outputStream.close();
});
}
};

public static final short LATEST_VERSION = (short) 1;

private final short version;
private OutputStream fs;
private Configuration configuration;

RecordWriterBuilder(short version, OutputStream outputStream) {
this.version = version;
this.fs = outputStream;
}

public RecordWriterBuilder compression(String type) {
switch (type) {
case "gzip":
this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs));
break;
case "none":
// do nothing.
break;
default:
throw new IllegalArgumentException("unsupported compression type: " + type);
}
return this;
RecordWriterBuilder(short version, OutputStream outputStream, Configuration configuration) {
this.version = version;
this.fs = outputStream;
this.configuration = configuration;
}

public RecordWriterBuilder buffered() {
@@ -148,6 +247,9 @@ public RecordWriter build() {
if (version == 0) {
fs.write(ByteUtils.toBytes(version));
return V0.apply(fs);
} else if (version == 1) {
fs.write(ByteUtils.toBytes(version));
return V1.apply(this.configuration, fs);
}
throw new IllegalArgumentException("unsupported version: " + version);
});
13 changes: 13 additions & 0 deletions connector/src/main/java/org/astraea/connector/SinkTaskContext.java
@@ -37,6 +37,11 @@ public void pause(Collection<TopicPartition> partitions) {}

@Override
public void requestCommit() {}

@Override
public Map<String, String> configs() {
return null;
}
};

/**
@@ -67,6 +72,8 @@ public void requestCommit() {}
*/
void requestCommit();

Map<String, String> configs();

static SinkTaskContext of(org.apache.kafka.connect.sink.SinkTaskContext context) {
return new SinkTaskContext() {
@Override
@@ -94,6 +101,12 @@ public void pause(Collection<TopicPartition> partitions) {
public void requestCommit() {
context.requestCommit();
}

@Override
public Map<String, String> configs() {
return context.configs();
}
;
};
}
}
33 changes: 27 additions & 6 deletions connector/src/main/java/org/astraea/connector/backup/Exporter.java
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
@@ -195,6 +196,8 @@ public static class Task extends SinkTask {
long interval;
String compressionType;

Configuration configuration;

// a map of <Topic, <Partition, Offset>>
private final Map<String, Map<String, Long>> offsetForTopicPartition = new HashMap<>();

@@ -205,12 +208,24 @@ public static class Task extends SinkTask {

private SinkTaskContext taskContext;

RecordWriter createRecordWriter(TopicPartition tp, long offset) {
var fileName = String.valueOf(offset);
// create for test
RecordWriter createRecordWriter(Record record, Configuration configuration) {
var fileName = String.valueOf(record.offset());
return RecordWriter.builder(
fs.write(
String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName)))
.compression(this.compressionType)
String.join(
"/", path, record.topic(), String.valueOf(record.partition()), fileName)),
configuration)
.build();
}

RecordWriter createRecordWriter(Record record) {
var fileName = String.valueOf(record.offset());
return RecordWriter.builder(
fs.write(
String.join(
"/", path, record.topic(), String.valueOf(record.partition()), fileName)),
this.configuration)
.build();
}

@@ -249,8 +264,7 @@ void writeRecords(HashMap<TopicPartition, RecordWriter> writers) {
records.forEach(
record -> {
var writer =
writers.computeIfAbsent(
record.topicPartition(), tp -> createRecordWriter(tp, record.offset()));
writers.computeIfAbsent(record.topicPartition(), tp -> createRecordWriter(record));
writer.append(record);
if (writer.size().greaterThan(size)) {
writers.remove(record.topicPartition()).close();
@@ -317,6 +331,13 @@ protected void init(Configuration configuration, SinkTaskContext context) {
.orElse(COMPRESSION_TYPE_DEFAULT)
.toLowerCase();

var originalConfiguration =
configuration.raw().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
originalConfiguration.computeIfAbsent(
"connector.name", k -> this.taskContext.configs().get("name"));
this.configuration = new Configuration(originalConfiguration);

// fetches key-value pairs from the configuration's variable matching the regular expression
// '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based
// on the
(remaining changed files not shown)