diff --git a/airbyte-integrations/connectors/source-kafka/Dockerfile b/airbyte-integrations/connectors/source-kafka/Dockerfile
index 7ca85493e476..3ce7df2b6ab3 100644
--- a/airbyte-integrations/connectors/source-kafka/Dockerfile
+++ b/airbyte-integrations/connectors/source-kafka/Dockerfile
@@ -24,5 +24,5 @@ ENV APPLICATION source-kafka
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.2.3
+LABEL io.airbyte.version=0.3.0
 LABEL io.airbyte.name=airbyte/source-kafka
diff --git a/airbyte-integrations/connectors/source-kafka/metadata.yaml b/airbyte-integrations/connectors/source-kafka/metadata.yaml
index 7be9d9571c23..cf52f167dec9 100644
--- a/airbyte-integrations/connectors/source-kafka/metadata.yaml
+++ b/airbyte-integrations/connectors/source-kafka/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: source
   definitionId: d917a47b-8537-4d0d-8c10-36a9928d4265
-  dockerImageTag: 0.2.3
+  dockerImageTag: 0.3.0
   dockerRepository: airbyte/source-kafka
   githubIssueLabel: source-kafka
   icon: kafka.svg
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaConsumerRebalanceListener.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaConsumerRebalanceListener.java
new file mode 100644
index 000000000000..e1371cdf1565
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaConsumerRebalanceListener.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.source.kafka;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+  public KafkaConsumerRebalanceListener(final KafkaConsumer<?, ?> consumer, final Map<TopicPartition, Long> positions) {
+    this.consumer = consumer;
+    this.positions = positions;
+  }
+
+  @Override
+  public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+
+  }
+
+  @Override
+  public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+    partitions.forEach(partition -> Optional.ofNullable(positions.get(partition)).ifPresent(position -> consumer.seek(partition, position)));
+  }
+
+  private final KafkaConsumer<?, ?> consumer;
+  private final Map<TopicPartition, Long> positions;
+}
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaMessage.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaMessage.java
new file mode 100644
index 000000000000..17ce13e43e7d
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaMessage.java
@@ -0,0 +1,7 @@
+package io.airbyte.integrations.source.kafka;
+
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
+
+public record KafkaMessage(String topic, int partition, long offset, AirbyteRecordMessage message) {
+
+}
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java
index e7c452073e91..2bb6ad60ace2 100644
--- 
a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/KafkaSource.java @@ -9,7 +9,10 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.kafka.config.ConfigHelper; import io.airbyte.integrations.source.kafka.format.KafkaFormat; +import io.airbyte.integrations.source.kafka.generator.GeneratorHelper; +import io.airbyte.integrations.source.kafka.state.StateHelper; import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -24,7 +27,8 @@ public class KafkaSource extends BaseConnector implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class); - public KafkaSource() {} + public KafkaSource() { + } @Override public AirbyteConnectionStatus check(final JsonNode config) { @@ -40,7 +44,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { @Override public AirbyteCatalog discover(final JsonNode config) { KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); - final List streams = kafkaFormat.getStreams(); + final List streams = kafkaFormat.getStreams(config); return new AirbyteCatalog().withStreams(streams); } @@ -51,8 +55,11 @@ public AutoCloseableIterator read(final JsonNode config, final C if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) { throw new RuntimeException("Unable establish a connection: " + check.getMessage()); } - KafkaFormat kafkaFormat = KafkaFormatFactory.getFormat(config); - return kafkaFormat.read(); + final var parsedConfig = ConfigHelper.fromJson(config); + final var offsets = StateHelper.stateFromJson(state); + final var generator = GeneratorHelper.buildFrom(parsedConfig, offsets); + + return generator.read(); } public static void main(final String[] args) throws Exception { diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/ConfigHelper.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/ConfigHelper.java new file mode 100644 index 000000000000..4758f2de9d61 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/ConfigHelper.java @@ -0,0 +1,124 @@ +package io.airbyte.integrations.source.kafka.config; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.kafka.KafkaProtocol; +import io.airbyte.integrations.source.kafka.KafkaStrategy; +import io.airbyte.integrations.source.kafka.MessageFormat; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import 
org.apache.kafka.connect.json.JsonDeserializer; + +public class ConfigHelper { + + public static SourceConfig fromJson(JsonNode config) { + final var messageFormat = MessageFormat.valueOf( + Optional.ofNullable(config.get("MessageFormat")).map(it -> it.get("deserialization_type").asText().toUpperCase()).orElse("JSON") + ); + final var maxRecords = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000; + final var maxRetries = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final var pollingTimeInMs = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final var kafkaConfig = new KafkaConfig(getKafkaConfigByFormat(config, messageFormat), getKafkaSubscriptionConfig(config)); + return new SourceConfig(messageFormat, kafkaConfig, maxRecords, maxRetries, pollingTimeInMs); + } + + private static Map getKafkaConfigByFormat(JsonNode config, MessageFormat format) { + Map props = getKafkaProperties(config); + + switch (format) { + case AVRO -> { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + final JsonNode avroConfig = config.get("MessageFormat"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, + String.format("%s:%s", avroConfig.get("schema_registry_username").asText(), avroConfig.get("schema_registry_password").asText())); + props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avroConfig.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, + KafkaStrategy.getStrategyName(avroConfig.get("deserialization_strategy").asText())); + } + case JSON -> { + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); + } + } + + return props; + } + + private static Map getKafkaProperties(JsonNode config) { + final Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, + config.has("group_id") ? config.get("group_id").asText() : null); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, + config.has("max_poll_records") ? config.get("max_poll_records").intValue() : null); + props.putAll(getSecurityProtocolConfig(config)); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, + config.has("client_id") ? config.get("client_id").asText() : null); + props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get("client_dns_lookup").asText()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.get("enable_auto_commit").booleanValue()); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + config.has("auto_commit_interval_ms") ? config.get("auto_commit_interval_ms").intValue() : null); + props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, + config.has("retry_backoff_ms") ? config.get("retry_backoff_ms").intValue() : null); + props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, + config.has("request_timeout_ms") ? config.get("request_timeout_ms").intValue() : null); + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, + config.has("receive_buffer_bytes") ? 
config.get("receive_buffer_bytes").intValue() : null); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + config.has("auto_offset_reset") ? config.get("auto_offset_reset").asText() : null); + + return props.entrySet().stream() + .filter(entry -> entry.getValue() != null && !entry.getValue().toString().isBlank()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private static Map getKafkaSubscriptionConfig(JsonNode config) { + final Map props = new HashMap<>(); + final var subscription = config.get("subscription"); + + props.put("subscription_type", subscription.get("subscription_type").asText()); + + if (subscription.get("topic_pattern") != null) { + props.put("topic_pattern", subscription.get("topic_pattern").asText()); + } + + if (subscription.get("topic_partitions") != null) { + props.put("topic_partitions", subscription.get("topic_partitions").asText()); + } + + return props; + } + + private static Map getSecurityProtocolConfig(final JsonNode config) { + final JsonNode protocolConfig = config.get("protocol"); + final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase()); + final Map props = new HashMap<>(); + + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.toString()); + + switch (protocol) { + case PLAINTEXT -> { + } + case SASL_SSL, SASL_PLAINTEXT -> { + props.put(SaslConfigs.SASL_JAAS_CONFIG, protocolConfig.get("sasl_jaas_config").asText()); + props.put(SaslConfigs.SASL_MECHANISM, protocolConfig.get("sasl_mechanism").asText()); + } + default -> throw new RuntimeException("Unexpected Kafka protocol: " + Jsons.serialize(protocol)); + } + + return props; + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/KafkaConfig.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/KafkaConfig.java new file mode 100644 index 000000000000..2322649b6334 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/KafkaConfig.java @@ -0,0 +1,7 @@ +package io.airbyte.integrations.source.kafka.config; + +import java.util.Map; + +public record KafkaConfig(Map properties, Map subscription) { + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/SourceConfig.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/SourceConfig.java new file mode 100644 index 000000000000..f4eac07ee2cb --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/config/SourceConfig.java @@ -0,0 +1,7 @@ +package io.airbyte.integrations.source.kafka.config; + +import io.airbyte.integrations.source.kafka.MessageFormat; + +public record SourceConfig(MessageFormat format, KafkaConfig kafkaConfig, int maxRecords, int maxRetries, int pollingTimeInMs) { + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/AvroConverter.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/AvroConverter.java new file mode 100644 index 000000000000..3953dc804dbf --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/AvroConverter.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023 Airbyte, 
Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.source.kafka.converter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.airbyte.commons.json.Jsons;
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
+import java.time.Instant;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
+
+public class AvroConverter implements Converter<GenericRecord> {
+
+  @Override
+  public AirbyteRecordMessage convertToAirbyteRecord(String topic, GenericRecord value) {
+    String namespace = value.getSchema().getNamespace();
+    String name = value.getSchema().getName();
+    JsonNode output = Jsons.deserialize(value.toString());
+
+    // TODO: dynamic namespaces are not supported yet, so the Avro schema namespace and name are added to the message.
+    // NB: this adds a new column to the data; it is not yet clear whether we really want it.
+    if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) {
+      String newString = String.format("{ \"avro_schema\": \"%s\",\"name\": \"%s\" }", namespace, name);
+      ((ObjectNode) output).set("_namespace_", Jsons.deserialize(newString));
+    }
+
+    return new AirbyteRecordMessage()
+        .withStream(topic)
+        .withEmittedAt(Instant.now().toEpochMilli())
+        .withData(output);
+  }
+}
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/Converter.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/Converter.java
new file mode 100644
index 000000000000..78c01d47a63b
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/Converter.java
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.source.kafka.converter;
+
+import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
+
+public interface Converter<V> {
+
+  AirbyteRecordMessage convertToAirbyteRecord(String topic, V value);
+}
diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/JsonConverter.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/JsonConverter.java
new file mode 100644
index 000000000000..ef97403b9498
--- /dev/null
+++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/JsonConverter.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.source.kafka.converter; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import java.time.Instant; + +public class JsonConverter implements Converter { + + @Override + public AirbyteRecordMessage convertToAirbyteRecord(String topic, JsonNode value) { + return new AirbyteRecordMessage() + .withStream(topic) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(value); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/schema/Avro2JsonConvert.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/schema/Avro2JsonConvert.java new file mode 100644 index 000000000000..f2b09ea35afa --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/converter/schema/Avro2JsonConvert.java @@ -0,0 +1,178 @@ +package io.airbyte.integrations.source.kafka.converter.schema; + + +import static java.util.Map.entry; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.protocol.models.Jsons; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.StreamSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Avro2JsonConvert { + + private static final Logger LOGGER = LoggerFactory.getLogger(Avro2JsonConvert.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + /** + * Mapping from avro to Json type + * + * @link https://docs.airbyte.com/understanding-airbyte/json-avro-conversion/#conversion-rules + */ + private static final Map AVRO_TO_JSON_DATA_TYPE_MAPPING = Map.ofEntries( + entry("null", "null"), + entry("boolean", "boolean"), + entry("int", "integer"), + entry("long", "integer"), + entry("float", "number"), + entry("double", "number"), + entry("bytes", "string"), + entry("string", "string"), + entry("record", "object"), + entry("enum", "string"), + entry("array", "array"), + entry("map", "object"), + entry("fixed", "string") + ); + + + /** + * Method to mapping avro type to json type + * + * @param avroType + * @return + */ + private String avroTypeToJsonType(final String avroType) { + final String jsonTypes = AVRO_TO_JSON_DATA_TYPE_MAPPING.get(avroType); + if (jsonTypes == null) { + throw new IllegalArgumentException("Unknown Avro type: " + avroType); + } + return jsonTypes; + } + + /** + * Method to convert the avro schema in to Json schema in order to save the schema in the Airbyte Catalog + * + * @param avroSchema + * @return JsonNode + * @throws Exception + */ + public JsonNode convertoToAirbyteJson(final String avroSchema) throws Exception { + LOGGER.info("Starting to convert Avro schema in Json Schema"); + final JsonNode jsonSchema = convertoToAirbyteJson(Jsons.deserialize(avroSchema)); + return jsonSchema; + } + + + /** + * Method to convert the avro schema in to Json schema in order to save the schema in the Airbyte Catalog + * + * @param avroSchema JsonNode node with Avro struct + * @return JsonNode node Json struct + * @throws Exception + * @link https://docs.airbyte.com/understanding-airbyte/json-avro-conversion/ + */ + public JsonNode convertoToAirbyteJson(final JsonNode avroSchema) throws Exception { + + 
final ObjectNode node = mapper.createObjectNode(); + JsonNode typeFields = null; + final JsonNode typeField = removeNull(avroSchema.get("type")); + + if (typeField.isObject()) { + return convertoToAirbyteJson(typeField); + } else if (typeField.isValueNode()) { + typeFields = typeField; + } else if (typeField.isArray() && StreamSupport.stream(typeField.spliterator(), false).allMatch(t -> t.isTextual())) { + final ArrayNode array = node.putArray("anyOf"); + for (final Iterator it = typeField.iterator(); it.hasNext(); ) { + final JsonNode type = it.next(); + array.add(mapper.createObjectNode().put("type", avroTypeToJsonType(type.asText()))); + } + return node; + } + if (typeFields == null) { + StreamSupport.stream(avroSchema.get("type").spliterator(), false).filter(t -> !t.isNull()).filter(t -> !t.asText().equals("null")) + .forEach(t -> node.put("type", avroTypeToJsonType(t.asText()))); + return node; + + } + final String typeText = typeFields.asText(); + switch (typeText) { + case "record" -> { + node.put("type", "object"); + final ObjectNode properties = mapper.createObjectNode(); + for (final Iterator it = avroSchema.get("fields").iterator(); it.hasNext(); ) { + final JsonNode field = it.next(); + properties.put(field.get("name").asText(), convertoToAirbyteJson(field)); + } + node.set("properties", properties); + return node; + } + case "string", "int", "null", "float", "boolean" -> { + return node.put("type", avroTypeToJsonType(typeText)); + } + case "map" -> { + final JsonNode typeObj = mapper.createObjectNode().put("type", "string"); + return mapper.createObjectNode() + .put("type", "object") + .set("additionalProperties", typeObj); + } + case "array" -> { + final ArrayNode array = node.putArray("items"); + node.put("type", "array"); + final JsonNode items = removeNull(avroSchema.get("items")); + + if (items.isValueNode()) { + array.add(mapper.createObjectNode().put("type", avroTypeToJsonType(items.asText()))); + } else { + final JsonNode a = convertoToAirbyteJson(items); + array.add(a); + } + return node; + } + } + return node; + } + + + /** + * Remove null or "null" value present in the Type array + * + * @param field + * @return + * @throws Exception + */ + private static JsonNode removeNull(final JsonNode field) throws Exception { + ArrayNode array = null; + if (field.isTextual()) { + return field; + } else if (field.isObject()) { + array = mapper.createArrayNode().add(field).add(mapper.createObjectNode().textNode("null")); + } else if (field.isArray()) { + array = (ArrayNode) field; + } + + final List fieldWithoutNull = StreamSupport.stream(array.spliterator(), false) + .filter(t -> !t.isNull()).filter(t -> !t.asText().equals("null")).toList(); + if (fieldWithoutNull.isEmpty()) { + throw new Exception("Unknown JsonNode converter:" + field); + } else { + if (fieldWithoutNull.size() == 1) { + return fieldWithoutNull.stream().findFirst().get(); + } else { + + final ArrayNode arrayNode = mapper.createArrayNode(); + fieldWithoutNull.stream().forEach(arrayNode::add); + return (JsonNode) arrayNode; + } + } + } + +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java index 3f875e2baaa8..9923fbfb7ee4 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java +++ 
b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/AvroFormat.java @@ -13,6 +13,7 @@ import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.integrations.source.kafka.KafkaStrategy; +import io.airbyte.integrations.source.kafka.converter.schema.Avro2JsonConvert; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -20,6 +21,9 @@ import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.SyncMode; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; @@ -44,174 +48,202 @@ public class AvroFormat extends AbstractFormat { - private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); - - private KafkaConsumer consumer; - - public AvroFormat(JsonNode jsonConfig) { - super(jsonConfig); - } - - @Override - protected Map getKafkaConfig() { - Map props = super.getKafkaConfig(); - final JsonNode avro_config = config.get("MessageFormat"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); - props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, - String.format("%s:%s", avro_config.get("schema_registry_username").asText(), avro_config.get("schema_registry_password").asText())); - props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); - props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, - KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); - return props; - } - - @Override - protected KafkaConsumer getConsumer() { - if (consumer != null) { - return consumer; - } - Map filteredProps = getKafkaConfig(); - consumer = new KafkaConsumer<>(filteredProps); - - final JsonNode subscription = config.get("subscription"); - LOGGER.info("Kafka subscribe method: {}", subscription.toString()); - switch (subscription.get("subscription_type").asText()) { - case "subscribe" -> { - final String topicPattern = subscription.get("topic_pattern").asText(); - consumer.subscribe(Pattern.compile(topicPattern)); - topicsToSubscribe = consumer.listTopics().keySet().stream() - .filter(topic -> topic.matches(topicPattern)) - .collect(Collectors.toSet()); - LOGGER.info("Topic list: {}", topicsToSubscribe); - } - case "assign" -> { - topicsToSubscribe = new HashSet<>(); - final String topicPartitions = subscription.get("topic_partitions").asText(); - final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); - final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { - final String[] pair = topicPartition.split(":"); - topicsToSubscribe.add(pair[0]); - return new TopicPartition(pair[0], Integer.parseInt(pair[1])); - }).collect(Collectors.toList()); - 
LOGGER.info("Topic-partition list: {}", topicPartitionList); - consumer.assign(topicPartitionList); - } + private static final Logger LOGGER = LoggerFactory.getLogger(AvroFormat.class); + + private KafkaConsumer consumer; + + public AvroFormat(final JsonNode jsonConfig) { + super(jsonConfig); } - return consumer; - } - @Override - protected Set getTopicsToSubscribe() { - if (topicsToSubscribe == null) { - getConsumer(); + @Override + protected Map getKafkaConfig() { + final Map props = super.getKafkaConfig(); + final JsonNode avro_config = config.get("MessageFormat"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); + props.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, + String.format("%s:%s", avro_config.get("schema_registry_username").asText(), avro_config.get("schema_registry_password").asText())); + props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, avro_config.get("schema_registry_url").asText()); + props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, + KafkaStrategy.getStrategyName(avro_config.get("deserialization_strategy").asText())); + + return props; } - return topicsToSubscribe; - } - - @Override - public boolean isAccessible() { - try { - final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : ""; - if (!testTopic.isBlank()) { - final KafkaConsumer consumer = getConsumer(); - consumer.subscribe(Pattern.compile(testTopic)); - consumer.listTopics(); - consumer.close(); - LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); - } - return true; - } catch (final Exception e) { - LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); - return false; + + @Override + protected KafkaConsumer getConsumer() { + if (consumer != null) { + return consumer; + } + final Map filteredProps = getKafkaConfig(); + consumer = new KafkaConsumer<>(filteredProps); + + final JsonNode subscription = config.get("subscription"); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type").asText()) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern").asText(); + consumer.subscribe(Pattern.compile(topicPattern)); + topicsToSubscribe = consumer.listTopics().keySet().stream() + .filter(topic -> topic.matches(topicPattern)) + .collect(Collectors.toSet()); + LOGGER.info("Topic list: {}", topicsToSubscribe); + } + case "assign" -> { + topicsToSubscribe = new HashSet<>(); + final String topicPartitions = subscription.get("topic_partitions").asText(); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + topicsToSubscribe.add(pair[0]); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + } + } + return consumer; } - } - - @Override - public List getStreams() { - final Set topicsToSubscribe = getTopicsToSubscribe(); - final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers - .createAirbyteStream(topic, Field.of("value", 
JsonSchemaType.STRING)) - .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))) - .collect(Collectors.toList()); - return streams; - } - - @Override - public AutoCloseableIterator read() { - - final KafkaConsumer consumer = getConsumer(); - final List> recordsList = new ArrayList<>(); - final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; - final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; - final int max_records = config.has("max_records_process") ? config.get("max_records_process").intValue() : 100000; - AtomicInteger record_count = new AtomicInteger(); - final Map poll_lookup = new HashMap<>(); - getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); - while (true) { - final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); - consumerRecords.forEach(record -> { - record_count.getAndIncrement(); - recordsList.add(record); - }); - consumer.commitAsync(); - - if (consumerRecords.count() == 0) { - consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( - topic -> { - poll_lookup.put(topic, poll_lookup.get(topic) + 1); - }); - boolean is_complete = poll_lookup.entrySet().stream().allMatch( - e -> e.getValue() > retry); - if (is_complete) { - LOGGER.info("There is no new data in the queue!!"); - break; + + @Override + protected Set getTopicsToSubscribe() { + if (topicsToSubscribe == null) { + getConsumer(); } - } else if (record_count.get() > max_records) { - LOGGER.info("Max record count is reached !!"); - break; - } + return topicsToSubscribe; } - consumer.close(); - final Iterator> iterator = recordsList.iterator(); - return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { - - @Override - protected AirbyteMessage computeNext() { - if (iterator.hasNext()) { - final ConsumerRecord record = iterator.next(); - GenericRecord avro_data = record.value(); - ObjectMapper mapper = new ObjectMapper(); - String namespace = avro_data.getSchema().getNamespace(); - String name = avro_data.getSchema().getName(); - JsonNode output; - try { - // Todo dynamic namespace is not supported now hence, adding avro schema name in the message - if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { - String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); - JsonNode newNode = mapper.readTree(newString); - output = mapper.readTree(avro_data.toString()); - ((ObjectNode) output).set("_namespace_", newNode); - } else { - output = mapper.readTree(avro_data.toString()); + + @Override + public boolean isAccessible() { + try { + final String testTopic = config.has("test_topic") ? 
config.get("test_topic").asText() : ""; + if (!testTopic.isBlank()) { + final KafkaConsumer consumer = getConsumer(); + consumer.subscribe(Pattern.compile(testTopic)); + consumer.listTopics(); + consumer.close(); + LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText()); } - } catch (JsonProcessingException e) { - LOGGER.error("Exception whilst reading avro data from stream", e); + return true; + } catch (final Exception e) { + LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e); + return false; + } + } + +// List providers = List.of( +// new AvroSchemaProvider, +// new JsonSchemaProvider +// ) + + @Override + public List getStreams(final JsonNode config) { + final JsonNode avroConfig = config.get("MessageFormat"); + final String schemRegistryUrl = avroConfig.get("schema_registry_url").asText(); + final Map properties = Map.of(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO", + SchemaRegistryClientConfig.USER_INFO_CONFIG, String.format("%s:%s", avroConfig.get("schema_registry_username").asText(), avroConfig.get("schema_registry_password").asText())); + final CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemRegistryUrl, 1, List.of(new AvroSchemaProvider()), properties); + final Set topicsToSubscribe = getTopicsToSubscribe(); + final List streams = topicsToSubscribe.stream().map(topic -> + CatalogHelpers + .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) + .withJsonSchema(extractSchemaStream(schemaRegistryClient, topic)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + + ) + .collect(Collectors.toList()); + return streams; + } + + private static JsonNode extractSchemaStream(final CachedSchemaRegistryClient client, final String topic) { + try { + final SchemaMetadata schema = client.getLatestSchemaMetadata(topic + "-value"); + final String rawSchema = client.getSchemaById(schema.getId()).rawSchema().toString(); + final Avro2JsonConvert converter = new Avro2JsonConvert(); + return converter.convertoToAirbyteJson(rawSchema); + + } catch (final Exception e) { + LOGGER.error("Errore when extract and convert avro schema" + e.getMessage()); throw new RuntimeException(e); - } - return new AirbyteMessage() - .withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage() - .withStream(record.topic()) - .withEmittedAt(Instant.now().toEpochMilli()) - .withData(output)); } + } + + @Override + public AutoCloseableIterator read() { - return endOfData(); - } + final KafkaConsumer consumer = getConsumer(); + final List> recordsList = new ArrayList<>(); + final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0; + final int polling_time = config.has("polling_time") ? config.get("polling_time").intValue() : 100; + final int max_records = config.has("max_records_process") ? 
config.get("max_records_process").intValue() : 100000; + final AtomicInteger record_count = new AtomicInteger(); + final Map poll_lookup = new HashMap<>(); + getTopicsToSubscribe().forEach(topic -> poll_lookup.put(topic, 0)); + while (true) { + final ConsumerRecords consumerRecords = consumer.poll(Duration.of(polling_time, ChronoUnit.MILLIS)); + consumerRecords.forEach(record -> { + record_count.getAndIncrement(); + recordsList.add(record); + }); + consumer.commitAsync(); + + if (consumerRecords.count() == 0) { + consumer.assignment().stream().map(record -> record.topic()).distinct().forEach( + topic -> { + poll_lookup.put(topic, poll_lookup.get(topic) + 1); + }); + final boolean is_complete = poll_lookup.entrySet().stream().allMatch( + e -> e.getValue() > retry); + if (is_complete) { + LOGGER.info("There is no new data in the queue!!"); + break; + } + } else if (record_count.get() > max_records) { + LOGGER.info("Max record count is reached !!"); + break; + } + } + consumer.close(); + final Iterator> iterator = recordsList.iterator(); + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + @Override + protected AirbyteMessage computeNext() { + if (iterator.hasNext()) { + final ConsumerRecord record = iterator.next(); + final GenericRecord avro_data = record.value(); + final ObjectMapper mapper = new ObjectMapper(); + final String namespace = avro_data.getSchema().getNamespace(); + final String name = avro_data.getSchema().getName(); + final JsonNode output; + try { + // Todo dynamic namespace is not supported now hence, adding avro schema name in the message + if (StringUtils.isNoneEmpty(namespace) && StringUtils.isNoneEmpty(name)) { + final String newString = String.format("{\"avro_schema\": \"%s\",\"name\":\"%s\"}", namespace, name); + final JsonNode newNode = mapper.readTree(newString); + output = mapper.readTree(avro_data.toString()); + ((ObjectNode) output).set("_namespace_", newNode); + } else { + output = mapper.readTree(avro_data.toString()); + } + } catch (final JsonProcessingException e) { + LOGGER.error("Exception whilst reading avro data from stream", e); + throw new RuntimeException(e); + } + return new AirbyteMessage() + .withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage() + .withStream(record.topic()) + .withEmittedAt(Instant.now().toEpochMilli()) + .withData(output)); + } + + return endOfData(); + } - }); - } + }); + } } diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java index 6e1707bd2104..29dbc4e483de 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/JsonFormat.java @@ -94,7 +94,7 @@ public Set getTopicsToSubscribe() { } @Override - public List getStreams() { + public List getStreams(JsonNode config) { final Set topicsToSubscribe = getTopicsToSubscribe(); final List streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers .createAirbyteStream(topic, Field.of("value", JsonSchemaType.STRING)) diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java 
b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java index 257ef37ac0b6..60257d0f57a1 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/format/KafkaFormat.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.kafka.format; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStream; @@ -13,7 +14,7 @@ public interface KafkaFormat { boolean isAccessible(); - List getStreams(); + List getStreams(final JsonNode config); AutoCloseableIterator read(); diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/Generator.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/Generator.java new file mode 100644 index 000000000000..d13199258d4c --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/Generator.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka.generator; + +import com.google.common.collect.AbstractIterator; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.integrations.source.kafka.KafkaMessage; +import io.airbyte.integrations.source.kafka.mediator.KafkaMediator; +import io.airbyte.integrations.source.kafka.state.StateHelper; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; + +final public class Generator { + + private final KafkaMediator mediator; + private final int maxRecords; + private final int maxRetries; + + public Generator(Builder builder) { + this.maxRecords = builder.maxRecords; + this.maxRetries = builder.maxRetries; + this.mediator = builder.mediator; + } + + public static class Builder { + + private KafkaMediator mediator; + private int maxRecords = 100000; + private int maxRetries = 10; + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + } + + public Builder withMaxRecords(int maxRecords) { + this.maxRecords = maxRecords; + return this; + } + + public Builder withMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + public Builder withMediator(KafkaMediator mediator) { + this.mediator = mediator; + return this; + } + + public Generator build() { + return new Generator(this); + } + + } + + public AutoCloseableIterator read() { + + return AutoCloseableIterators.fromIterator(new AbstractIterator<>() { + + private int totalRead = 0; + private final Queue pendingMessages = new LinkedList<>(); + + @Override + protected AirbyteMessage computeNext() { + if (this.pendingMessages.isEmpty()) { + if (this.totalRead < Generator.this.maxRecords) { + List batch = pullBatchFromKafka(); + if (!batch.isEmpty()) { + this.totalRead += batch.size(); + 
this.pendingMessages.addAll(convertToAirbyteMessagesWithState(batch)); + } + } else { + return endOfData(); + } + } + + // If no more pending kafka records, close iterator + if (this.pendingMessages.isEmpty()) { + return endOfData(); + } else { + return pendingMessages.poll(); + } + } + + private List convertToAirbyteMessagesWithState(List batch) { + final Set partitions = new HashSet<>(); + final List messages = new ArrayList<>(); + + for (KafkaMessage entry : batch) { + final var topic = entry.topic(); + final var partition = entry.partition(); + final var message = entry.message(); + partitions.add(new TopicPartition(topic, partition)); + messages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(message)); + } + + final var offsets = Generator.this.mediator.position(partitions); + + for (AirbyteStateMessage entry : StateHelper.toAirbyteState(offsets)) { + messages.add(new AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(entry)); + } + + return messages; + } + + private List pullBatchFromKafka() { + List batch; + var nrOfRetries = 0; + do { + batch = Generator.this.mediator.poll(); + } while (batch.isEmpty() && ++nrOfRetries < Generator.this.maxRetries); + return batch; + } + }); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/GeneratorHelper.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/GeneratorHelper.java new file mode 100644 index 000000000000..ab7438c9b96d --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/generator/GeneratorHelper.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.generator; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.kafka.config.SourceConfig; +import io.airbyte.integrations.source.kafka.converter.AvroConverter; +import io.airbyte.integrations.source.kafka.converter.Converter; +import io.airbyte.integrations.source.kafka.converter.JsonConverter; +import io.airbyte.integrations.source.kafka.mediator.DefaultKafkaMediator; +import io.airbyte.integrations.source.kafka.mediator.KafkaMediator; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; + +public class GeneratorHelper { + + public static Generator buildFrom(SourceConfig config, Map initialOffsets) { + return switch (config.format()) { + case AVRO -> { + final KafkaConsumer consumer = new KafkaConsumer<>(config.kafkaConfig().properties()); + final Converter converter = new AvroConverter(); + final KafkaMediator mediator = new DefaultKafkaMediator<>(consumer, converter, config.pollingTimeInMs(), + config.kafkaConfig().subscription(), + initialOffsets); + + yield Generator.Builder.newInstance() + .withMaxRecords(config.maxRecords()) + .withMaxRetries(config.maxRetries()) + .withMediator(mediator).build(); + } + case JSON -> { + final KafkaConsumer consumer = new KafkaConsumer<>(config.kafkaConfig().properties()); + final Converter converter = new JsonConverter(); + final KafkaMediator mediator = new DefaultKafkaMediator<>(consumer, converter, config.pollingTimeInMs(), + config.kafkaConfig().subscription(), + initialOffsets); + + yield Generator.Builder.newInstance() + .withMaxRecords(config.maxRecords()) + .withMaxRetries(config.maxRetries()) + .withMediator(mediator) + .build(); + } + }; + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/DefaultKafkaMediator.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/DefaultKafkaMediator.java new file mode 100644 index 000000000000..b31e0b2a7218 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/DefaultKafkaMediator.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.mediator; + +import io.airbyte.integrations.source.kafka.KafkaConsumerRebalanceListener; +import io.airbyte.integrations.source.kafka.KafkaMessage; +import io.airbyte.integrations.source.kafka.converter.Converter; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultKafkaMediator implements KafkaMediator { + + private final KafkaConsumer consumer; + private final Converter converter; + private final int pollingTimeInMs; + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultKafkaMediator.class); + + public DefaultKafkaMediator(KafkaConsumer consumer, Converter converter, int pollingTimeInMs, + Map subscription, Map initialOffsets) { + final KafkaConsumerRebalanceListener listener = new KafkaConsumerRebalanceListener(consumer, initialOffsets); + LOGGER.info("Kafka subscribe method: {}", subscription.toString()); + switch (subscription.get("subscription_type")) { + case "subscribe" -> { + final String topicPattern = subscription.get("topic_pattern"); + consumer.subscribe(Pattern.compile(topicPattern), listener); + } + case "assign" -> { + final String topicPartitions = subscription.get("topic_partitions"); + final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(","); + final List topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> { + final String[] pair = topicPartition.split(":"); + return new TopicPartition(pair[0], Integer.parseInt(pair[1])); + }).collect(Collectors.toList()); + LOGGER.info("Topic-partition list: {}", topicPartitionList); + consumer.assign(topicPartitionList); + topicPartitionList.forEach(partition -> Optional.ofNullable(initialOffsets.get(partition)) + .ifPresent(offset -> consumer.seek(partition, offset))); + } + } + + this.consumer = consumer; + this.converter = converter; + this.pollingTimeInMs = pollingTimeInMs; + } + + @Override + public List poll() { + List output = new ArrayList<>(); + consumer.poll(Duration.of(this.pollingTimeInMs, ChronoUnit.MILLIS)).forEach(it -> { + final var message = new KafkaMessage(it.topic(), it.partition(), it.offset(), this.converter.convertToAirbyteRecord(it.topic(), it.value())); + output.add(message); + }); + return output; + } + + @Override + public Map position(Set partitions) { + return partitions.stream() + .map(it -> Map.entry(it, consumer.position(it))) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/KafkaMediator.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/KafkaMediator.java new file mode 100644 index 000000000000..0478d909eaee --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/mediator/KafkaMediator.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.mediator; + +import io.airbyte.integrations.source.kafka.KafkaMessage; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; + +public interface KafkaMediator { + + List poll(); + + Map position(Set partitions); + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/State.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/State.java new file mode 100644 index 000000000000..01b0806f6fec --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/State.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka.state; + +import java.util.Map; + +public record State(Map partitions) { + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/StateHelper.java b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/StateHelper.java new file mode 100644 index 000000000000..eb000b390705 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/main/java/io/airbyte/integrations/source/kafka/state/StateHelper.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.kafka.state; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.configoss.StateWrapper; +import io.airbyte.configoss.helpers.StateMessageHelper; +import io.airbyte.protocol.models.v0.AirbyteStateMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamState; +import io.airbyte.protocol.models.v0.StreamDescriptor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.apache.kafka.common.TopicPartition; + +public class StateHelper { + + public static Map stateFromJson(JsonNode state) { + final boolean USE_STREAM_CAPABLE_STATE = true; + final Optional wrapper = StateMessageHelper.getTypedState(state, USE_STREAM_CAPABLE_STATE); + final var serialisedState = wrapper.map(value -> + switch (value.getStateType()) { + case GLOBAL -> fromAirbyteStreamState(value.getGlobal().getGlobal().getStreamStates()); + case STREAM -> fromAirbyteStreamState(value.getStateMessages().stream().map(it -> it.getStream()).toList()); + case LEGACY -> new HashMap(); + } + ); + + return serialisedState.orElse(new HashMap<>()); + } + + public static List toAirbyteState(Map state) { + final Map> intermediate = new HashMap<>(); + + for (final Entry entry : state.entrySet()) { + final var topic = entry.getKey().topic(); + final var partition = entry.getKey().partition(); + final var offset = entry.getValue(); + if (!intermediate.containsKey(topic)) { + intermediate.put(topic, new HashMap<>()); + } + intermediate.get(topic).put(partition, offset); + } + + return intermediate + .entrySet() + .stream() + .map(it -> + new AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withName(it.getKey())) + .withStreamState(Jsons.jsonNode(new State(it.getValue())))) + ) + .toList(); + } + + private static HashMap fromAirbyteStreamState(final List states) { + final 
var result = new HashMap(); + + for (final io.airbyte.protocol.models.AirbyteStreamState state : states) { + final var topic = state.getStreamDescriptor().getName(); + final var stream = Jsons.convertValue(state.getStreamState(), State.class); + + for (final Entry entry : stream.partitions().entrySet()) { + final var partition = entry.getKey(); + final var offset = entry.getValue(); + + result.put(new TopicPartition(topic, partition), offset); + } + } + + return result; + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json index 65adada1124e..abd1f2bd299a 100644 --- a/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/main/resources/spec.json @@ -48,7 +48,8 @@ }, "schema_registry_password": { "type": "string", - "default": "" + "default": "", + "airbyte_secret": true } } } diff --git a/airbyte-integrations/connectors/source-kafka/src/test-integration/resources/expected_spec.json b/airbyte-integrations/connectors/source-kafka/src/test-integration/resources/expected_spec.json index 7cf1426c1190..abd1f2bd299a 100644 --- a/airbyte-integrations/connectors/source-kafka/src/test-integration/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-kafka/src/test-integration/resources/expected_spec.json @@ -1,5 +1,7 @@ { "documentationUrl": "https://docs.airbyte.com/integrations/sources/kafka", + "supportsIncremental": true, + "supported_source_sync_modes": ["append"], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "Kafka Source Spec", @@ -46,7 +48,8 @@ }, "schema_registry_password": { "type": "string", - "default": "" + "default": "", + "airbyte_secret": true } } } @@ -266,8 +269,5 @@ "default": 100000 } } - }, - "supportsIncremental": true, - "supported_destination_sync_modes": [], - "supported_source_sync_modes": ["append"] + } } diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/AvroConverterTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/AvroConverterTest.java new file mode 100644 index 000000000000..0e3124813046 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/AvroConverterTest.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.integrations.source.kafka.converter.schema.Avro2JsonConvert; +import io.airbyte.protocol.models.Jsons; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AvroConverterTest { + + ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testConverterAvroSimpleSchema() throws Exception { + + final String avroSimpleSchema = getFileFromResourceAsString("/converter/simpleSchema.avsc"); + final String jsonSimpleSchema = getFileFromResourceAsString("/converter/simpleSchema.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroSimpleSchema)); + final JsonNode expect = mapper.readTree(jsonSimpleSchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroNestedRecordsSchema() throws Exception { + + final String avroNestedRecordsSchema = getFileFromResourceAsString("/converter/nestedRecordsSchema.avsc"); + final String jsonNestedRecordSchema = getFileFromResourceAsString("/converter/nestedRecordsSchema.json"); + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroNestedRecordsSchema)); + final JsonNode expect = mapper.readTree(jsonNestedRecordSchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroWithArray() throws Exception { + + final String avroWithArraySchema = getFileFromResourceAsString("/converter/withArraySchema.avsc"); + final String jsonWithArraySchema = getFileFromResourceAsString("/converter/withArraySchema.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroWithArraySchema)); + final JsonNode expect = mapper.readTree(jsonWithArraySchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroWithArrayAndRecordSchema() throws Exception { + + final String avroWithArrayAndRecordSchema = getFileFromResourceAsString("/converter/withArrayAndRecordSchema.avsc"); + final String jsonWithArrayAndRecordSchema = getFileFromResourceAsString("/converter/withArrayAndRecordSchema.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroWithArrayAndRecordSchema)); + final JsonNode expect = mapper.readTree(jsonWithArrayAndRecordSchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroWithCombinedRestrictions() throws Exception { + + final String avroWithCombinedRestrictionsSchema = getFileFromResourceAsString("/converter/withCombinedRestrictionsSchema.avsc"); + final String jsonWithCombinedRestrictionsSchema = getFileFromResourceAsString("/converter/withCombinedRestrictionsSchema.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroWithCombinedRestrictionsSchema)); + final JsonNode expect = mapper.readTree(jsonWithCombinedRestrictionsSchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroWithArrayAndNestedRecordSchema() throws Exception { + + final String avroWithArrayAndNestedRecordSchema = getFileFromResourceAsString("/converter/withArrayAndNestedRecordSchema.avsc"); + final String jsonWithArrayAndNestedRecordSchema = getFileFromResourceAsString("/converter/withArrayAndNestedRecordSchema.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroWithArrayAndNestedRecordSchema)); + final JsonNode expect = mapper.readTree(jsonWithArrayAndNestedRecordSchema); + assertEquals(expect, actual); + } + + @Test + public void testConverterAvroWithSchemaReference() throws Exception { + + final String avroWithSchemaReference = getFileFromResourceAsString("/converter/withSchemaReference.avsc"); + final String jsonWithSchemaReference = getFileFromResourceAsString("/converter/withSchemaReference.json"); + + final Avro2JsonConvert converter = new Avro2JsonConvert(); + final JsonNode actual = converter.convertoToAirbyteJson(Jsons.deserialize(avroWithSchemaReference)); + final JsonNode expect = mapper.readTree(jsonWithSchemaReference); + assertEquals(expect, actual); + } + + private String getFileFromResourceAsString(final String fileName) throws IOException { + // Load the expected fixture from the test classpath. + final InputStream inputStream = getClass().getResourceAsStream(fileName); + return IOUtils.toString(inputStream, Charset.defaultCharset()); + } + +} diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java index 60046a2a2b99..484302e91f6f 100644 --- a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/KafkaSourceTest.java @@ -9,9 +9,14 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.kafka.format.AvroFormat; import io.airbyte.integrations.source.kafka.format.KafkaFormat; import java.io.IOException; + +import io.airbyte.protocol.models.v0.AirbyteMessage; import
org.junit.jupiter.api.Test; public class KafkaSourceTest { @@ -23,4 +28,16 @@ public void testAvroformat() throws IOException { assertInstanceOf(AvroFormat.class, kafkaFormat); } +// @Test +// public void testAvroMessage() throws Exception { +// final JsonNode configJson = Jsons.deserialize(MoreResources.readResource("test_config_uk.json")); +// final Source source = new KafkaSource(); +// source.discover(configJson); +// +// } + + + + + } diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/AvroConverterTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/AvroConverterTest.java new file mode 100644 index 000000000000..2339937e558d --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/AvroConverterTest.java @@ -0,0 +1,101 @@ +package io.airbyte.integrations.source.kafka.converter; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + + +class AvroConverterTest { + + @Test + void convertToAirbyteRecord() { + + String rawSchema = """ + { + "type": "record", + "name": "TestRecord", + "namespace": "mynamespace", + "fields": [{ + "name": "address", + "type": { + "type": "record", + "name": "Address", + "fields": [{ + "name": "number", + "type": ["null", "string"], + "default": null + }, { + "name": "postal_code", + "type": "int" + }, { + "name": "street", + "type": ["null", "string"], + "default": null + }] + } + }, { + "name": "name", + "type": "string" + + }, { + "name": "skills", + "type": ["null", { + "type": "array", + "items": ["null", "string"] + }], + "default": null + }, { + "name": "surname", + "type": "string" + }] + } + """; + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(rawSchema); + + GenericRecord addressTestRecord = new GenericData.Record(schema.getField("address").schema()); + addressTestRecord.put("street", "via fittizie"); + addressTestRecord.put("number", "42"); + addressTestRecord.put("postal_code", 12345); + + List skillsTestRecord = new LinkedList<>(); + skillsTestRecord.add("coding"); + skillsTestRecord.add("etl"); + + GenericRecord testRecord = new GenericData.Record(schema); + testRecord.put("name", "Team"); + testRecord.put("surname", "Member"); + testRecord.put("address", addressTestRecord); + testRecord.put("skills", skillsTestRecord); + + String testTopic = "conversion.uk.test"; + + Converter converter = new AvroConverter(); + + AirbyteRecordMessage actualMessage = converter.convertToAirbyteRecord(testTopic, testRecord); + JsonNode actualData = actualMessage.getData(); + + List expectedSkills = (List) testRecord.get("skills"); + List actualSkills = new ArrayList<>(); + actualData.get("skills").elements().forEachRemaining(x -> actualSkills.add(x.asText())); + + assertAll( + () -> assertEquals(testTopic, actualMessage.getStream()), + () -> assertEquals(testRecord.get("name"), actualData.get("name").asText()), + () -> assertEquals(testRecord.get("surname"), actualData.get("surname").asText()), + () -> 
assertEquals(expectedSkills.stream().distinct().toList(), actualSkills.stream().distinct().toList()), + () -> assertEquals(addressTestRecord.get("street"), actualData.get("address").get("street").asText()), + () -> assertEquals(addressTestRecord.get("number"), actualData.get("address").get("number").asText()), + () -> assertEquals(addressTestRecord.get("postal_code"), actualData.get("address").get("postal_code").asInt()) + ); + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/JsonConverterTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/JsonConverterTest.java new file mode 100644 index 000000000000..e5f25f02c70b --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/converter/JsonConverterTest.java @@ -0,0 +1,38 @@ +package io.airbyte.integrations.source.kafka.converter; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import org.junit.jupiter.api.Test; + +class JsonConverterTest { + + @Test + void testConvertToAirbyteRecord() throws JsonProcessingException { + String recordString = """ + { + "name": "Team", + "surname": "Member", + "age": 42 + } + """; + + ObjectMapper mapper = new ObjectMapper(); + JsonNode testRecord = mapper.readTree(recordString); + + String testTopic = "test_topic"; + + Converter converter = new JsonConverter(); + + AirbyteRecordMessage actualMessage = converter.convertToAirbyteRecord(testTopic, testRecord); + + assertAll( + () -> assertEquals(testTopic, actualMessage.getStream()), + () -> assertEquals(testRecord, actualMessage.getData()) + ); + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/generator/GeneratorTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/generator/GeneratorTest.java new file mode 100644 index 000000000000..aed66b9cf42c --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/generator/GeneratorTest.java @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.source.kafka.generator; + +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.source.kafka.KafkaMessage; +import io.airbyte.integrations.source.kafka.mediator.KafkaMediator; +import io.airbyte.integrations.source.kafka.state.State; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +public class GeneratorTest { + + final int maxMessages = 1000; + final int maxRetries = 10; + + @Test + public void testOneBatchNoState() { + final var mediator = new KafkaMediator() { + + final String topic = "topic-0"; + final Queue<KafkaMessage> messages = new LinkedList<>( + List.of( + new KafkaMessage(topic, 0, 0, new AirbyteRecordMessage().withStream(topic).withData(Jsons.deserialize("{ \"message\" : 1 }"))) + ) + ); + + @Override + public List<KafkaMessage> poll() { + return Optional.ofNullable(this.messages.poll()).stream().toList(); + } + + @Override + public Map<TopicPartition, Long> position(Set<TopicPartition> partitions) { + return Map.of(); + } + }; + final var generator = Generator.Builder.newInstance() + .withMaxRecords(maxMessages) + .withMaxRetries(maxRetries) + .withMediator(mediator) + .build(); + final var messages = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(generator.read(), Spliterator.ORDERED), false + ).toList(); + final var expectedRecord = Jsons.deserialize("{ \"message\" : 1 }"); + + assertAll( + () -> assertEquals(1, messages.size()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(0).getType()), + () -> assertEquals(expectedRecord, messages.get(0).getRecord().getData()) + ); + } + + @Test + public void testOneBatchWithState() { + final var mediator = new KafkaMediator() { + + final String topic = "topic-0"; + final Queue<List<KafkaMessage>> messages = new LinkedList<>( + List.of( + List.of( + new KafkaMessage(this.topic, 0, 0L, + new AirbyteRecordMessage().withStream(this.topic).withData(Jsons.deserialize("{ \"message\" : 2 }"))), + new KafkaMessage(this.topic, 1, 5L, + new AirbyteRecordMessage().withStream(this.topic).withData(Jsons.deserialize("{ \"message\" : 3 }"))) + ) + ) + ); + + @Override + public List<KafkaMessage> poll() { + return Optional.ofNullable(this.messages.poll()).orElse(List.of()); + } + + @Override + public Map<TopicPartition, Long> position(Set<TopicPartition> partitions) { + return Map.ofEntries( + Map.entry(new TopicPartition(this.topic, 0), 0L), + Map.entry(new TopicPartition(this.topic, 1), 5L) + ); + } + }; + final var generator = Generator.Builder.newInstance() + .withMaxRecords(maxMessages) + .withMaxRetries(maxRetries) + .withMediator(mediator) + .build(); + final var messages = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(generator.read(), Spliterator.ORDERED), false + ).toList(); + final var expectedRecord1 = Jsons.deserialize("{ \"message\" : 2 }"); + final var expectedRecord2 = Jsons.deserialize("{ \"message\" : 3 }"); + final var expectedStateTopic = "topic-0"; + final var expectedStateContent = Jsons.jsonNode(new
State(Map.ofEntries( + Map.entry(0, 0L), + Map.entry(1, 5L) + ))); + + assertAll( + () -> assertEquals(3, messages.size()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(0).getType()), + () -> assertEquals(expectedRecord1, messages.get(0).getRecord().getData()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(1).getType()), + () -> assertEquals(expectedRecord2, messages.get(1).getRecord().getData()), + () -> assertEquals(AirbyteMessage.Type.STATE, messages.get(2).getType()), + () -> assertEquals(AirbyteStateType.STREAM, messages.get(2).getState().getType()), + () -> assertEquals(expectedStateTopic, messages.get(2).getState().getStream().getStreamDescriptor().getName()), + () -> assertEquals(expectedStateContent, messages.get(2).getState().getStream().getStreamState()) + ); + } + + @Test + public void testMultipleBatches() { + final var mediator = new KafkaMediator() { + + final String topic0 = "topic-0"; + final String topic1 = "topic-2"; + + final Queue<List<KafkaMessage>> messages = new LinkedList<>( + List.of( + List.of( + new KafkaMessage(this.topic0, 0, 0L, + new AirbyteRecordMessage().withStream(this.topic0).withData(Jsons.deserialize("{ \"message\" : 4 }"))) + ), + List.of( + new KafkaMessage(this.topic1, 1, 5L, + new AirbyteRecordMessage().withStream(this.topic1).withData(Jsons.deserialize("{ \"message\" : 5 }"))) + ) + ) + ); + final Queue<Map<TopicPartition, Long>> partitions = new LinkedList<>( + List.of( + Map.of(new TopicPartition(this.topic0, 0), 0L), + Map.of(new TopicPartition(this.topic1, 1), 5L) + ) + ); + + @Override + public List<KafkaMessage> poll() { + return Optional.ofNullable(this.messages.poll()).orElse(List.of()); + } + + @Override + public Map<TopicPartition, Long> position(Set<TopicPartition> partitions) { + return Optional.ofNullable(this.partitions.poll()).orElse(Map.of()); + } + }; + final var generator = Generator.Builder.newInstance() + .withMaxRecords(maxMessages) + .withMaxRetries(maxRetries) + .withMediator(mediator) + .build(); + final var messages = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(generator.read(), Spliterator.ORDERED), false + ).toList(); + final var expectedRecord1 = Jsons.deserialize("{ \"message\" : 4 }"); + final var expectedRecord2 = Jsons.deserialize("{ \"message\" : 5 }"); + final var expectedStateTopic1 = "topic-0"; + final var expectedStateContent1 = Jsons.jsonNode(new State(Map.ofEntries( + Map.entry(0, 0L) + ))); + final var expectedStateTopic2 = "topic-2"; + final var expectedStateContent2 = Jsons.jsonNode(new State(Map.ofEntries( + Map.entry(1, 5L) + ))); + + assertAll( + () -> assertEquals(4, messages.size()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(0).getType()), + () -> assertEquals(expectedRecord1, messages.get(0).getRecord().getData()), + () -> assertEquals(AirbyteMessage.Type.STATE, messages.get(1).getType()), + () -> assertEquals(AirbyteStateType.STREAM, messages.get(1).getState().getType()), + () -> assertEquals(expectedStateTopic1, messages.get(1).getState().getStream().getStreamDescriptor().getName()), + () -> assertEquals(expectedStateContent1, messages.get(1).getState().getStream().getStreamState()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(2).getType()), + () -> assertEquals(expectedRecord2, messages.get(2).getRecord().getData()), + () -> assertEquals(AirbyteMessage.Type.STATE, messages.get(3).getType()), + () -> assertEquals(AirbyteStateType.STREAM, messages.get(3).getState().getType()), + () -> assertEquals(expectedStateTopic2, messages.get(3).getState().getStream().getStreamDescriptor().getName()), + () ->
assertEquals(expectedStateContent2, messages.get(3).getState().getStream().getStreamState()) + ); + } + + @Test + public void testRetriesNoData() { + final var mediator = new KafkaMediator() { + + @Override + public List<KafkaMessage> poll() { + return List.of(); + } + + @Override + public Map<TopicPartition, Long> position(Set<TopicPartition> partitions) { + return Map.of(); + } + }; + final var generator = Generator.Builder.newInstance() + .withMaxRecords(maxMessages) + .withMaxRetries(maxRetries) + .withMediator(mediator) + .build(); + final var messages = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(generator.read(), Spliterator.ORDERED), false + ).toList(); + + assertTrue(messages.isEmpty()); + } + + @Test + public void testRetriesDataAfterSomeAttempts() { + final var mediator = new KafkaMediator() { + + final String topic = "topic-0"; + final Queue<List<KafkaMessage>> messages = new LinkedList<>( + List.of( + List.of(), + List.of(), + List.of(), + List.of(), + List.of( + new KafkaMessage(this.topic, 0, 0L, + new AirbyteRecordMessage().withStream(this.topic).withData(Jsons.deserialize("{ \"message\" : 6 }"))) + ) + ) + ); + + @Override + public List<KafkaMessage> poll() { + return Optional.ofNullable(this.messages.poll()).orElse(List.of()); + } + + @Override + public Map<TopicPartition, Long> position(Set<TopicPartition> partitions) { + return Map.ofEntries( + Map.entry(new TopicPartition(this.topic, 0), 0L) + ); + } + }; + final var generator = Generator.Builder.newInstance() + .withMaxRecords(maxMessages) + .withMaxRetries(maxRetries) + .withMediator(mediator) + .build(); + final var messages = StreamSupport.stream( + Spliterators.spliteratorUnknownSize(generator.read(), Spliterator.ORDERED), false + ).toList(); + final var expectedRecord = Jsons.deserialize("{ \"message\" : 6 }"); + final var expectedStateTopic = "topic-0"; + final var expectedStateContent = Jsons.jsonNode(new State(Map.ofEntries( + Map.entry(0, 0L) + ))); + + assertAll( + () -> assertEquals(2, messages.size()), + () -> assertEquals(AirbyteMessage.Type.RECORD, messages.get(0).getType()), + () -> assertEquals(expectedRecord, messages.get(0).getRecord().getData()), + () -> assertEquals(AirbyteMessage.Type.STATE, messages.get(1).getType()), + () -> assertEquals(AirbyteStateType.STREAM, messages.get(1).getState().getType()), + () -> assertEquals(expectedStateTopic, messages.get(1).getState().getStream().getStreamDescriptor().getName()), + () -> assertEquals(expectedStateContent, messages.get(1).getState().getStream().getStreamState()) + ); + } +} diff --git a/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/state/StateHelperTest.java b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/state/StateHelperTest.java new file mode 100644 index 000000000000..82562cd23133 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/java/io/airbyte/integrations/source/kafka/state/StateHelperTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.integrations.source.kafka.state; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import java.io.IOException; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +public class StateHelperTest { + + @Test + public void testGlobalStateDeserialisation() throws IOException { + final var jsonState = Jsons.deserialize(MoreResources.readResource("state/test_global_state_deserialisation.json")); + final var state = StateHelper.stateFromJson(jsonState); + final var expected = Map.ofEntries( + Map.entry(new TopicPartition("topic-0", 0), 42L) + ); + assertEquals(expected, state); + } + + @Test + public void testLegacyStateDeserialisation() throws IOException { + final var jsonState = Jsons.deserialize(MoreResources.readResource("state/test_legacy_state_deserialisation.json")); + final var state = StateHelper.stateFromJson(jsonState); + assertTrue(state.isEmpty()); + } + + @Test + public void testStreamStateDeserialisation() throws IOException { + final var jsonState = Jsons.deserialize(MoreResources.readResource("state/test_stream_state_deserialisation.json")); + final var state = StateHelper.stateFromJson(jsonState); + final var expected = Map.ofEntries( + Map.entry(new TopicPartition("topic-1", 0), 24L), + Map.entry(new TopicPartition("topic-1", 1), 42L) + ); + assertEquals(expected, state); + } + + @Test + public void testStateSerialisation() throws IOException { + final var state = Map.ofEntries( + Map.entry(new TopicPartition("topic-0", 0), 24L), + Map.entry(new TopicPartition("topic-1", 0), 42L), + Map.entry(new TopicPartition("topic-1", 1), 66L) + ); + final var serialised = Jsons.serialize(StateHelper.toAirbyteState(state)); + final var expected = MoreResources.readResource("state/test_state_serialisation.json"); + assertEquals(expected, serialised); + } + +} + diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.avsc new file mode 100644 index 000000000000..f43ab26434b8 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.avsc @@ -0,0 +1,17 @@ + { + "type": "record", + "name": "sampleAvroNested", + "namespace": "AVRO", + "fields": [ + {"name": "lastname", "type": "string"}, + {"name": "address","type": { + "type" : "record", + "name" : "AddressUSRecord", + "fields" : [ + {"name": "streetaddress", "type": ["string", "null"]}, + {"name": "city", "type": "string"} + ] + } + } + ] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.json new file mode 100644 index 000000000000..c3307a2e4c4d --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/nestedRecordsSchema.json @@ -0,0 +1,19 @@ +{ + "type": "object", + "properties": { + "address":{ + "type": "object", + "properties": { + "city":{ + "type": "string" + }, + "streetaddress":{ + "type": "string" + } + } + }, + "lastname":{ + "type":"string" + } + } +} \ No newline at end of file diff --git 
a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.avsc new file mode 100644 index 000000000000..749139910ab2 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.avsc @@ -0,0 +1,12 @@ +{ + "type": "record", + "name": "sampleAvro", + "namespace": "AVRO", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": ["int", "null"]}, + {"name": "address", "type": ["float", "null"]}, + {"name": "street", "type": "float"}, + {"name": "valid", "type": "boolean"} + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.json new file mode 100644 index 000000000000..1506158a1704 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/simpleSchema.json @@ -0,0 +1,10 @@ +{ + "type": "object", + "properties": { + "address": {"type": "number"}, + "age": {"type": "integer"}, + "name": {"type": "string"}, + "street": {"type": "number"}, + "valid": {"type": "boolean"} + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.avsc new file mode 100644 index 000000000000..db30f458e96f --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.avsc @@ -0,0 +1,37 @@ + { + "type": "record", + "name": "TestObject", + "namespace": "ca.dataedu", + "fields": [{ + "name": "array_field", + "type": ["null", { + "type": "array", + "items": ["null", { + "type": "record", + "name": "Array_field", + "fields": [{ + "name": "id", + "type": ["null", { + "type": "record", + "name": "Id", + "fields": [{ + "name": "id_part_1", + "type": ["null", "int"], + "default": null + }, { + "name": "id_part_2", + "type": ["null", "string"], + "default": null + }] + }], + "default": null + }, { + "name": "message", + "type": ["null", "string"], + "default": null + }] + }] + }], + "default": null + }] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.json new file mode 100644 index 000000000000..56b5e576473e --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndNestedRecordSchema.json @@ -0,0 +1,23 @@ +{ + "type": "object", + "properties":{ + "array_field": { + "type": "array", + "items": [ + { "type":"object", + "properties":{ + "id": { + + "type": "object", + "properties":{ + "id_part_1": { "type": "integer" }, + "id_part_2": { "type": "string"} + } + }, + "message" : {"type": "string"} + } + } + ] + } + } + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.avsc new file mode 100644 index 000000000000..c6817df3a795 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.avsc @@ -0,0 +1,29 @@ + { + "type": "record", + "name": "TestObject", + "namespace": "ca.dataedu", + "fields": [{ + "name": "array_field", + "type": ["null", { + "type": "array", + "items": ["null", { + "type": "record", + "name": "Array_field", + "fields": [{ + "name": "id", + "type": ["null", { + "type": "record", + "name": "Id", + "fields": [{ + "name": "id_part_1", + "type": ["null", "int"], + "default": null + }] + }], + "default": null + }] + }] + }], + "default": null + }] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.json new file mode 100644 index 000000000000..765b989a5d45 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArrayAndRecordSchema.json @@ -0,0 +1,25 @@ +{ + "type": "object", + "properties":{ + "array_field":{ + "type":"array", + "items":[ + { + "type":"object", + "properties":{ + "id":{ + "type":"object", + "properties":{ + "id_part_1":{ + "type": + "integer" + } + } + } + } + + } + ] + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.avsc new file mode 100644 index 000000000000..062735ea4a50 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.avsc @@ -0,0 +1,15 @@ + { + "type": "record", + "fields": [ + { + "name": "identifier", + "type": [ + null, + { + "type": "array", + "items": ["null", "string"] + } + ] + } + ] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.json new file mode 100644 index 000000000000..4b5a1926298f --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withArraySchema.json @@ -0,0 +1,11 @@ +{ + "type": "object", + "properties": { + "identifier": { + "type": "array", + "items" : [ + {"type":"string"} + ] + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.avsc new file mode 100644 index 000000000000..f0534918d7be --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.avsc @@ -0,0 +1,10 @@ + { + "type": "record", + "name": "sampleAvro", + "namespace": "AVRO", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": ["int", "null"]}, + {"name": "address", "type": ["float", "string", "null"]} + ] + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.json new file mode 100644 index 000000000000..d4761487ee5a --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withCombinedRestrictionsSchema.json 
@@ -0,0 +1,11 @@ +{ + "type":"object", + "properties":{ + "address": {"anyOf": [ + {"type": "number"}, + {"type": "string"} + ]}, + "name": {"type": "string"}, + "age": {"type": "integer"} + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.avsc b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.avsc new file mode 100644 index 000000000000..9157a8ca5ffc --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.avsc @@ -0,0 +1,166 @@ +{ + "type":"record", + "name":"PolicyCreated", + "namespace":"com.prima.uk", + "fields":[ + { + "name":"amount_per_instalment", + "type":[ + "null", + "string" + ] + }, + { + "name":"auth_context_uuid", + "type":"string" + }, + { + "name":"context_introducer", + "type":"string" + }, + { + "name":"context_introducer_reference", + "type":[ + "null", + "string" + ] + }, + { + "name":"context_tariff_revision", + "type":[ + "null", + "string" + ] + }, + { + "name":"created_at", + "type":"string", + "logicalType":"iso_datetime" + }, + { + "name":"deposit", + "type":"string", + "logicalType":"decimal" + }, + { + "name":"premium_breakdown", + "type":{ + "type":"map", + "values":{ + "type":"string", + "logicalType":"decimal" + } + } + }, + { + "name":"offer_uuid", + "type":"string" + }, + { + "name":"payment_frequency", + "type":"string" + }, + { + "name":"policy_cfid", + "type":"string" + }, + { + "name":"policy_version_uuid", + "type":"string" + }, + { + "name":"quote_uuid", + "type":"string" + }, + { + "name":"segments", + "type":{ + "type":"array", + "items":{ + "type":"record", + "name":"PolicySegment", + "fields":[ + { + "name":"annualized_price", + "type":"string", + "logicalType":"decimal" + }, + { + "name":"cover_type", + "type":"string" + }, + { + "name":"ends_at", + "type":"string", + "logicalType":"iso_datetime" + }, + { + "name":"items", + "type":{ + "type":"array", + "items":{ + "type":"record", + "name":"PolicyItem", + "fields":[ + { + "name":"annualized_price", + "type":"string", + "logicalType":"decimal" + }, + { + "name":"compulsory_excess", + "type":[ + "null", + "string" + ] + }, + { + "name":"coverage", + "type":"string" + }, + { + "name":"level", + "type":"string" + }, + { + "name":"provider", + "type":"string" + }, + { + "name":"type", + "type":"string" + }, + { + "name":"voluntary_excess", + "type":[ + "null", + "string" + ] + } + ] + } + } + }, + { + "name":"price_source", + "type":"string" + }, + { + "name":"risk_id", + "type":"string" + }, + { + "name":"segment_uuid", + "type":"string" + }, + { + "name":"starts_at", + "type":"string", + "logicalType":"iso_datetime" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.json new file mode 100644 index 000000000000..891d9dcf2acc --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/converter/withSchemaReference.json @@ -0,0 +1,110 @@ +{ + "type":"object", + "properties": + { + "amount_per_instalment": { + "type": "string" + }, + "auth_context_uuid":{ + "type":"string" + }, + "context_introducer":{ + "type":"string" + }, + "context_introducer_reference":{ + "type":"string" + }, + "context_tariff_revision" : { + "type":"string" + }, + "created_at": { + "type":"string" + }, + 
"deposit":{ + "type":"string" + }, + "premium_breakdown":{ + "type":"object", + "additionalProperties": { + "type": "string" + } + }, + "offer_uuid": { + "type":"string" + }, + "payment_frequency": { + "type":"string" + }, + "policy_cfid": { + "type":"string" + }, + "policy_version_uuid":{ + "type":"string" + }, + "quote_uuid":{ + "type":"string" + }, + "segments": { + "type":"array", + "items":[ + { + "type":"object", + "properties":{ + "annualized_price":{ + "type":"string" + }, + "cover_type": { + "type":"string" + }, + "ends_at": { + "type":"string" + }, + "items": { + "type":"array", + "items": [ + { + "type" : "object", + "properties" : { + "annualized_price" : { + "type" : "string" + }, + "compulsory_excess" : { + "type" : "string" + }, + "coverage" : { + "type" : "string" + }, + "level" : { + "type" : "string" + }, + "provider" : { + "type" : "string" + }, + "type" : { + "type" : "string" + }, + "voluntary_excess" : { + "type" : "string" + } + } + } + ] + }, + "price_source" :{ + "type":"string" + }, + "risk_id": { + "type":"string" + }, + "segment_uuid": { + "type":"string" + }, + "starts_at": { + "type":"string" + } + } + } + ] + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_global_state_deserialisation.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_global_state_deserialisation.json new file mode 100644 index 000000000000..84b743780aba --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_global_state_deserialisation.json @@ -0,0 +1 @@ +[{"type":"GLOBAL","global":{"shared_state":{},"stream_states":[{"stream_descriptor":{"name":"topic-0"},"stream_state":{"partitions":{"0":42}}}]}}] \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_legacy_state_deserialisation.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_legacy_state_deserialisation.json new file mode 100644 index 000000000000..f4c415cae236 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_legacy_state_deserialisation.json @@ -0,0 +1 @@ +[{"type":"LEGACY","data":{"stream_descriptor":{"name":"topic-0"},"stream_state":{"partitions":{"0":42}}}}] \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_state_serialisation.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_state_serialisation.json new file mode 100644 index 000000000000..97d6626f8d9a --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_state_serialisation.json @@ -0,0 +1 @@ +[{"type":"STREAM","stream":{"stream_descriptor":{"name":"topic-0"},"stream_state":{"partitions":{"0":24}}}},{"type":"STREAM","stream":{"stream_descriptor":{"name":"topic-1"},"stream_state":{"partitions":{"0":42,"1":66}}}}] \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_stream_state_deserialisation.json b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_stream_state_deserialisation.json new file mode 100644 index 000000000000..befc57f99646 --- /dev/null +++ b/airbyte-integrations/connectors/source-kafka/src/test/resources/state/test_stream_state_deserialisation.json @@ -0,0 +1 @@ 
+[{"type":"STREAM","stream":{"stream_descriptor":{"name":"topic-1"},"stream_state":{"partitions":{"0":24,"1":42}}}}] \ No newline at end of file