diff --git a/common/src/main/java/org/astraea/common/producer/Record.java b/common/src/main/java/org/astraea/common/producer/Record.java index 3dcf517d46..76fcc2b2fb 100644 --- a/common/src/main/java/org/astraea/common/producer/Record.java +++ b/common/src/main/java/org/astraea/common/producer/Record.java @@ -22,31 +22,21 @@ import org.astraea.common.Header; import org.astraea.common.admin.TopicPartition; -public interface Record { - - static Builder builder() { +public record Record( + String topic, + List
headers, + Key key, + Value value, + // timestamp of record + Optional timestamp, + // expected partition, or null if you don't care for it. + Optional partition) { + + public static Builder builder() { return new Builder<>(); } - String topic(); - - List
headers(); - - Key key(); - - Value value(); - - /** - * @return timestamp of record - */ - Optional timestamp(); - - /** - * @return expected partition, or null if you don't care for it. - */ - Optional partition(); - - class Builder { + public static class Builder { private Object key; private Object value; private String topic; @@ -106,45 +96,13 @@ public Builder headers(List
headers) { @SuppressWarnings("unchecked") public Record build() { - return new Record<>() { - private final Key key = (Key) Builder.this.key; - private final Value value = (Value) Builder.this.value; - private final String topic = - Objects.requireNonNull(Builder.this.topic, "topic must be defined"); - private final Optional partition = Builder.this.partition; - private final Optional timestamp = Builder.this.timestamp; - private final List
headers = Objects.requireNonNull(Builder.this.headers); - - @Override - public String topic() { - return topic; - } - - @Override - public List
headers() { - return headers; - } - - @Override - public Key key() { - return key; - } - - @Override - public Value value() { - return value; - } - - @Override - public Optional timestamp() { - return timestamp; - } - - @Override - public Optional partition() { - return partition; - } - }; + return new Record<>( + Objects.requireNonNull(topic, "topic must be defined"), + Objects.requireNonNull(headers), + (Key) key, + (Value) value, + timestamp, + partition); } } } diff --git a/connector/src/main/java/org/astraea/connector/SourceRecord.java b/connector/src/main/java/org/astraea/connector/SourceRecord.java index 88e9c2ad84..0f6cb39b51 100644 --- a/connector/src/main/java/org/astraea/connector/SourceRecord.java +++ b/connector/src/main/java/org/astraea/connector/SourceRecord.java @@ -18,88 +18,58 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import org.astraea.common.Header; import org.astraea.common.admin.TopicPartition; import org.astraea.common.producer.Record; -public class SourceRecord implements Record { +public record SourceRecord( + String topic, + List
headers, + byte[] key, + byte[] value, + // timestamp of record + Optional timestamp, + // expected partition, or null if you don't care for it. + Optional partition, + Map metadataIndex, + Map metadata) { public static Builder builder() { return new Builder(); } - private final Record record; - private final Map metadataIndex; - private final Map metadata; - - private SourceRecord( - Record record, - Map metadataIndex, - Map metadata) { - this.record = record; - this.metadataIndex = metadataIndex; - this.metadata = metadata; - } - - @Override - public String topic() { - return record.topic(); - } - - @Override - public List
headers() { - return record.headers(); - } - - @Override - public byte[] key() { - return record.key(); - } - - @Override - public byte[] value() { - return record.value(); - } - - @Override - public Optional timestamp() { - return record.timestamp(); - } - - @Override - public Optional partition() { - return record.partition(); - } - - public Map metadataIndex() { - return metadataIndex; - } - - public Map metadata() { - return metadata; - } - public static class Builder { - private final Record.Builder builder = Record.builder(); + private byte[] key; + private byte[] value; + private String topic; + private Optional partition = Optional.empty(); + private Optional timestamp = Optional.empty(); + private List
headers = List.of(); private Map metadataIndex = Map.of(); private Map metadata = Map.of(); private Builder() {} public Builder record(Record record) { - builder.record(record); + key(record.key()); + value(record.value()); + topic(record.topic()); + record.partition().ifPresent(this::partition); + record.timestamp().ifPresent(this::timestamp); + headers(record.headers()); return this; } public Builder key(byte[] key) { - builder.key(key); + this.key = key; return this; } public Builder value(byte[] value) { - builder.value(value); + this.value = value; return this; } @@ -109,22 +79,22 @@ public Builder topicPartition(TopicPartition topicPartition) { } public Builder topic(String topic) { - builder.topic(topic); + this.topic = Objects.requireNonNull(topic); return this; } public Builder partition(int partition) { - builder.partition(partition); + if (partition >= 0) this.partition = Optional.of(partition); return this; } public Builder timestamp(long timestamp) { - builder.timestamp(timestamp); + this.timestamp = Optional.of(timestamp); return this; } public Builder headers(List
headers) { - builder.headers(headers); + this.headers = headers; return this; } @@ -139,7 +109,15 @@ public Builder metadata(Map metadata) { } public SourceRecord build() { - return new SourceRecord(builder.build(), metadataIndex, metadata); + return new SourceRecord( + Objects.requireNonNull(topic, "topic must be defined"), + Objects.requireNonNull(headers), + key, + value, + timestamp, + partition, + metadataIndex, + metadata); } } } diff --git a/connector/src/main/java/org/astraea/connector/SourceTask.java b/connector/src/main/java/org/astraea/connector/SourceTask.java index 0aa7655fa6..a2de195483 100644 --- a/connector/src/main/java/org/astraea/connector/SourceTask.java +++ b/connector/src/main/java/org/astraea/connector/SourceTask.java @@ -34,7 +34,7 @@ public abstract class SourceTask extends org.apache.kafka.connect.source.SourceT * use {@link Record#builder()} or {@link SourceRecord#builder()} to construct the returned * records */ - protected abstract Collection> take() throws InterruptedException; + protected abstract Collection take() throws InterruptedException; protected void commit(Metadata metadata) throws InterruptedException { // empty @@ -62,28 +62,20 @@ public final List poll() if (records == null || records.isEmpty()) return null; return records.stream() .map( - r -> { - Map sp = null; - Map so = null; - if (r instanceof SourceRecord) { - var sr = (SourceRecord) r; - if (!sr.metadataIndex().isEmpty()) sp = sr.metadataIndex(); - if (!sr.metadata().isEmpty()) so = sr.metadata(); - } - return new org.apache.kafka.connect.source.SourceRecord( - sp, - so, - r.topic(), - r.partition().orElse(null), - r.key() == null ? null : Schema.BYTES_SCHEMA, - r.key(), - r.value() == null ? null : Schema.BYTES_SCHEMA, - r.value(), - r.timestamp().orElse(null), - r.headers().stream() - .map(h -> new HeaderImpl(h.key(), null, h.value())) - .collect(Collectors.toList())); - }) + r -> + new org.apache.kafka.connect.source.SourceRecord( + r.metadataIndex(), + r.metadata(), + r.topic(), + r.partition().orElse(null), + r.key() == null ? null : Schema.BYTES_SCHEMA, + r.key(), + r.value() == null ? null : Schema.BYTES_SCHEMA, + r.value(), + r.timestamp().orElse(null), + r.headers().stream() + .map(h -> new HeaderImpl(h.key(), null, h.value())) + .collect(Collectors.toList()))) .collect(Collectors.toList()); } diff --git a/connector/src/main/java/org/astraea/connector/backup/Importer.java b/connector/src/main/java/org/astraea/connector/backup/Importer.java index 7267aa1652..31ee09d507 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Importer.java +++ b/connector/src/main/java/org/astraea/connector/backup/Importer.java @@ -32,10 +32,10 @@ import org.astraea.common.Utils; import org.astraea.common.backup.RecordReader; import org.astraea.common.backup.RecordWriter; -import org.astraea.common.producer.Record; import org.astraea.connector.Definition; import org.astraea.connector.MetadataStorage; import org.astraea.connector.SourceConnector; +import org.astraea.connector.SourceRecord; import org.astraea.connector.SourceTask; import org.astraea.fs.FileSystem; import org.astraea.fs.Type; @@ -158,21 +158,21 @@ protected void init(Configuration configuration, MetadataStorage storage) { } @Override - protected Collection> take() { + protected Collection take() { if (paths.isEmpty()) { paths = getFileSet(addedPaths, rootDir, tasksCount, fileSet); } addedPaths.addAll(paths); var currentPath = ((LinkedList) paths).poll(); if (currentPath != null) { - var records = new ArrayList>(); + var records = new ArrayList(); var inputStream = Client.read(currentPath); var reader = RecordReader.builder(inputStream).build(); while (reader.hasNext()) { var record = reader.next(); if (record.key() == null && record.value() == null) continue; records.add( - Record.builder() + SourceRecord.builder() .topic(record.topic()) .partition(record.partition()) .key(record.key()) diff --git a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java index b91c1475d9..a788ad4b79 100644 --- a/connector/src/main/java/org/astraea/connector/perf/PerfSource.java +++ b/connector/src/main/java/org/astraea/connector/perf/PerfSource.java @@ -29,11 +29,11 @@ import org.astraea.common.DistributionType; import org.astraea.common.Utils; import org.astraea.common.admin.TopicPartition; -import org.astraea.common.producer.Record; import org.astraea.common.producer.RecordGenerator; import org.astraea.connector.Definition; import org.astraea.connector.MetadataStorage; import org.astraea.connector.SourceConnector; +import org.astraea.connector.SourceRecord; import org.astraea.connector.SourceTask; public class PerfSource extends SourceConnector { @@ -253,10 +253,11 @@ protected void init(Configuration configuration, MetadataStorage storage) { } @Override - protected Collection> take() { + protected Collection take() { return specifyPartitions.stream() .flatMap(tp -> recordGenerator.apply(tp).stream()) - .collect(Collectors.toUnmodifiableList()); + .map(r -> SourceRecord.builder().record(r).build()) + .toList(); } } } diff --git a/connector/src/test/java/org/astraea/connector/ConnectorTest.java b/connector/src/test/java/org/astraea/connector/ConnectorTest.java index c8d5fa915f..9845c58088 100644 --- a/connector/src/test/java/org/astraea/connector/ConnectorTest.java +++ b/connector/src/test/java/org/astraea/connector/ConnectorTest.java @@ -29,7 +29,6 @@ import org.astraea.common.Utils; import org.astraea.common.connector.ConnectorClient; import org.astraea.common.connector.ConnectorConfigs; -import org.astraea.common.producer.Record; import org.astraea.it.Service; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -152,7 +151,7 @@ protected void close() { } @Override - protected Collection> take() { + protected Collection take() { return List.of(); } } diff --git a/connector/src/test/java/org/astraea/connector/MetadataStorageTest.java b/connector/src/test/java/org/astraea/connector/MetadataStorageTest.java index 48a484277e..16659b831c 100644 --- a/connector/src/test/java/org/astraea/connector/MetadataStorageTest.java +++ b/connector/src/test/java/org/astraea/connector/MetadataStorageTest.java @@ -27,7 +27,6 @@ import org.astraea.common.Utils; import org.astraea.common.connector.ConnectorClient; import org.astraea.common.connector.ConnectorConfigs; -import org.astraea.common.producer.Record; import org.astraea.it.Service; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -139,7 +138,7 @@ protected void init(Configuration configuration, MetadataStorage storage) { } @Override - protected Collection> take() { + protected Collection take() { if (isDone) return List.of(); isDone = true; return topics.stream() diff --git a/connector/src/test/java/org/astraea/connector/SourceDataTest.java b/connector/src/test/java/org/astraea/connector/SourceDataTest.java index 9827d0562e..1e9ea36290 100644 --- a/connector/src/test/java/org/astraea/connector/SourceDataTest.java +++ b/connector/src/test/java/org/astraea/connector/SourceDataTest.java @@ -31,7 +31,6 @@ import org.astraea.common.connector.ConnectorConfigs; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; -import org.astraea.common.producer.Record; import org.astraea.it.Service; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -148,7 +147,7 @@ protected void init(Configuration configuration, MetadataStorage storage) { } @Override - protected Collection> take() throws InterruptedException { + protected Collection take() throws InterruptedException { if (isDone) return List.of(); isDone = true; return topics.stream()