[EXPORTER] Add metadata in record writer #1832

Merged · 5 commits · Jul 26, 2023

common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.astraea.common.backup;

+ import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -77,8 +78,10 @@ private static Record<byte[], byte[]> readRecord(InputStream inputStream) {
           .serializedValueSize(outerRecord.getValue().size())
           .build();
     } catch (IOException e) {
-      throw new SerializationException(e);
+      // swallow the exception until the importer can read metadata at the end of the file.
+      if (!(e instanceof InvalidProtocolBufferException)) throw new SerializationException(e);
     }
+    return null;
   }

private InputStream fs;
@@ -104,7 +107,7 @@ public RecordReaderBuilder buffered(int size) {

public RecordReader build() {
var version = ByteUtils.readShort(fs);
-    if (version == 0) return V0.apply(fs);
+    if (version == 0 || version == 1) return V0.apply(fs);

throw new IllegalArgumentException("unsupported version: " + version);
}
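The reader change works because a v1 file ends with a fixed-size metadata footer that is not a length-delimited protobuf record: when the v0 record loop reaches the footer, protobuf parsing fails with InvalidProtocolBufferException, and readRecord returns null to signal the end of the records. A minimal sketch of that loop, using only the generated RecordOuterClass.Record API; the surrounding class and method names are hypothetical:

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.astraea.common.generated.RecordOuterClass;

class V1FileReadSketch {
  // reads delimited records until EOF (v0 file) or the metadata footer (v1 file)
  static List<RecordOuterClass.Record> readAll(InputStream in) throws IOException {
    var records = new ArrayList<RecordOuterClass.Record>();
    while (true) {
      try {
        var record = RecordOuterClass.Record.parseDelimitedFrom(in);
        if (record == null) return records; // clean end of stream
        records.add(record);
      } catch (InvalidProtocolBufferException e) {
        // the bytes are not a delimited record: assume the v1 footer starts here
        return records;
      }
    }
  }
}

The trade-off is that a genuinely corrupted record is indistinguishable from the footer at this layer, which is why the inline comment defers a stricter check to a future importer that can read the metadata at the end of the file.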
common/src/main/java/org/astraea/common/backup/RecordWriter.java
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
+ import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Record;
@@ -52,6 +53,10 @@ static RecordWriterBuilder builder(File file) {
}

static RecordWriterBuilder builder(OutputStream outputStream) {
-    return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream);
+    return new RecordWriterBuilder((short) 0, outputStream);
   }
+
+  static RecordWriterBuilder builder(OutputStream outputStream, Configuration configuration) {
+    return new RecordWriterBuilder(RecordWriterBuilder.LATEST_VERSION, outputStream, configuration);
+  }
}
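With these overloads the version is pinned by the entry point: the plain OutputStream builder always produces a v0 file, while the Configuration overload uses LATEST_VERSION (now 1) and pulls connector.name (required), roll.duration, and compression.type from the configuration. A hedged sketch of both paths, assuming Configuration can be constructed from a Map as in the Exporter change below, and that roll.duration accepts a "3s"-style value:

import java.io.FileOutputStream;
import java.util.Map;
import org.astraea.common.Configuration;
import org.astraea.common.backup.RecordWriter;

class WriterEntryPointSketch {
  static void demo() throws Exception {
    // v0: two-byte version header, delimited records, no footer
    try (var out = new FileOutputStream("/tmp/backup-v0.record")) {
      var writer = RecordWriter.builder(out).build();
      writer.close(); // closes the underlying stream as well
    }
    // v1: two-byte version header, (optionally gzipped) records, 277-byte footer
    try (var out = new FileOutputStream("/tmp/backup-v1.record")) {
      var configuration =
          new Configuration(
              Map.of(
                  "connector.name", "my-exporter", // required by the v1 writer
                  "roll.duration", "3s", // optional; assumed format, defaults to 3 seconds
                  "compression.type", "gzip")); // optional, defaults to "none"
      var writer = RecordWriter.builder(out, configuration).build();
      writer.close(); // finishes gzip and appends the metadata footer
    }
  }
}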
common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java (142 additions, 40 deletions)
@@ -19,19 +19,48 @@
import com.google.protobuf.ByteString;
import java.io.BufferedOutputStream;
import java.io.OutputStream;
+ import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.zip.GZIPOutputStream;
import org.astraea.common.ByteUtils;
+ import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Record;
import org.astraea.common.generated.RecordOuterClass;

public class RecordWriterBuilder {

+  static RecordOuterClass.Record record2Builder(Record<byte[], byte[]> record) {
+    return Utils.packException(
+        () ->
+            RecordOuterClass.Record.newBuilder()
+                .setTopic(record.topic())
+                .setPartition(record.partition())
+                .setOffset(record.offset())
+                .setTimestamp(record.timestamp())
+                .setKey(record.key() == null ? ByteString.EMPTY : ByteString.copyFrom(record.key()))
+                .setValue(
+                    record.value() == null ? ByteString.EMPTY : ByteString.copyFrom(record.value()))
+                .addAllHeaders(
+                    record.headers().stream()
+                        .map(
+                            header ->
+                                RecordOuterClass.Record.Header.newBuilder()
+                                    .setKey(header.key())
+                                    .setValue(
+                                        header.value() == null
+                                            ? ByteString.EMPTY
+                                            : ByteString.copyFrom(header.value()))
+                                    .build())
+                        .toList())
+                .build());
+  }

private static final Function<OutputStream, RecordWriter> V0 =
outputStream ->
new RecordWriter() {
@@ -44,33 +73,7 @@
public void append(Record<byte[], byte[]> record) {
Utils.packException(
() -> {
-                  var recordBuilder =
-                      RecordOuterClass.Record.newBuilder()
-                          .setTopic(record.topic())
-                          .setPartition(record.partition())
-                          .setOffset(record.offset())
-                          .setTimestamp(record.timestamp())
-                          .setKey(
-                              record.key() == null
-                                  ? ByteString.EMPTY
-                                  : ByteString.copyFrom(record.key()))
-                          .setValue(
-                              record.value() == null
-                                  ? ByteString.EMPTY
-                                  : ByteString.copyFrom(record.value()))
-                          .addAllHeaders(
-                              record.headers().stream()
-                                  .map(
-                                      header ->
-                                          RecordOuterClass.Record.Header.newBuilder()
-                                              .setKey(header.key())
-                                              .setValue(
-                                                  header.value() == null
-                                                      ? ByteString.EMPTY
-                                                      : ByteString.copyFrom(header.value()))
-                                              .build())
-                                  .toList())
-                          .build();
+                  var recordBuilder = record2Builder(record);
recordBuilder.writeDelimitedTo(outputStream);
this.size.add(recordBuilder.getSerializedSize());
});
@@ -108,28 +111,124 @@ public void close() {
}
};

-  public static final short LATEST_VERSION = (short) 0;
+  private static final BiFunction<Configuration, OutputStream, RecordWriter> V1 =
+      (configuration, outputStream) ->
+          new RecordWriter() {
+            private final AtomicInteger count = new AtomicInteger();
+            private final LongAdder size = new LongAdder();
+            private final AtomicLong latestAppendTimestamp = new AtomicLong();
+            private final String connectorName;
+            private final Long interval;
+            private final String compressionType;
+            private OutputStream targetOutputStream;
+
+            // instance initializer block
+            {
+              this.connectorName = configuration.requireString("connector.name");
+              this.interval =
+                  configuration
+                      .string("roll.duration")
+                      .map(Utils::toDuration)
+                      .orElse(Duration.ofSeconds(3))
+                      .toMillis();
+              this.compressionType = configuration.string("compression.type").orElse("none");
+
+              switch (this.compressionType) {
+                case "gzip" -> Utils.packException(
+                    () -> this.targetOutputStream = new GZIPOutputStream(outputStream));
+                case "none" -> this.targetOutputStream = outputStream;
+                default -> throw new IllegalArgumentException(
+                    String.format("compression type '%s' is not supported", this.compressionType));
+              }
+            }
+
+            // right-pads the UTF-8 bytes of the input with spaces up to the fixed length
+            byte[] extendString(String input, int length) {
+              byte[] original = input.getBytes();
+              byte[] result = new byte[length];
+              System.arraycopy(original, 0, result, 0, original.length);
+              for (int i = original.length; i < result.length; i++) {
+                result[i] = (byte) ' ';
+              }
+              return result;
+            }
+
+            void appendMetadata() {
+              Utils.packException(
+                  () -> {
+                    if (this.compressionType.equals("gzip")) {
+                      ((GZIPOutputStream) targetOutputStream).finish();
+                    }
+
+                    // 277 bytes total for the whole metadata footer.
+                    outputStream.write(
+                        this.extendString(this.connectorName, 255)); // 255 bytes for connector name
+                    outputStream.write(ByteUtils.toBytes(this.count())); // 4 bytes for count
+                    outputStream.write(
+                        ByteUtils.toBytes(this.interval)); // 8 bytes for millis of roll.duration
+                    outputStream.write(
+                        this.extendString(
+                            this.compressionType, 10)); // 10 bytes for compression type name
+                  });
+            }
+
+            @Override
+            public void append(Record<byte[], byte[]> record) {
+              Utils.packException(
+                  () -> {
+                    var recordBuilder = record2Builder(record);
+                    recordBuilder.writeDelimitedTo(this.targetOutputStream);
+                    this.size.add(recordBuilder.getSerializedSize());
+                  });
+              count.incrementAndGet();
+              this.latestAppendTimestamp.set(System.currentTimeMillis());
+            }
+
+            @Override
+            public DataSize size() {
+              return DataSize.Byte.of(size.sum());
+            }
+
+            @Override
+            public int count() {
+              return count.get();
+            }
+
+            @Override
+            public void flush() {
+              // flush the (possibly wrapping) compression stream, not only the raw stream
+              Utils.packException(targetOutputStream::flush);
+            }
+
+            @Override
+            public long latestAppendTimestamp() {
+              return this.latestAppendTimestamp.get();
+            }
+
+            @Override
+            public void close() {
+              Utils.packException(
+                  () -> {
+                    appendMetadata();
+                    outputStream.flush();
+                    outputStream.close();
+                  });
+            }
+          };

+  public static final short LATEST_VERSION = (short) 1;

private final short version;
private OutputStream fs;
+  private Configuration configuration;

RecordWriterBuilder(short version, OutputStream outputStream) {
this.version = version;
this.fs = outputStream;
}

-  public RecordWriterBuilder compression(String type) {
-    switch (type) {
-      case "gzip":
-        this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs));
-        break;
-      case "none":
-        // do nothing.
-        break;
-      default:
-        throw new IllegalArgumentException("unsupported compression type: " + type);
-    }
-    return this;
+  RecordWriterBuilder(short version, OutputStream outputStream, Configuration configuration) {
+    this.version = version;
+    this.fs = outputStream;
+    this.configuration = configuration;
}

public RecordWriterBuilder buffered() {
@@ -148,6 +247,9 @@ public RecordWriter build() {
if (version == 0) {
fs.write(ByteUtils.toBytes(version));
return V0.apply(fs);
+          } else if (version == 1) {
+            fs.write(ByteUtils.toBytes(version));
+            return V1.apply(this.configuration, fs);
}
throw new IllegalArgumentException("unsupported version: " + version);
});
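For reference, the footer written by appendMetadata is 277 bytes (255 + 4 + 8 + 10) and is always uncompressed, since it goes to the raw outputStream after the gzip stream is finished. A hedged sketch of how an importer could read it back, assuming ByteUtils encodes the int and long big-endian (java.nio's default); the FooterSketch record and its field names are illustrative only:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

record FooterSketch(String connectorName, int count, long intervalMs, String compression) {
  static final int SIZE = 255 + 4 + 8 + 10; // 277 bytes

  static FooterSketch read(String path) throws IOException {
    try (var file = new RandomAccessFile(path, "r")) {
      file.seek(file.length() - SIZE); // the footer sits at the very end of the file
      var bytes = new byte[SIZE];
      file.readFully(bytes);
      var buf = ByteBuffer.wrap(bytes); // big-endian by default
      var name = new byte[255];
      buf.get(name);
      var count = buf.getInt();
      var intervalMs = buf.getLong();
      var type = new byte[10];
      buf.get(type);
      return new FooterSketch(
          new String(name, StandardCharsets.UTF_8).stripTrailing(),
          count,
          intervalMs,
          new String(type, StandardCharsets.UTF_8).stripTrailing());
    }
  }
}

Because the padded fields are fixed-width, the footer can be located with a single seek from the end of the file, without scanning the (possibly compressed) record payload.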
connector/src/main/java/org/astraea/connector/SinkTaskContext.java (13 additions, 0 deletions)
@@ -37,6 +37,11 @@ public void pause(Collection<TopicPartition> partitions) {}

@Override
public void requestCommit() {}

+  @Override
+  public Map<String, String> configs() {
+    return null;
+  }
};

/**
@@ -67,6 +72,8 @@
*/
void requestCommit();

+  Map<String, String> configs();

static SinkTaskContext of(org.apache.kafka.connect.sink.SinkTaskContext context) {
return new SinkTaskContext() {
@Override
@@ -94,6 +101,12 @@ public void pause(Collection<TopicPartition> partitions) {
public void requestCommit() {
context.requestCommit();
}

+      @Override
+      public Map<String, String> configs() {
+        return context.configs();
+      }
};
}
}
connector/src/main/java/org/astraea/connector/backup/Exporter.java (27 additions, 6 deletions)
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
+ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.Configuration;
@@ -195,6 +196,8 @@ public static class Task extends SinkTask {
long interval;
String compressionType;

+    Configuration configuration;

// a map of <Topic, <Partition, Offset>>
private final Map<String, Map<String, Long>> offsetForTopicPartition = new HashMap<>();

@@ -205,12 +208,24 @@

private SinkTaskContext taskContext;

-    RecordWriter createRecordWriter(TopicPartition tp, long offset) {
-      var fileName = String.valueOf(offset);
+    // visible for testing
+    RecordWriter createRecordWriter(Record record, Configuration configuration) {
+      var fileName = String.valueOf(record.offset());
       return RecordWriter.builder(
               fs.write(
-                  String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName)))
-          .compression(this.compressionType)
+                  String.join(
+                      "/", path, record.topic(), String.valueOf(record.partition()), fileName)),
+              configuration)
           .build();
     }
+
+    RecordWriter createRecordWriter(Record record) {
+      var fileName = String.valueOf(record.offset());
+      return RecordWriter.builder(
+              fs.write(
+                  String.join(
+                      "/", path, record.topic(), String.valueOf(record.partition()), fileName)),
+              this.configuration)
+          .build();
+    }

@@ -249,8 +264,7 @@ void writeRecords(HashMap<TopicPartition, RecordWriter> writers) {
records.forEach(
record -> {
            var writer =
-               writers.computeIfAbsent(
-                   record.topicPartition(), tp -> createRecordWriter(tp, record.offset()));
+               writers.computeIfAbsent(record.topicPartition(), tp -> createRecordWriter(record));
writer.append(record);
if (writer.size().greaterThan(size)) {
writers.remove(record.topicPartition()).close();
@@ -317,6 +331,13 @@ protected void init(Configuration configuration, SinkTaskContext context) {
.orElse(COMPRESSION_TYPE_DEFAULT)
.toLowerCase();

+      var originalConfiguration =
+          configuration.raw().entrySet().stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      originalConfiguration.computeIfAbsent(
+          "connector.name", k -> this.taskContext.configs().get("name"));
+      this.configuration = new Configuration(originalConfiguration);

// fetches key-value pairs from the configuration's variable matching the regular expression
// '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based
// on the
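The init change guarantees the v1 writer's required connector.name key: the exporter's own configuration wins, and otherwise the value falls back to the "name" entry that Kafka Connect places in the task context's configs. A small sketch of that fallback in isolation; the class and method names are illustrative:

import java.util.HashMap;
import java.util.Map;

class ConnectorNameFallbackSketch {
  static Map<String, String> resolve(Map<String, String> raw, Map<String, String> contextConfigs) {
    var merged = new HashMap<>(raw);
    // only consult the task context when the exporter configuration omits connector.name
    merged.computeIfAbsent("connector.name", k -> contextConfigs.get("name"));
    return merged;
  }
}

For example, resolve(Map.of(), Map.of("name", "exporter-0")) yields a map whose connector.name is exporter-0, so the metadata footer always carries a connector name even when the user did not set one explicitly.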