From 514025de32acb9ed077cd901fa19dafc74d2aa41 Mon Sep 17 00:00:00 2001
From: Almog Gavra
Date: Tue, 10 Mar 2020 16:42:06 -0700
Subject: [PATCH] fix: create schemas at topic creation (#4717)

---
Notes (free-form text between the "---" line and the diffstat is not part of
the commit message and is not part of any hunk):

SchemaRegisterInjector registers the value schema of a statement's topic under
the subject  topic + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX.  It does so
only when the value format reports supportsSchemaInference() and
KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY is set to a non-empty string; otherwise
the statement passes through untouched.  inject() always returns the statement
it was given.

  * CreateSource statements: the schema is built from the statement's elements
    and the topic / format are read from its WITH properties.
  * CreateAsSelect statements: the statement is executed against a sandboxed
    execution context and sandboxed service context; the schema, topic and
    value format are read from the resulting PersistentQueryMetadata.  If the
    sandbox returns no query, a KsqlStatementException is thrown.
  * IOException / RestClientException raised by the registry client are
    rethrown as KsqlStatementException with the original exception as cause.

 .../inference/SchemaRegisterInjector.java     | 123 ++++++++
 .../confluent/ksql/statement/Injectors.java   |   4 +-
 .../inference/SchemaRegisterInjectorTest.java | 266 ++++++++++++++++++
 .../java/io/confluent/ksql/serde/Format.java  |   2 +-
 .../ksql/serde/connect/ConnectFormat.java     |   2 +-
 5 files changed, 394 insertions(+), 3 deletions(-)
 create mode 100644 ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java
 create mode 100644 ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java

diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java
new file mode 100644
index 000000000000..2f6b8be9832d
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.schema.ksql.inference;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
+import io.confluent.ksql.parser.tree.CreateAsSelect;
+import io.confluent.ksql.parser.tree.CreateSource;
+import io.confluent.ksql.parser.tree.Statement;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.serde.Format;
+import io.confluent.ksql.serde.FormatFactory;
+import io.confluent.ksql.serde.FormatInfo;
+import io.confluent.ksql.services.SandboxedServiceContext;
+import io.confluent.ksql.services.ServiceContext;
+import io.confluent.ksql.statement.ConfiguredStatement;
+import io.confluent.ksql.statement.Injector;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.KsqlStatementException;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import java.io.IOException;
+import java.util.Objects;
+
+public class SchemaRegisterInjector implements Injector {
+
+  private final KsqlExecutionContext executionContext;
+  private final ServiceContext serviceContext;
+
+  public SchemaRegisterInjector(
+      final KsqlExecutionContext executionContext,
+      final ServiceContext serviceContext
+  ) {
+    this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
+    this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends Statement> ConfiguredStatement<T> inject(
+      final ConfiguredStatement<T> statement
+  ) {
+    if (statement.getStatement() instanceof CreateAsSelect) {
+      registerForCreateAs((ConfiguredStatement<? extends CreateAsSelect>) statement);
+    } else if (statement.getStatement() instanceof CreateSource) {
+      registerForCreateSource((ConfiguredStatement<? extends CreateSource>) statement);
+    }
+
+    return statement;
+  }
+
+  private void registerForCreateSource(final ConfiguredStatement<? extends CreateSource> cs) {
+    // since this injector is chained after the TopicCreateInjector,
+    // we can assume that the kafka topic is always present in the
+    // statement properties
+    registerSchema(
+        cs.getStatement().getElements().toLogicalSchema(false),
+        cs.getStatement().getProperties().getKafkaTopic(),
+        cs.getStatement().getProperties().getFormatInfo(),
+        cs.getConfig(),
+        cs.getStatementText()
+    );
+  }
+
+  private void registerForCreateAs(final ConfiguredStatement<? extends CreateAsSelect> cas) {
+    final ServiceContext sandboxServiceContext = SandboxedServiceContext.create(serviceContext);
+    final ExecuteResult executeResult = executionContext
+        .createSandbox(sandboxServiceContext)
+        .execute(sandboxServiceContext, cas);
+
+    final PersistentQueryMetadata queryMetadata = (PersistentQueryMetadata) executeResult
+        .getQuery()
+        .orElseThrow(() -> new KsqlStatementException(
+            "Could not determine output schema for query due to error: "
+                + executeResult.getCommandResult(),
+            cas.getStatementText()
+        ));
+
+    registerSchema(
+        queryMetadata.getLogicalSchema(),
+        queryMetadata.getResultTopic().getKafkaTopicName(),
+        queryMetadata.getResultTopic().getValueFormat().getFormatInfo(),
+        cas.getConfig(),
+        cas.getStatementText()
+    );
+  }
+
+  private void registerSchema(
+      final LogicalSchema schema,
+      final String topic,
+      final FormatInfo formatInfo,
+      final KsqlConfig config,
+      final String statementText
+  ) {
+    final Format format = FormatFactory.of(formatInfo);
+    if (format.supportsSchemaInference()
+        && config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) != null
+        && !config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY).isEmpty()) {
+      try {
+        serviceContext.getSchemaRegistryClient().register(
+            topic + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX,
+            format.toParsedSchema(schema.withoutMetaAndKeyColsInValue().value(), formatInfo)
+        );
+      } catch (IOException | RestClientException e) {
+        throw new KsqlStatementException("Could not register schema for topic.", statementText, e);
+      }
+    }
+  }
+}
diff --git
a/ksql-engine/src/main/java/io/confluent/ksql/statement/Injectors.java b/ksql-engine/src/main/java/io/confluent/ksql/statement/Injectors.java
index 56bd5166b747..8f7b0a866822 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/statement/Injectors.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/statement/Injectors.java
@@ -17,6 +17,7 @@
 
 import io.confluent.ksql.KsqlExecutionContext;
 import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector;
