Skip to content

Commit

Permalink
Upgrade Confluent version to 7.3.1
Browse files Browse the repository at this point in the history
Updates transitive dependencies for Avro and ZooKeeper.
Wire 4.x is required for Confluent 7.3.1 and is updated
in the modules that need it, but leaves Wire at 3.x for
the remaining modules.
  • Loading branch information
adamjshook authored and hashhar committed Mar 29, 2023
1 parent 4d34ed9 commit d85df0d
Show file tree
Hide file tree
Showing 17 changed files with 383 additions and 30 deletions.
13 changes: 12 additions & 1 deletion lib/trino-record-decoder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@

<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
<artifactId>wire-runtime-jvm</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema-jvm</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -86,6 +91,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-jvm</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -127,7 +128,18 @@ private boolean isSupportedPrimitive(Type type)

public FieldValueProvider decodeField(GenericRecord avroRecord)
{
Object avroColumnValue = locateNode(avroRecord, columnMapping);
Object avroColumnValue;
try {
avroColumnValue = locateNode(avroRecord, columnMapping);
}
catch (AvroRuntimeException e) {
if (e.getMessage().contains("Not a valid schema field")) {
avroColumnValue = null;
}
else {
throw e;
}
}
return new ObjectValueProvider(avroColumnValue, columnType, columnName);
}

Expand Down
21 changes: 18 additions & 3 deletions plugin/trino-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@
<scope>runtime</scope>
</dependency>

<!-- used by trino-record-decoder -->
<!-- used by kafka-protobuf-provider -->
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
<version>2.5.1</version>
<scope>runtime</scope>
</dependency>

Expand All @@ -158,6 +159,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down Expand Up @@ -298,6 +305,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-types</artifactId>
<!-- This is under Confluent Community License and it should not be used with compile scope -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
Expand Down Expand Up @@ -326,6 +340,7 @@
<configuration>
<allowedProvidedDependencies>
<id>io.confluent:kafka-protobuf-provider</id>
<id>io.confluent:kafka-protobuf-types</id>
</allowedProvidedDependencies>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ public int register(String subject, ParsedSchema parsedSchema, int version, int
}
}

@Override
public int register(String subject, ParsedSchema schema, boolean normalize)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.register(subject, schema, normalize);
}
}

@Override
public Schema getByID(int id)
throws IOException, RestClientException
Expand All @@ -104,6 +113,140 @@ public Schema getById(int id)
}
}

@Override
public int getId(String subject, ParsedSchema schema, boolean normalize)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getId(subject, schema, normalize);
}
}

@Override
public Optional<ParsedSchema> parseSchema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema)
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.parseSchema(schema);
}
}

@Override
public List<ParsedSchema> getSchemas(String subjectPrefix, boolean lookupDeletedSchema, boolean latestOnly)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getSchemas(subjectPrefix, lookupDeletedSchema, latestOnly);
}
}

@Override
public SchemaMetadata getSchemaMetadata(String subject, int version, boolean lookupDeletedSchema)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getSchemaMetadata(subject, version, lookupDeletedSchema);
}
}

@Override
public int getVersion(String subject, ParsedSchema schema, boolean normalize)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getVersion(subject, schema, normalize);
}
}

@Override
public List<Integer> getAllVersions(String subject, boolean lookupDeletedSchema)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getAllVersions(subject, lookupDeletedSchema);
}
}

@Override
public List<String> testCompatibilityVerbose(String subject, ParsedSchema schema)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.testCompatibilityVerbose(subject, schema);
}
}

@Override
public void deleteCompatibility(String subject)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.deleteCompatibility(subject);
}
}

@Override
public void deleteMode(String subject)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
delegate.deleteMode(subject);
}
}

@Override
public Collection<String> getAllSubjects(boolean lookupDeletedSubject)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getAllSubjects(lookupDeletedSubject);
}
}

@Override
public Collection<String> getAllSubjectsByPrefix(String subjectPrefix)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.getAllSubjectsByPrefix(subjectPrefix);
}
}

@Override
public List<Integer> deleteSubject(String subject, boolean isPermanent)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.deleteSubject(subject, isPermanent);
}
}

@Override
public List<Integer> deleteSubject(Map<String, String> requestProperties, String subject, boolean isPermanent)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.deleteSubject(requestProperties, subject, isPermanent);
}
}

@Override
public Integer deleteSchemaVersion(String subject, String version, boolean isPermanent)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.deleteSchemaVersion(subject, version, isPermanent);
}
}

