Skip to content

Commit

Permalink
[improve][connector] add reader config to pulsar-io-debezium and `p…
Browse files Browse the repository at this point in the history
…ulsar-io-kafka-connect-adaptor` (#16675)

(cherry picked from commit 9c19976)
  • Loading branch information
freeznet authored and codelipenghui committed Jul 25, 2022
1 parent 98d1cc9 commit d923c84
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.pulsar.io.common;

import static org.apache.commons.lang.StringUtils.isBlank;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -40,6 +44,15 @@ public static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz, Sin
return loadWithSecrets(map, clazz, secretName -> sinkContext.getSecret(secretName));
}

public static Map<String, Object> loadConfigFromJsonString(String config) throws JsonProcessingException {
if (!isBlank(config)) {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(config, new TypeReference<Map<String, Object>>() {
});
} else {
return Collections.emptyMap();
}
}

private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz,
Function<String, String> secretsGetter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.pulsar.io.debezium;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
Expand All @@ -30,6 +33,8 @@
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -77,14 +82,26 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
.withDescription("Pulsar client builder")
.withValidation(Field::isOptional);

public static final Field READER_CONFIG = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.reader.config")
.withDisplayName("Extra configs of the reader")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The configs of the reader for the database schema history topic, "
+ "in the form of a JSON string with key-value pairs")
.withDefault((String) null)
.withValidation(Field::isOptional);

public static final Field.Set ALL_FIELDS = Field.setOf(
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
DatabaseHistory.NAME);
DatabaseHistory.NAME,
READER_CONFIG);

private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private Map<String, Object> readerConfigMap = new HashMap<>();
private String dbHistoryName;
private ClientBuilder clientBuilder;
private volatile PulsarClient pulsarClient;
Expand All @@ -102,6 +119,12 @@ public void configure(
+ getClass().getSimpleName() + "; check the logs for details");
}
this.topicName = config.getString(TOPIC);
try {
this.readerConfigMap = loadConfigFromJsonString(config.getString(READER_CONFIG));
} catch (JsonProcessingException exception) {
log.warn("The provided reader configs are invalid, "
+ "will not passing any extra config to the reader builder.", exception);
}

