feat: remove unnecessary changelog for topics (#3987)
* refactor: remove unnecessary stream to table conversion

BREAKING CHANGE: KSQL will now, by default, not create a duplicate changelog topic for table sources.

fixes: #3621

Now that Kafka Streams has `KTable.transformValues`, we no longer need to create a table by first creating a stream and then doing a select/groupby/aggregate on it. Instead, we can just use `StreamsBuilder.table`.
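
For illustration, here is a minimal Kafka Streams sketch of the two shapes (the topic name, key/value types, and class name are made up; this is not KSQL's actual code):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public final class TableSourceSketch {

  // Old shape: build the table from a stream via an explicit aggregation.
  // The reduce materializes a state store backed by an extra changelog topic.
  static KTable<String, String> tableViaStream(final StreamsBuilder builder) {
    return builder.<String, String>stream("users")
        .groupByKey()
        .reduce((oldValue, newValue) -> newValue);
  }

  // New shape: read the topic directly as a table. With topology optimizations
  // enabled, the source topic itself serves as the changelog for the store.
  static KTable<String, String> tableDirect(final StreamsBuilder builder) {
    return builder.<String, String>table("users");
  }
}
```

(Each shape would use its own `StreamsBuilder`; registering the same topic twice in one builder is not allowed.)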

This change makes the switch, removing the `StreamToTable` types and calls and replacing them with either `TableSource` or `WindowedTableSource`, copying the existing pattern for `StreamSource` and `WindowedStreamSource`.

It also reinstates a change in `KsqlConfig` that ensures topology optimisations are on by default. This was the case for 5.4.x, but was inadvertently turned off.
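
For reference, the Kafka Streams-level setting in question looks roughly like this (a minimal sketch; the application id and bootstrap servers are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class OptimizationConfigSketch {

  static Properties streamsProperties() {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ksql-query-sketch");      // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // illustrative
    // Turn on topology optimizations; with them off, Kafka Streams creates a
    // separate changelog topic to back each table's state store.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    return props;
  }
}
```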

With the optimisation config turned on, and the new builder step used, KSQL no longer creates a changelog topic to back the table's state store. This is not needed, as the source topic is itself the changelog. The change includes new tests in `table.json` to confirm that the changelog topic is not created by default and is created if the user turns off optimisations.

This change also removes the line in the `TestExecutor` that explicitly sets topology optimisations to `all`. The test should _not_ have been doing this. This may be why the bug that turned off optimisations was not detected.

* chore: remove the reduce step from the expected schemas

The state store backing a table no longer requires explicit instantiation, so no value serde is created, and it is the creation of the value serde that results in a line in the expected schemas.

To put this another way, the state store now uses the same serde as the source topic.
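
As a rough Kafka Streams illustration of the difference (serdes, topic name, and class name are chosen arbitrarily; this is not KSQL's actual code):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public final class SerdeSketch {

  // Old shape: the reduce step materializes its own store and is handed an
  // explicit value serde; creating that serde is what produced the extra
  // "reduce" entry in the expected schema files.
  static KTable<String, String> withExplicitStoreSerde(final StreamsBuilder builder) {
    return builder
        .stream("users", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .reduce(
            (oldValue, newValue) -> newValue,
            Materialized.with(Serdes.String(), Serdes.String()));
  }

  // New shape: the table's store simply reuses the serde the source topic is
  // consumed with, so no separate value serde is created.
  static KTable<String, String> reusingSourceSerde(final StreamsBuilder builder) {
    return builder.table("users", Consumed.with(Serdes.String(), Serdes.String()));
  }
}
```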

* chore: rewrite expected topologies for tables

The expected topology for tables has changed and needs updating.

* chore: remove reduce step from expected schemas for joins

Missed this previously.

* chore: fix tests

* chore: post merge fix up

* chore: update JSON schema for command topic

* chore: fix test

* chore: fix test post merge

* chore: fix test post merge
big-andy-coates authored Nov 28, 2019
1 parent 4e2f1cf commit 6e0d00e
Showing 126 changed files with 3,128 additions and 3,272 deletions.

@@ -267,7 +267,13 @@ void defineCurrent(final ConfigDef configDef) {
}

private static final Collection<CompatibilityBreakingStreamsConfig>
COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of();
COMPATIBILITY_BREAKING_STREAMS_CONFIGS = ImmutableList.of(
// Turn on optimizations by default, unless the user explicitly disables in config:
new CompatibilityBreakingStreamsConfig(
StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE,
StreamsConfig.OPTIMIZE)
);

private static final class CompatibilityBreakingStreamsConfig {
final String name;

@@ -20,8 +20,6 @@

import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.timestamp.PartialStringToTimestampParser;
import io.confluent.ksql.util.timestamp.StringToTimestampParser;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@@ -3,7 +3,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.ksql.util.timestamp.StringToTimestampParser;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.hamcrest.Description;

@@ -30,6 +30,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKSourceFactory;
import io.confluent.ksql.structured.SchemaKStream;
import java.util.List;
import java.util.Map;
@@ -44,7 +45,6 @@
public class DataSourceNode extends PlanNode {

private static final String SOURCE_OP_NAME = "source";
private static final String REDUCE_OP_NAME = "reduce";

private final DataSource<?> dataSource;
private final SourceName alias;
@@ -59,7 +59,7 @@ public DataSourceNode(
final SourceName alias,
final List<SelectExpression> selectExpressions
) {
this(id, dataSource, alias, selectExpressions, SchemaKStream::forSource);
this(id, dataSource, alias, selectExpressions, SchemaKSourceFactory::buildSource);
}

DataSourceNode(
@@ -136,7 +136,7 @@ public <C, R> R accept(final PlanVisitor<C, R> visitor, final C context) {
@Override
public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
final Stacker contextStacker = builder.buildNodeContext(getId().toString());
final SchemaKStream<?> schemaKStream = schemaKStreamFactory.create(
return schemaKStreamFactory.create(
builder,
dataSource,
schema,
@@ -145,15 +145,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
keyField,
alias
);
if (getDataSourceType() == DataSourceType.KSTREAM) {
return schemaKStream;
}
final Stacker reduceContextStacker = contextStacker.push(REDUCE_OP_NAME);
return schemaKStream.toTable(
dataSource.getKsqlTopic().getKeyFormat(),
dataSource.getKsqlTopic().getValueFormat(),
reduceContextStacker
);
}

interface SchemaKStreamFactory {

@@ -15,10 +15,27 @@

package io.confluent.ksql.query;

import static java.util.Objects.requireNonNull;

import java.util.Map;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public interface KafkaStreamsBuilder {
KafkaStreams buildKafkaStreams(StreamsBuilder builder, Map<String, Object> conf);
BuildResult buildKafkaStreams(StreamsBuilder builder, Map<String, Object> conf);

class BuildResult {

final Topology topology;
final KafkaStreams kafkaStreams;

public BuildResult(
final Topology topology,
final KafkaStreams kafkaStreams
) {
this.topology = requireNonNull(topology, "topology");
this.kafkaStreams = requireNonNull(kafkaStreams, "kafkaStreams");
}
}
}
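
A hypothetical same-package caller (not part of this commit) that consumes the new return type might look like this; it shows that callers now receive the built `Topology` alongside the `KafkaStreams` instance:

```java
package io.confluent.ksql.query;

import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

final class BuildResultUsageSketch {

  // BuildResult's fields are package-private, so this sketch lives in
  // io.confluent.ksql.query. The Topology it returns was built with the
  // streams properties applied, so topology optimizations take effect.
  static Topology buildAndStart(
      final KafkaStreamsBuilder kafkaStreamsBuilder,
      final StreamsBuilder streamsBuilder,
      final Map<String, Object> streamsProperties
  ) {
    final KafkaStreamsBuilder.BuildResult built =
        kafkaStreamsBuilder.buildKafkaStreams(streamsBuilder, streamsProperties);
    built.kafkaStreams.start();
    return built.topology;
  }
}
```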

@@ -21,23 +21,25 @@
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class KafkaStreamsBuilderImpl implements KafkaStreamsBuilder {

private final KafkaClientSupplier clientSupplier;

public KafkaStreamsBuilderImpl(final KafkaClientSupplier clientSupplier) {
Objects.requireNonNull(clientSupplier, "clientSupplier can't be null");
this.clientSupplier = clientSupplier;
KafkaStreamsBuilderImpl(final KafkaClientSupplier clientSupplier) {
this.clientSupplier = Objects.requireNonNull(clientSupplier, "clientSupplier");
}

@Override
public KafkaStreams buildKafkaStreams(
public BuildResult buildKafkaStreams(
final StreamsBuilder builder,
final Map<String, Object> conf) {

final Map<String, Object> conf
) {
final Properties props = new Properties();
props.putAll(conf);
return new KafkaStreams(builder.build(), props, clientSupplier);
final Topology topology = builder.build(props);
final KafkaStreams kafkaStreams = new KafkaStreams(topology, props, clientSupplier);
return new BuildResult(topology, kafkaStreams);
}
}

@@ -37,6 +37,7 @@
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.KafkaStreamsBuilder.BuildResult;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.GenericKeySerDe;
@@ -66,7 +67,6 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

@@ -160,23 +160,23 @@ public TransientQueryMetadata buildTransientQuery(

final Map<String, Object> streamsProperties = buildStreamsProperties(applicationId, queryId);

final KafkaStreams streams =
final BuildResult built =
kafkaStreamsBuilder.buildKafkaStreams(streamsBuilder, streamsProperties);

streams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());
built.kafkaStreams.setUncaughtExceptionHandler(new KafkaStreamsUncaughtExceptionHandler());

final LogicalSchema transientSchema = buildTransientQuerySchema(schema);

return new TransientQueryMetadata(
statementText,
streams,
built.kafkaStreams,
transientSchema,
sources,
queue::setLimitHandler,
planSummary,
queue.getQueue(),
applicationId,
streamsBuilder.build(),
built.topology,
streamsProperties,
overrides,
queryCloseCallback
@@ -209,24 +209,24 @@ public PersistentQueryMetadata buildQuery(
queryId
);
final Map<String, Object> streamsProperties = buildStreamsProperties(applicationId, queryId);
final KafkaStreams streams =
final BuildResult built =
kafkaStreamsBuilder.buildKafkaStreams(streamsBuilder, streamsProperties);
final Topology topology = streamsBuilder.build();

final PhysicalSchema querySchema = PhysicalSchema.from(
sinkDataSource.getSchema(),
sinkDataSource.getSerdeOptions()
);
final Optional<MaterializationProvider> materializationBuilder = getMaterializationInfo(result)
.flatMap(info -> buildMaterializationProvider(
info,
streams,
built.kafkaStreams,
querySchema,
sinkDataSource.getKsqlTopic().getKeyFormat(),
streamsProperties
));
return new PersistentQueryMetadata(
statementText,
streams,
built.kafkaStreams,
querySchema,
sources,
sinkDataSource.getName(),
@@ -236,7 +236,7 @@ materializationBuilder,
materializationBuilder,
applicationId,
sinkDataSource.getKsqlTopic(),
topology,
built.topology,
ksqlQueryBuilder.getSchemas(),
streamsProperties,
overrides,