@Override
public Integer deleteSchemaVersion(Map<String, String> requestProperties, String subject, String version, boolean isPermanent)
throws IOException, RestClientException
{
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
return delegate.deleteSchemaVersion(requestProperties, subject, version, isPermanent);
}
}

@Override
public ParsedSchema getSchemaById(int id)
throws IOException, RestClientException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.trino.decoder.DispatchingRowDecoderFactory;
Expand Down Expand Up @@ -175,9 +176,15 @@ public void configure(Map<String, ?> configuration)
}

@Override
public Optional<ParsedSchema> parseSchema(String schema, List<SchemaReference> references)
public Optional<ParsedSchema> parseSchema(String schema, List<SchemaReference> references, boolean isNew)
{
return delegate.get().parseSchema(schema, references);
return delegate.get().parseSchema(schema, references, isNew);
}

@Override
public ParsedSchema parseSchemaOrElseThrow(Schema schema, boolean isNew)
{
return delegate.get().parseSchemaOrElseThrow(schema, isNew);
}

private SchemaProvider create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.decoder.avro.AvroBytesDeserializer;
import io.trino.decoder.avro.AvroRowDecoderFactory;
import io.trino.plugin.kafka.KafkaColumnHandle;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericDatumWriter;
Expand Down Expand Up @@ -166,7 +167,7 @@ private static void assertRowsAreEqual(Optional<Map<DecoderColumnHandle, FieldVa
checkState(decodedRow.isPresent(), "decoded row is not present");
for (Map.Entry<DecoderColumnHandle, FieldValueProvider> entry : decodedRow.get().entrySet()) {
String columnName = entry.getKey().getName();
if (expected.get(columnName) == null) {
if (getValue(expected, columnName) == null) {
// The record uses the old schema and does not contain the new field.
assertTrue(entry.getValue().isNull());
}
Expand All @@ -176,6 +177,20 @@ private static void assertRowsAreEqual(Optional<Map<DecoderColumnHandle, FieldVa
}
}

public static Object getValue(GenericRecord record, String columnName)
{
try {
return record.get(columnName);
}
catch (AvroRuntimeException e) {
if (e.getMessage().contains("Not a valid schema field")) {
return null;
}

throw e;
}
}

private static void assertValuesAreEqual(FieldValueProvider actual, Object expected, Schema schema)
{
if (actual.isNull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void testAmbiguousOverriddenSubject()
.build(),
new SchemaTableName(DEFAULT_NAME, format("%s&value-subject=%s", topicName, overriddenSubject))))
.isInstanceOf(TrinoException.class)
.hasMessage("Subject 'ambiguousoverriddensubject' is ambiguous, and may refer to one of the following: ambiguousOverriddenSubject, AMBIGUOUSOVERRIDDENSUBJECT");
.hasMessage("Subject 'ambiguousoverriddensubject' is ambiguous, and may refer to one of the following: AMBIGUOUSOVERRIDDENSUBJECT, ambiguousOverriddenSubject");
}

private KafkaTopicDescription getKafkaTopicDescription(TableDescriptionSupplier tableDescriptionSupplier, SchemaTableName schemaTableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -254,7 +255,7 @@ private static String getExpectedValues(List<ProducerRecord<Long, GenericRecord>
private static void addExpectedColumns(Schema schema, GenericRecord record, ImmutableList.Builder<String> columnsBuilder)
{
for (Schema.Field field : schema.getFields()) {
Object value = record.get(field.name());
Object value = getValue(record, field.name());
if (value == null && field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.NULL))) {
if (field.schema().getTypes().contains(Schema.create(Schema.Type.DOUBLE))) {
columnsBuilder.add("CAST(null AS double)");
Expand All @@ -275,6 +276,20 @@ else if (field.schema().getType().equals(Schema.Type.LONG)) {
}
}

public static Object getValue(GenericRecord record, String columnName)
{
try {
return record.get(columnName);
}
catch (AvroRuntimeException e) {
if (e.getMessage().contains("Not a valid schema field")) {
return null;
}

throw e;
}
}

private void assertNotExists(String tableName)
{
if (schemaExists()) {
Expand Down
2 changes: 0 additions & 2 deletions plugin/trino-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<mongo-java.version>4.4.0</mongo-java.version>
<netty.version>4.1.72.Final</netty.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -194,7 +193,6 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Loading

0 comments on commit d85df0d

Please sign in to comment.