Skip to content

Commit

Permalink
feat: support Protobuf in ksqlDB (#4469)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Feb 11, 2020
1 parent 3f756c1 commit a77cebe
Show file tree
Hide file tree
Showing 45 changed files with 888 additions and 322 deletions.
2 changes: 1 addition & 1 deletion config/ksql-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ bootstrap.servers=localhost:9092
# ksql.connect.worker.config=config/connect.properties

# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
#ksql.schema.registry.url=?
# ksql.schema.registry.url=?
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,18 @@
package io.confluent.ksql.schema.ksql.inference;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.links.DocumentationLinks;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.avro.Schema.Parser;
import org.apache.http.HttpStatus;
import org.apache.kafka.connect.data.Schema;

Expand All @@ -40,29 +37,26 @@
public class SchemaRegistryTopicSchemaSupplier implements TopicSchemaSupplier {

private final SchemaRegistryClient srClient;
private final Function<String, org.apache.avro.Schema> toAvroTranslator;
private final Function<org.apache.avro.Schema, Schema> toConnectTranslator;
private final Function<Schema, Schema> toKsqlTranslator;
private final Function<String, Format> formatFactory;

public SchemaRegistryTopicSchemaSupplier(final SchemaRegistryClient srClient) {
this(
srClient,
schema -> new Parser().parse(schema),
new AvroData(new AvroDataConfig(Collections.emptyMap()))::toConnectSchema,
new ConnectSchemaTranslator()::toKsqlSchema);
new ConnectSchemaTranslator()::toKsqlSchema,
FormatFactory::fromName
);
}

@VisibleForTesting
SchemaRegistryTopicSchemaSupplier(
final SchemaRegistryClient srClient,
final Function<String, org.apache.avro.Schema> toAvroTranslator,
final Function<org.apache.avro.Schema, Schema> toConnectTranslator,
final Function<Schema, Schema> toKsqlTranslator
final Function<Schema, Schema> toKsqlTranslator,
final Function<String, Format> formatFactory
) {
this.srClient = Objects.requireNonNull(srClient, "srClient");
this.toAvroTranslator = Objects.requireNonNull(toAvroTranslator, "toAvroTranslator");
this.toConnectTranslator = Objects.requireNonNull(toConnectTranslator, "toConnectTranslator");
this.toKsqlTranslator = Objects.requireNonNull(toKsqlTranslator, "toKsqlTranslator");
this.formatFactory = Objects.requireNonNull(formatFactory, "formatFactory");
}

@Override
Expand Down Expand Up @@ -100,32 +94,15 @@ public SchemaResult fromParsedSchema(
final int id,
final ParsedSchema parsedSchema
) {

try {
final Schema connectSchema;

switch (parsedSchema.schemaType()) {
case AvroSchema.TYPE:
connectSchema = toConnectSchema(parsedSchema.canonicalString());
break;
case "JSON":
case "PROTOBUF":
default:
throw new KsqlException("Unsupported schema type: " + parsedSchema.schemaType());
}

final Format format = formatFactory.apply(parsedSchema.schemaType());
final Schema connectSchema = toKsqlTranslator.apply(format.toConnectSchema(parsedSchema));
return SchemaResult.success(SchemaAndId.schemaAndId(connectSchema, id));
} catch (final Exception e) {
return notCompatible(topic, parsedSchema.canonicalString(), e);
}
}

private Schema toConnectSchema(final String avroSchemaString) {
final org.apache.avro.Schema avroSchema = toAvroTranslator.apply(avroSchemaString);
final Schema connectSchema = toConnectTranslator.apply(avroSchema);
return toKsqlTranslator.apply(connectSchema);
}

private static SchemaResult notFound(final String topicName) {
return SchemaResult.failure(new KsqlException(
"Avro schema for message values on topic " + topicName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
package io.confluent.ksql.schema.registry;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
Expand All @@ -41,6 +46,7 @@ public class KsqlSchemaRegistryClientFactory {
interface SchemaRegistryClientFactory {
CachedSchemaRegistryClient create(RestService service,
int identityMapCapacity,
List<SchemaProvider> providers,
Map<String, Object> clientConfigs,
Map<String, String> httpHeaders);
}
Expand Down Expand Up @@ -113,6 +119,7 @@ public SchemaRegistryClient get() {
return schemaRegistryClientFactory.create(
restService,
1000,
ImmutableList.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()),
schemaRegistryClientConfigs,
httpHeaders
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.metastore.MetaStore;
Expand Down Expand Up @@ -69,7 +69,7 @@ public class DefaultSchemaInjectorFunctionalTest {
@Mock
private SchemaRegistryClient srClient;
@Mock
private ParsedSchema schema;
private AvroSchema avroSchema;
@Mock
private MetaStore metaStore;
private DefaultSchemaInjector schemaInjector;
Expand Down Expand Up @@ -499,9 +499,9 @@ private void shouldInferSchema(
try {
when(srClient.getLatestSchemaMetadata(any()))
.thenReturn(new SchemaMetadata(1, 1, avroSchema.toString()));
when(srClient.getSchemaBySubjectAndId(any(), anyInt())).thenReturn(schema);
when(schema.schemaType()).thenReturn("AVRO");
when(schema.canonicalString()).thenReturn(avroSchema.toString());
when(srClient.getSchemaBySubjectAndId(any(), anyInt())).thenReturn(this.avroSchema);
when(this.avroSchema.schemaType()).thenReturn("AVRO");
when(this.avroSchema.rawSchema()).thenReturn(avroSchema);
} catch (final Exception e) {
throw new AssertionError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlException;
import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -57,26 +58,22 @@ public class SchemaRegistryTopicSchemaSupplierTest {
@Mock
private SchemaRegistryClient srClient;
@Mock
private Function<String, org.apache.avro.Schema> toAvroTranslator;
@Mock
private Function<org.apache.avro.Schema, Schema> toConnectTranslator;
@Mock
private Function<Schema, Schema> toKsqlTranslator;
@Mock
private org.apache.avro.Schema avroSchema;
@Mock
private ParsedSchema parsedSchema;
@Mock
private Schema connectSchema;
@Mock
private Schema ksqlSchema;
@Mock
private Format format;

private SchemaRegistryTopicSchemaSupplier supplier;

@Before
public void setUp() throws Exception {
supplier = new SchemaRegistryTopicSchemaSupplier(
srClient, toAvroTranslator, toConnectTranslator, toKsqlTranslator);
srClient, toKsqlTranslator, f -> format);

when(srClient.getLatestSchemaMetadata(any()))
.thenReturn(new SchemaMetadata(SCHEMA_ID, -1, AVRO_SCHEMA));
Expand All @@ -88,11 +85,7 @@ public void setUp() throws Exception {

when(parsedSchema.canonicalString()).thenReturn(AVRO_SCHEMA);

when(toAvroTranslator.apply(any()))
.thenReturn(avroSchema);

when(toConnectTranslator.apply(any()))
.thenReturn(connectSchema);
when(format.toConnectSchema(parsedSchema)).thenReturn(connectSchema);

when(toKsqlTranslator.apply(any()))
.thenReturn(ksqlSchema);
Expand Down Expand Up @@ -226,28 +219,10 @@ public void shouldThrowFromGetValueWithIdSchemaOnOtherException() throws Excepti
supplier.getValueSchema(TOPIC_NAME, Optional.of(42));
}

@Test
public void shouldReturnErrorFromGetValueSchemaIfCanNotConvertToAvroSchema() {
// Given:
when(toAvroTranslator.apply(any()))
.thenThrow(new RuntimeException("it went boom"));

// When:
final SchemaResult result = supplier.getValueSchema(TOPIC_NAME, Optional.empty());

// Then:
assertThat(result.schemaAndId, is(Optional.empty()));
assertThat(result.failureReason.get().getMessage(), containsString(
"Unable to verify if the schema for topic some-topic is compatible with KSQL."));
assertThat(result.failureReason.get().getMessage(), containsString(
"it went boom"));
assertThat(result.failureReason.get().getMessage(), containsString(AVRO_SCHEMA));
}

@Test
public void shouldReturnErrorFromGetValueSchemaIfCanNotConvertToConnectSchema() {
// Given:
when(toConnectTranslator.apply(any()))
when(format.toConnectSchema(any()))
.thenThrow(new RuntimeException("it went boom"));

// When:
Expand Down Expand Up @@ -299,21 +274,12 @@ public void shouldRequestCorrectSchemaOnGetValueSchemaWithId() throws Exception
}

@Test
public void shouldPassWriteSchemaToAvroTranslator() {
// When:
supplier.getValueSchema(TOPIC_NAME, Optional.empty());

// Then:
verify(toAvroTranslator).apply(AVRO_SCHEMA);
}

@Test
public void shouldPassWriteSchemaToConnectTranslator() {
public void shouldPassWriteSchemaToFormat() {
// When:
supplier.getValueSchema(TOPIC_NAME, Optional.empty());

// Then:
verify(toConnectTranslator).apply(avroSchema);
verify(format).toConnectSchema(parsedSchema);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class KsqlSchemaRegistryClientFactoryTest {

@Before
public void setUp() {
when(srClientFactory.create(any(), anyInt(), any(), any()))
when(srClientFactory.create(any(), anyInt(), any(), any(), any()))
.thenReturn(mock(CachedSchemaRegistryClient.class));

when(restServiceSupplier.get()).thenReturn(restService);
Expand Down Expand Up @@ -166,7 +166,7 @@ public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() {

// Then:
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
srClientFactory.create(same(restService), anyInt(), eq(expectedConfigs), any());
srClientFactory.create(same(restService), anyInt(), any(), eq(expectedConfigs), any());
}

private static Map<String, Object> defaultConfigs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package io.confluent.ksql.datagen;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -42,8 +45,9 @@ static Optional<SchemaRegistryClient> getSrClient(
}

return Optional.of(new CachedSchemaRegistryClient(
ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY),
ImmutableList.of(ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)),
100,
ImmutableList.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()),
ksqlConfig.originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX)
));
}
Expand Down
10 changes: 10 additions & 0 deletions ksql-functional-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Loading

0 comments on commit a77cebe

Please sign in to comment.