+import io.confluent.ksql.schema.ksql.inference.SchemaRegisterInjector;
 import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier;
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.topic.TopicCreateInjector;
@@ -29,7 +30,8 @@ public enum Injectors implements BiFunction<KsqlExecutionContext, ServiceContext
   NO_TOPIC_DELETE((ec, sc) -> InjectorChain.of(
       new DefaultSchemaInjector(
           new SchemaRegistryTopicSchemaSupplier(sc.getSchemaRegistryClient())),
-      new TopicCreateInjector(ec, sc)
+      new TopicCreateInjector(ec, sc),
+      new SchemaRegisterInjector(ec, sc)
   )),
 
   DEFAULT((ec, sc) -> InjectorChain.of(
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java
new file mode 100644
index 000000000000..992412b18c73
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"; you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.schema.ksql.inference;
+
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
+import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
+import io.confluent.ksql.function.InternalFunctionRegistry;
+import io.confluent.ksql.metastore.MetaStoreImpl;
+import io.confluent.ksql.metastore.MutableMetaStore;
+import io.confluent.ksql.metastore.model.KeyField;
+import io.confluent.ksql.metastore.model.KsqlStream;
+import io.confluent.ksql.name.ColumnName;
+import io.confluent.ksql.name.SourceName;
+import io.confluent.ksql.parser.DefaultKsqlParser;
+import io.confluent.ksql.parser.KsqlParser;
+import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlTypes;
+import io.confluent.ksql.serde.FormatFactory;
+import io.confluent.ksql.serde.FormatInfo;
+import io.confluent.ksql.serde.KeyFormat;
+import io.confluent.ksql.serde.SerdeOption;
+import io.confluent.ksql.serde.ValueFormat;
+import io.confluent.ksql.services.KafkaTopicClient;
+import io.confluent.ksql.services.ServiceContext;
+import io.confluent.ksql.statement.ConfiguredStatement;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlStatementException;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import io.confluent.ksql.util.SchemaUtil;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Optional;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SchemaRegisterInjectorTest {
+
+  private static final LogicalSchema SCHEMA = LogicalSchema.builder()
+      .withRowTime()
+      .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
+      .valueColumn(ColumnName.of("F1"), SqlTypes.STRING)
+      .build();
+
+  private static final AvroSchema AVRO_SCHEMA = new AvroSchema(
+      "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\","
+          + "\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":"
+          + "[{\"name\":\"F1\",\"type\":[\"null\",\"string\"],\"default\":null}],"
+          + "\"connect.name\":\"io.confluent.ksql.avro_schemas.KsqlDataSourceSchema\"}");
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Mock
+  private ServiceContext serviceContext;
+  @Mock
+  private SchemaRegistryClient schemaRegistryClient;
+  @Mock
+  private KafkaTopicClient topicClient;
+  @Mock
+  private KsqlExecutionContext executionContext;
+  @Mock
+  private KsqlExecutionContext executionSandbox;
+  @Mock
+  private PersistentQueryMetadata queryMetadata;
+
+  private KsqlParser parser = new DefaultKsqlParser();
+
+  private MutableMetaStore metaStore;
+  private KsqlConfig config;
+  private SchemaRegisterInjector injector;
+  private ConfiguredStatement<?> statement;
+
+  @Before
+  public void setUp() {
+    metaStore = new MetaStoreImpl(new InternalFunctionRegistry());
+    config = new KsqlConfig(ImmutableMap.of(
+        KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY, "foo:8081"
+    ));
+    injector = new SchemaRegisterInjector(executionContext, serviceContext);
+
+    when(serviceContext.getSchemaRegistryClient()).thenReturn(schemaRegistryClient);
+    when(serviceContext.getTopicClient()).thenReturn(topicClient);
+
+    when(executionContext.createSandbox(any())).thenReturn(executionSandbox);
+
+    when(queryMetadata.getLogicalSchema()).thenReturn(SCHEMA);
+    when(queryMetadata.getResultTopic()).thenReturn(new KsqlTopic(
+        "SINK",
+        KeyFormat.of(FormatInfo.of(FormatFactory.KAFKA.name()), Optional.empty()),
+        ValueFormat.of(FormatInfo.of(FormatFactory.AVRO.name()))
+    ));
+
+    final KsqlTopic sourceTopic = new KsqlTopic(
+        "source",
+        KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name())),
+        ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()))
+    );
+    final KsqlStream<?> source = new KsqlStream<>(
+        "",
+        SourceName.of("SOURCE"),
+        SCHEMA,
+        SerdeOption.none(),
+        KeyField.none(),
+        Optional.empty(),
+        false,
+        sourceTopic
+    );
+    metaStore.putSource(source);
+  }
+
+  @Test
+  public void shouldNotRegisterSchemaIfSchemaRegistryIsDisabled() {
+    // Given:
+    config = new KsqlConfig(ImmutableMap.of());
+    givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH(kafka_topic='expectedName', value_format='AVRO', partitions=1);");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verifyNoMoreInteractions(schemaRegistryClient);
+  }
+
+  @Test
+  public void shouldNotRegisterSchemaForSchemaRegistryDisabledFormatCreateSource() {
+    // Given:
+    givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH(kafka_topic='expectedName', value_format='DELIMITED', partitions=1);");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verifyNoMoreInteractions(schemaRegistryClient);
+  }
+
+  @Test
+  public void shouldRegisterSchemaForSchemaRegistryEnabledFormatCreateSource()
+      throws IOException, RestClientException {
+    // Given:
+    givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verify(schemaRegistryClient).register("expectedName-value", AVRO_SCHEMA);
+  }
+
+  @Test
+  public void shouldNotRegisterSchemaForSchemaRegistryDisabledFormatCreateAsSelect() {
+    // Given:
+    config = new KsqlConfig(ImmutableMap.of());
+    givenStatement("CREATE STREAM sink WITH(value_format='DELIMITED') AS SELECT * FROM SOURCE;");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verifyNoMoreInteractions(schemaRegistryClient);
+  }
+
+  @Test
+  public void shouldRegisterSchemaForSchemaRegistryEnabledFormatCreateAsSelect()
+      throws IOException, RestClientException {
+    // Given:
+    givenStatement("CREATE STREAM sink WITH(value_format='AVRO') AS SELECT * FROM SOURCE;");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verify(schemaRegistryClient).register("SINK-value", AVRO_SCHEMA);
+  }
+
+  @Test
+  public void shouldPropagateErrorOnFailureToExecuteQuery() {
+    // Given:
+    givenStatement("CREATE STREAM sink WITH(value_format='AVRO') AS SELECT * FROM SOURCE;");
+    when(executionSandbox.execute(any(), eq(statement))).thenReturn(ExecuteResult.of("fail!"));
+
+    // Expect:
+    expectedException.expect(KsqlStatementException.class);
+    expectedException.expectMessage("Could not determine output schema for query due to error: Optional[fail!]");
+
+    // When:
+    injector.inject(statement);
+  }
+
+  @Test
+  public void shouldPropagateErrorOnSRClientError() throws IOException, RestClientException {
+    // Given:
+    givenStatement("CREATE STREAM sink WITH(value_format='AVRO') AS SELECT * FROM SOURCE;");
+    when(schemaRegistryClient.register(anyString(), any(ParsedSchema.class)))
+        .thenThrow(new IOException("FUBAR"));
+
+    // Expect:
+    expectedException.expect(KsqlStatementException.class);
+    expectedException.expectMessage("Could not register schema for topic");
+    expectedException.expectCause(hasProperty("message", is("FUBAR")));
+
+    // When:
+    injector.inject(statement);
+  }
+
+  @Test
+  public void shouldNotExecuteQueryOnOriginalExecutionContext() {
+    // Given:
+    config = new KsqlConfig(ImmutableMap.of());
+    givenStatement("CREATE STREAM sink WITH(value_format='DELIMITED') AS SELECT * FROM SOURCE;");
+
+    // When:
+    injector.inject(statement);
+
+    // Then:
+    verify(executionContext, Mockito.never()).execute(any(), any(ConfiguredStatement.class));
+    verify(executionSandbox, Mockito.times(1)).execute(any(), any(ConfiguredStatement.class));
+  }
+
+  private void givenStatement(final String sql) {
+    final PreparedStatement<?> preparedStatement =
+        parser.prepare(parser.parse(sql).get(0), metaStore);
+    statement = ConfiguredStatement.of(
+        preparedStatement,
+        new HashMap<>(),
+        config);
+    when(executionSandbox.execute(any(), eq(statement)))
+        .thenReturn(ExecuteResult.of(queryMetadata));
+  }
+
+}
\ No newline at end of file
diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java
index cab6001a5381..044572b70752 100644
--- a/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java
+++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/Format.java
@@ -97,7 +97,7 @@ default List<SimpleColumn> toColumns(ParsedSchema schema) {
    * @param formatInfo the format info potentially containing additional info required to convert
    * @return the {@code ParsedSchema} which will be added to the Schema Registry
    */
-  default ParsedSchema toParsedSchema(List<SimpleColumn> columns, FormatInfo formatInfo) {
+  default ParsedSchema toParsedSchema(List<? extends SimpleColumn> columns, FormatInfo formatInfo) {
     throw new KsqlException("Format does not implement Schema Registry support: " + name());
   }
 
diff --git a/ksql-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java b/ksql-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
index ffb94ae743fe..da9f1a08d2a5 100644
--- a/ksql-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
+++ b/ksql-serde/src/main/java/io/confluent/ksql/serde/connect/ConnectFormat.java
@@ -67,7 +67,7 @@ public List<SimpleColumn> toColumns(final ParsedSchema schema) {
   }
 
   public ParsedSchema toParsedSchema(
-      final List<SimpleColumn> columns,
+      final List<? extends SimpleColumn> columns,
       final FormatInfo formatInfo
   ) {
     final SchemaBuilder schemaBuilder = SchemaBuilder.struct();