String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
Expand Down Expand Up @@ -210,11 +233,7 @@ public void stop() {
@Override
protected void recoverRecords(Consumer<HistoryRecord> records) {
setupClientIfNeeded();
try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()
) {
try (Reader<String> historyReader = createHistoryReader()) {
log.info("Scanning the database history topic '{}'", topicName);

// Read all messages in the topic ...
Expand Down Expand Up @@ -257,11 +276,7 @@ protected void recoverRecords(Consumer<HistoryRecord> records) {
@Override
public boolean exists() {
setupClientIfNeeded();
try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()
) {
try (Reader<String> historyReader = createHistoryReader()) {
return historyReader.hasMessageAvailable();
} catch (IOException e) {
log.error("Encountered issues on checking existence of database history", e);
Expand All @@ -281,4 +296,13 @@ public String toString() {
}
return "Pulsar topic";
}

@VisibleForTesting
Reader<String> createHistoryReader() throws PulsarClientException {
return pulsarClient.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.loadConf(readerConfigMap)
.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
Expand All @@ -34,12 +35,14 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -74,7 +77,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder, boolean testWithReaderConfig) throws Exception {
Configuration.Builder configBuidler = Configuration.create()
.with(PulsarDatabaseHistory.TOPIC, topicName)
.with(DatabaseHistory.NAME, "my-db-history")
Expand All @@ -93,6 +96,10 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString());
}

if (testWithReaderConfig) {
configBuidler.with(PulsarDatabaseHistory.READER_CONFIG, "{\"subscriptionName\":\"my-subscription\"}");
}

// Start up the history ...
history.configure(configBuidler.build(), null, DatabaseHistoryListener.NOOP, true);
history.start();
Expand Down Expand Up @@ -122,8 +129,8 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
// Now record schema changes, which writes out to kafka but doesn't actually change the Tables ...
setLogPosition(10);
ddl = "CREATE TABLE foo ( first VARCHAR(22) NOT NULL ); \n" +
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
"CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n" +
"CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL ); \n";
history.record(source, position, "db1", ddl);

// Parse the DDL statement 3x and each time update a different Tables object ...
Expand Down Expand Up @@ -181,6 +188,10 @@ private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWit
assertEquals(recoveredTables, tables3);
}

private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean testWithClientBuilder) throws Exception {
testHistoryTopicContent(skipUnparseableDDL, testWithClientBuilder, false);
}

protected void setLogPosition(int index) {
this.position = Collect.hashMapOf("filename", "my-txn-file.log",
"position", index);
Expand Down Expand Up @@ -239,4 +250,17 @@ public void testExists() throws Exception {
// dummytopic should not exist yet
assertFalse(history.exists());
}

@Test
public void testSubscriptionName() throws Exception {
testHistoryTopicContent(true, false, true);
assertTrue(history.exists());
try (Reader<String> ignored = history.createHistoryReader()) {
List<String> subscriptions = admin.topics().getSubscriptions(topicName);
assertEquals(subscriptions.size(), 1);
assertTrue(subscriptions.contains("my-subscription"));
} catch (Exception e) {
fail("Failed to create history reader");
}
}
}
6 changes: 6 additions & 0 deletions pulsar-io/kafka-connect-adaptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace";
private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics";

/**
* <code>offset.storage.reader.config</code>.
*/
public static final String OFFSET_STORAGE_READER_CONFIG = "offset.storage.reader.config";
private static final String OFFSET_STORAGE_READER_CONFIG_DOC = "The configs of the reader for the "
+ "kafka connector offsets topic, in the form of a JSON string with key-value pairs";


static {
CONFIG = new ConfigDef()
.define(OFFSET_STORAGE_TOPIC_CONFIG,
Expand All @@ -53,7 +61,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig {
Type.STRING,
"public/default",
Importance.HIGH,
TOPIC_NAMESPACE_CONFIG_DOC);
TOPIC_NAMESPACE_CONFIG_DOC)
.define(OFFSET_STORAGE_READER_CONFIG,
Type.STRING,
null,
Importance.HIGH,
OFFSET_STORAGE_READER_CONFIG_DOC);
}

public PulsarKafkaWorkerConfig(Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -54,6 +56,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
private PulsarClient client;
private String topic;
private Map<String, Object> readerConfigMap = new HashMap<>();
private Producer<byte[]> producer;
private Reader<byte[]> reader;
private volatile CompletableFuture<Void> outstandingReadToEnd = null;
Expand All @@ -67,6 +70,13 @@ public PulsarOffsetBackingStore(PulsarClient client) {
public void configure(WorkerConfig workerConfig) {
this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
checkArgument(!isBlank(topic), "Offset storage topic must be specified");
try {
this.readerConfigMap = loadConfigFromJsonString(
workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG));
} catch (JsonProcessingException exception) {
log.warn("The provided reader configs are invalid, "
+ "will not passing any extra config to the reader builder.", exception);
}

log.info("Configure offset backing store on pulsar topic {}", topic);
}
Expand Down Expand Up @@ -148,6 +158,7 @@ public void start() {
reader = client.newReader(Schema.BYTES)
.topic(topic)
.startMessageId(MessageId.earliest)
.loadConf(readerConfigMap)
.create();
log.info("Successfully created reader to replay updates from topic {}", topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ protected void setup() throws Exception {

this.topicName = "persistent://my-property/my-ns/offset-topic";
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, topicName);
this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
this.client = PulsarClient.builder()
.serviceUrl(brokerUrl.toString())
.build();
this.offsetBackingStore = new PulsarOffsetBackingStore(client);
this.offsetBackingStore.configure(distributedConfig);
this.offsetBackingStore.start();
}

@AfterMethod(alwaysRun = true)
Expand All @@ -81,20 +78,33 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

private void testOffsetBackingStore(boolean testWithReaderConfig) throws Exception {
if (testWithReaderConfig) {
this.defaultProps.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_READER_CONFIG,
"{\"subscriptionName\":\"my-subscription\"}");
}
this.distributedConfig = new PulsarKafkaWorkerConfig(this.defaultProps);
this.offsetBackingStore.configure(distributedConfig);
this.offsetBackingStore.start();
}

@Test
public void testGetFromEmpty() throws Exception {
testOffsetBackingStore(false);
assertTrue(offsetBackingStore.get(
Arrays.asList(ByteBuffer.wrap("empty-key".getBytes(UTF_8)))
).get().isEmpty());
}

@Test
public void testGetSet() throws Exception {
testOffsetBackingStore(false);
testGetSet(false);
}

@Test
public void testGetSetCallback() throws Exception {
testOffsetBackingStore(false);
testGetSet(true);
}

Expand Down Expand Up @@ -136,4 +146,12 @@ private void testGetSet(boolean testCallback) throws Exception {
assertEquals(new String(valData, UTF_8), "test-val-" + idx);
});
}

@Test
public void testWithReaderConfig() throws Exception {
testOffsetBackingStore(true);
testGetSet(false);
List<String> subscriptions = admin.topics().getSubscriptions(topicName);
assertTrue(subscriptions.contains("my-subscription"));
}
}

0 comments on commit d923c84

Please sign in to comment.