[SSDP 192] Use the schema registry to derive a stream schema in the kafka source #7

Open · wants to merge 5 commits into master
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-kafka

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.2.3
+LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-kafka
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-kafka/metadata.yaml
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
-dockerImageTag: 0.2.3
+dockerImageTag: 0.3.0
dockerRepository: airbyte/source-kafka
githubIssueLabel: source-kafka
icon: kafka.svg
New file: KafkaConsumerRebalanceListener.java
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {

public KafkaConsumerRebalanceListener(final KafkaConsumer<?, ?> consumer, final Map<TopicPartition, Long> positions) {
this.consumer = consumer;
this.positions = positions;
}

@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {

}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
// Seek every newly assigned partition to its stored position, when one is known.
partitions.forEach(partition -> Optional.ofNullable(positions.get(partition)).ifPresent(position -> consumer.seek(partition, position)));
}

private final KafkaConsumer<?, ?> consumer;
private final Map<TopicPartition, Long> positions;
}
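
For context, a minimal usage sketch (not part of this diff) showing how a rebalance listener like this is typically registered on a consumer. The bootstrap server, topic name and stored offset below are made up; in the connector the positions map would presumably come from the saved state.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.airbyte.integrations.source.kafka.KafkaConsumerRebalanceListener;

public class RebalanceListenerUsageSketch {

  public static void main(String[] args) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "airbyte-source-kafka");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Offsets recovered from a previous sync; the value here is made up.
    final Map<TopicPartition, Long> positions = Map.of(new TopicPartition("demo-topic", 0), 42L);

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // On the first poll the group rebalances and onPartitionsAssigned seeks every
      // assigned partition back to its stored position, so reading resumes from there.
      consumer.subscribe(List.of("demo-topic"), new KafkaConsumerRebalanceListener(consumer, positions));
      consumer.poll(java.time.Duration.ofMillis(100));
    }
  }
}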
New file: KafkaMessage.java
@@ -0,0 +1,7 @@
package io.airbyte.integrations.source.kafka;

import io.airbyte.protocol.models.v0.AirbyteRecordMessage;

public record KafkaMessage(String topic, int partition, long offset, AirbyteRecordMessage message) {

}
Changes to KafkaSource.java
@@ -9,7 +9,10 @@
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
+import io.airbyte.integrations.source.kafka.config.ConfigHelper;
import io.airbyte.integrations.source.kafka.format.KafkaFormat;
+import io.airbyte.integrations.source.kafka.generator.GeneratorHelper;
+import io.airbyte.integrations.source.kafka.state.StateHelper;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
@@ -24,7 +27,8 @@ public class KafkaSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);

-public KafkaSource() {}
+public KafkaSource() {
+}

@Override
public AirbyteConnectionStatus check(final JsonNode config) {
@@ -40,7 +44,7 @@ public AirbyteConnectionStatus check(final JsonNode config) {
@Override
public AirbyteCatalog discover(final JsonNode config) {
KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
-final List<AirbyteStream> streams = kafkaFormat.getStreams();
+final List<AirbyteStream> streams = kafkaFormat.getStreams(config);
return new AirbyteCatalog().withStreams(streams);
}

@@ -51,8 +55,11 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final C
if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
}
-KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config);
-return kafkaFormat.read();
+final var parsedConfig = ConfigHelper.fromJson(config);
+final var offsets = StateHelper.stateFromJson(state);
+final var generator = GeneratorHelper.buildFrom(parsedConfig, offsets);
+
+return generator.read();
}

public static void main(final String[] args) throws Exception {
New file: ConfigHelper.java
@@ -0,0 +1,124 @@
package io.airbyte.integrations.source.kafka.config;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.kafka.KafkaProtocol;
import io.airbyte.integrations.source.kafka.KafkaStrategy;
import io.airbyte.integrations.source.kafka.MessageFormat;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

public class ConfigHelper {

public static SourceConfig fromJson(JsonNode config) {
final var messageFormat = MessageFormat.valueOf(
Optional.ofNullable(config.get("MessageFormat")).map(it -> it.get("deserialization_type").asText().toUpperCase()).orElse("JSON")
);
final var maxRecords = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000;
final var maxRetries = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
final var pollingTimeInMs = config.has("polling_time") ? config.get("polling_time").intValue() : 100;
final var kafkaConfig = new KafkaConfig(getKafkaConfigByFormat(config, messageFormat), getKafkaSubscriptionConfig(config));
return new SourceConfig(messageFormat, kafkaConfig, maxRecords, maxRetries, pollingTimeInMs);
}

private static Map<String, Object> getKafkaConfigByFormat(JsonNode config, MessageFormat format) {
Map<String, Object> props = getKafkaProperties(config);

switch (format) {
case AVRO -> {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
final JsonNode avroConfig = config.get("MessageFormat");
props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG,
String.format("%s:%s", avroConfig.get("schema_registry_username").asText(), avroConfig.get("schema_registry_password").asText()));
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avroConfig.get("schema_registry_url").asText());
props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,
KafkaStrategy.getStrategyName(avroConfig.get("deserialization_strategy").asText()));
}
case JSON -> {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
}
}

return props;
}

private static Map<String, Object> getKafkaProperties(JsonNode config) {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
config.has("group_id") ? config.get("group_id").asText() : null);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null);
props.putAll(getSecurityProtocolConfig(config));
props.put(ConsumerConfig.CLIENT_ID_CONFIG,
config.has("client_id") ? config.get("client_id").asText() : null);
props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
config.has("receive_buffer_bytes") ? config.get("receive_buffer_bytes").intValue() : null);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null);

// Drop null or blank entries so the Kafka client falls back to its own defaults.
return props.entrySet().stream()
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static Map<String, String> getKafkaSubscriptionConfig(JsonNode config) {
final Map<String, String> props = new HashMap<>();
final var subscription = config.get("subscription");

props.put("subscription_type", subscription.get("subscription_type").asText());

if (subscription.get("topic_pattern") != null) {
props.put("topic_pattern", subscription.get("topic_pattern").asText());
}

if (subscription.get("topic_partitions") != null) {
props.put("topic_partitions", subscription.get("topic_partitions").asText());
}

return props;
}

private static Map<String, Object> getSecurityProtocolConfig(final JsonNode config) {
final JsonNode protocolConfig = config.get("protocol");
final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase());
final Map<String, Object> props = new HashMap<>();

props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString());

switch (protocol) {
case PLAINTEXT -> {
}
case SASL_SSL, SASL_PLAINTEXT -> {
props.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText());
props.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText());
}
default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol));
}

return props;
}

}
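
For context, a sketch (not part of this diff) of the connector config shape that ConfigHelper.fromJson reads, built only from the keys referenced above. Every value is a made-up example; optional settings such as max_records_process, repeated_calls and max_poll_records simply fall back to the defaults when omitted.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.kafka.config.ConfigHelper;
import io.airbyte.integrations.source.kafka.config.SourceConfig;

public class ConfigHelperUsageSketch {

  public static void main(String[] args) {
    // Hypothetical config; only required keys plus a couple of optional ones are set.
    final JsonNode config = Jsons.deserialize("""
        {
          "bootstrap_servers": "localhost:9092",
          "group_id": "airbyte-source-kafka",
          "client_dns_lookup": "use_all_dns_ips",
          "enable_auto_commit": false,
          "polling_time": 100,
          "subscription": {
            "subscription_type": "subscribe",
            "topic_pattern": "demo-topic"
          },
          "protocol": { "security_protocol": "PLAINTEXT" },
          "MessageFormat": { "deserialization_type": "JSON" }
        }
        """);

    // Unset optional knobs take the defaults coded above (e.g. 100000 max records, 0 retries).
    final SourceConfig parsed = ConfigHelper.fromJson(config);
    System.out.println(parsed.format() + " " + parsed.kafkaConfig().properties());
  }
}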
New file: KafkaConfig.java
@@ -0,0 +1,7 @@
package io.airbyte.integrations.source.kafka.config;

import java.util.Map;

public record KafkaConfig(Map<String, Object> properties, Map<String, String> subscription) {

}
New file: SourceConfig.java
@@ -0,0 +1,7 @@
package io.airbyte.integrations.source.kafka.config;

import io.airbyte.integrations.source.kafka.MessageFormat;

public record SourceConfig(MessageFormat format, KafkaConfig kafkaConfig, int maxRecords, int maxRetries, int pollingTimeInMs) {

}
New file: AvroConverter.java
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka.converter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.time.Instant;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;

public class AvroConverter implements Converter<GenericRecord> {

@Override
public AirbyteRecordMessage convertToAirbyteRecord(String topic, GenericRecord value) {
String namespace = value.getSchema().getNamespace();
String name = value.getSchema().getName();
JsonNode output = Jsons.deserialize(value.toString());

// TODO: dynamic namespaces are not supported yet, so the Avro schema namespace and name are added to the message.
// NB: this adds a new column to the data; I'm not sure we really want it.
if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) {
String newString = String.format("{ \"avro_schema\": \"%s\",\"name\": \"%s\" }", namespace, name);
((ObjectNode) output).set("_namespace_", Jsons.deserialize(newString));
}

return new AirbyteRecordMessage()
.withStream(topic)
.withEmittedAt(Instant.now().toEpochMilli())
.withData(output);
}
}
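
For context, a minimal sketch (not part of this diff) of what AvroConverter emits for a simple record. The Avro schema and field values are made up; because the schema carries a namespace and a name, an extra "_namespace_" field is injected into the record data.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import io.airbyte.integrations.source.kafka.converter.AvroConverter;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;

public class AvroConverterUsageSketch {

  public static void main(String[] args) {
    // Hypothetical schema; any record schema with a namespace and name behaves the same way.
    final Schema schema = new Schema.Parser().parse("""
        {
          "type": "record",
          "namespace": "io.example",
          "name": "User",
          "fields": [ { "name": "id", "type": "string" } ]
        }
        """);

    final GenericRecord value = new GenericData.Record(schema);
    value.put("id", "user-1");

    // The record data is copied as-is and a "_namespace_" object with the schema
    // namespace and name is added alongside it.
    final AirbyteRecordMessage message = new AvroConverter().convertToAirbyteRecord("demo-topic", value);
    System.out.println(message.getData());
  }
}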
New file: Converter.java
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka.converter;

import io.airbyte.protocol.models.v0.AirbyteRecordMessage;

public interface Converter<V> {

AirbyteRecordMessage convertToAirbyteRecord(String topic, V value);
}
New file: JsonConverter.java
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.kafka.converter;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.time.Instant;

public class JsonConverter implements Converter<JsonNode> {

@Override
public AirbyteRecordMessage convertToAirbyteRecord(String topic, JsonNode value) {
return new AirbyteRecordMessage()
.withStream(topic)
.withEmittedAt(Instant.now().toEpochMilli())
.withData(value);
}
}
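
For context, a minimal sketch (not part of this diff) of the JSON path: the deserialized payload is wrapped unchanged in an AirbyteRecordMessage whose stream is the topic name. The payload below is a made-up example.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.kafka.converter.JsonConverter;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;

public class JsonConverterUsageSketch {

  public static void main(String[] args) {
    final JsonNode payload = Jsons.deserialize("{ \"id\": 1, \"name\": \"demo\" }");
    final AirbyteRecordMessage message = new JsonConverter().convertToAirbyteRecord("demo-topic", payload);
    // The stream is the topic, emittedAt is "now", and the data is the payload unchanged.
    System.out.println(message.getStream() + " -> " + message.getData());
  }
}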