Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PRODUCER] rewrite producer.Record by java 17 record #1699

Merged
merged 1 commit into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 19 additions & 61 deletions common/src/main/java/org/astraea/common/producer/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,21 @@
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;

public interface Record<Key, Value> {

static Builder<byte[], byte[]> builder() {
public record Record<Key, Value>(
String topic,
List<Header> headers,
Key key,
Value value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition) {

public static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

String topic();

List<Header> headers();

Key key();

Value value();

/**
* @return timestamp of record
*/
Optional<Long> timestamp();

/**
* @return expected partition, or null if you don't care for it.
*/
Optional<Integer> partition();

class Builder<Key, Value> {
public static class Builder<Key, Value> {
private Object key;
private Object value;
private String topic;
Expand Down Expand Up @@ -106,45 +96,13 @@ public Builder<Key, Value> headers(List<Header> headers) {

@SuppressWarnings("unchecked")
public Record<Key, Value> build() {
return new Record<>() {
private final Key key = (Key) Builder.this.key;
private final Value value = (Value) Builder.this.value;
private final String topic =
Objects.requireNonNull(Builder.this.topic, "topic must be defined");
private final Optional<Integer> partition = Builder.this.partition;
private final Optional<Long> timestamp = Builder.this.timestamp;
private final List<Header> headers = Objects.requireNonNull(Builder.this.headers);

@Override
public String topic() {
return topic;
}

@Override
public List<Header> headers() {
return headers;
}

@Override
public Key key() {
return key;
}

@Override
public Value value() {
return value;
}

@Override
public Optional<Long> timestamp() {
return timestamp;
}

@Override
public Optional<Integer> partition() {
return partition;
}
};
return new Record<>(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
(Key) key,
(Value) value,
timestamp,
partition);
}
}
}
100 changes: 39 additions & 61 deletions connector/src/main/java/org/astraea/connector/SourceRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,88 +18,58 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.astraea.common.Header;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;

public class SourceRecord implements Record<byte[], byte[]> {
public record SourceRecord(
String topic,
List<Header> headers,
byte[] key,
byte[] value,
// timestamp of record
Optional<Long> timestamp,
// expected partition, or null if you don't care for it.
Optional<Integer> partition,
Map<String, String> metadataIndex,
Map<String, String> metadata) {

public static Builder builder() {
return new Builder();
}

private final Record<byte[], byte[]> record;
private final Map<String, String> metadataIndex;
private final Map<String, String> metadata;

private SourceRecord(
Record<byte[], byte[]> record,
Map<String, String> metadataIndex,
Map<String, String> metadata) {
this.record = record;
this.metadataIndex = metadataIndex;
this.metadata = metadata;
}

@Override
public String topic() {
return record.topic();
}

@Override
public List<Header> headers() {
return record.headers();
}

@Override
public byte[] key() {
return record.key();
}

@Override
public byte[] value() {
return record.value();
}

@Override
public Optional<Long> timestamp() {
return record.timestamp();
}

@Override
public Optional<Integer> partition() {
return record.partition();
}

public Map<String, String> metadataIndex() {
return metadataIndex;
}

public Map<String, String> metadata() {
return metadata;
}

public static class Builder {

private final Record.Builder<byte[], byte[]> builder = Record.builder();
private byte[] key;
private byte[] value;
private String topic;
private Optional<Integer> partition = Optional.empty();
private Optional<Long> timestamp = Optional.empty();
private List<Header> headers = List.of();
private Map<String, String> metadataIndex = Map.of();
private Map<String, String> metadata = Map.of();

private Builder() {}

public Builder record(Record<byte[], byte[]> record) {
builder.record(record);
key(record.key());
value(record.value());
topic(record.topic());
record.partition().ifPresent(this::partition);
record.timestamp().ifPresent(this::timestamp);
headers(record.headers());
return this;
}

public Builder key(byte[] key) {
builder.key(key);
this.key = key;
return this;
}

public Builder value(byte[] value) {
builder.value(value);
this.value = value;
return this;
}

Expand All @@ -109,22 +79,22 @@ public Builder topicPartition(TopicPartition topicPartition) {
}

public Builder topic(String topic) {
builder.topic(topic);
this.topic = Objects.requireNonNull(topic);
return this;
}

public Builder partition(int partition) {
builder.partition(partition);
if (partition >= 0) this.partition = Optional.of(partition);
return this;
}

public Builder timestamp(long timestamp) {
builder.timestamp(timestamp);
this.timestamp = Optional.of(timestamp);
return this;
}

public Builder headers(List<Header> headers) {
builder.headers(headers);
this.headers = headers;
return this;
}

Expand All @@ -139,7 +109,15 @@ public Builder metadata(Map<String, String> metadata) {
}

public SourceRecord build() {
return new SourceRecord(builder.build(), metadataIndex, metadata);
return new SourceRecord(
Objects.requireNonNull(topic, "topic must be defined"),
Objects.requireNonNull(headers),
key,
value,
timestamp,
partition,
metadataIndex,
metadata);
}
}
}
38 changes: 15 additions & 23 deletions connector/src/main/java/org/astraea/connector/SourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract class SourceTask extends org.apache.kafka.connect.source.SourceT
* use {@link Record#builder()} or {@link SourceRecord#builder()} to construct the returned
* records
*/
protected abstract Collection<Record<byte[], byte[]>> take() throws InterruptedException;
protected abstract Collection<SourceRecord> take() throws InterruptedException;

protected void commit(Metadata metadata) throws InterruptedException {
// empty
Expand Down Expand Up @@ -62,28 +62,20 @@ public final List<org.apache.kafka.connect.source.SourceRecord> poll()
if (records == null || records.isEmpty()) return null;
return records.stream()
.map(
r -> {
Map<String, ?> sp = null;
Map<String, ?> so = null;
if (r instanceof SourceRecord) {
var sr = (SourceRecord) r;
if (!sr.metadataIndex().isEmpty()) sp = sr.metadataIndex();
if (!sr.metadata().isEmpty()) so = sr.metadata();
}
return new org.apache.kafka.connect.source.SourceRecord(
sp,
so,
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList()));
})
r ->
new org.apache.kafka.connect.source.SourceRecord(
r.metadataIndex(),
r.metadata(),
r.topic(),
r.partition().orElse(null),
r.key() == null ? null : Schema.BYTES_SCHEMA,
r.key(),
r.value() == null ? null : Schema.BYTES_SCHEMA,
r.value(),
r.timestamp().orElse(null),
r.headers().stream()
.map(h -> new HeaderImpl(h.key(), null, h.value()))
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.astraea.common.Utils;
import org.astraea.common.backup.RecordReader;
import org.astraea.common.backup.RecordWriter;
import org.astraea.common.producer.Record;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;
import org.astraea.fs.FileSystem;
import org.astraea.fs.Type;
Expand Down Expand Up @@ -158,21 +158,21 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
if (paths.isEmpty()) {
paths = getFileSet(addedPaths, rootDir, tasksCount, fileSet);
}
addedPaths.addAll(paths);
var currentPath = ((LinkedList<String>) paths).poll();
if (currentPath != null) {
var records = new ArrayList<Record<byte[], byte[]>>();
var records = new ArrayList<SourceRecord>();
var inputStream = Client.read(currentPath);
var reader = RecordReader.builder(inputStream).build();
while (reader.hasNext()) {
var record = reader.next();
if (record.key() == null && record.value() == null) continue;
records.add(
Record.builder()
SourceRecord.builder()
.topic(record.topic())
.partition(record.partition())
.key(record.key())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
import org.astraea.common.DistributionType;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.RecordGenerator;
import org.astraea.connector.Definition;
import org.astraea.connector.MetadataStorage;
import org.astraea.connector.SourceConnector;
import org.astraea.connector.SourceRecord;
import org.astraea.connector.SourceTask;

public class PerfSource extends SourceConnector {
Expand Down Expand Up @@ -253,10 +253,11 @@ protected void init(Configuration configuration, MetadataStorage storage) {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return specifyPartitions.stream()
.flatMap(tp -> recordGenerator.apply(tp).stream())
.collect(Collectors.toUnmodifiableList());
.map(r -> SourceRecord.builder().record(r).build())
.toList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.astraea.common.Utils;
import org.astraea.common.connector.ConnectorClient;
import org.astraea.common.connector.ConnectorConfigs;
import org.astraea.common.producer.Record;
import org.astraea.it.Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -152,7 +151,7 @@ protected void close() {
}

@Override
protected Collection<Record<byte[], byte[]>> take() {
protected Collection<SourceRecord> take() {
return List.of();
}
}
Expand Down
Loading