refactor: clean up offset reset from plan steps (#4064)
Cleans up offset reset handling in the plan steps. All streams now default to
starting from the end of the topic (latest); all tables now default to starting
from the first record (earliest). The reset can still be overridden in the config.
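With the global default removed, where a query starts reading is decided by the
source type unless the user overrides it. The sketch below shows that override
path. It assumes the standard "ksql.streams." pass-through prefix (settings with
this prefix are assumed to be forwarded to the underlying Kafka Streams and
consumer configuration) and uses the same getKsqlStreamConfigProps() accessor
the new test exercises; the class name is illustrative, not part of this commit.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import io.confluent.ksql.util.KsqlConfig;

public final class OffsetResetOverrideExample {

  public static void main(final String[] args) {
    final Map<String, Object> props = new HashMap<>();

    // Override the consumer's auto.offset.reset through the "ksql.streams."
    // pass-through prefix (an assumption for this sketch).
    props.put("ksql.streams." + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final KsqlConfig config = new KsqlConfig(props);

    // Without the override this entry is now absent (see the new test below),
    // and the per-source defaults apply: latest for streams, earliest for tables.
    System.out.println(
        config.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
    // expected output: earliest
  }
}

From the CLI, the same override is typically issued as SET 'auto.offset.reset' = 'earliest'; before starting the query.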
rodesai authored Dec 6, 2019
1 parent aabfb47 commit b6a20b9
Showing 17 changed files with 120 additions and 161 deletions.
KsqlConfig.java
@@ -34,7 +34,6 @@
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -580,8 +579,6 @@ private KsqlConfig(final ConfigGeneration generation, final Map<?, ?> props) {
super(configDef(generation), props);

final Map<String, Object> streamsConfigDefaults = new HashMap<>();
streamsConfigDefaults.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KsqlConstants
.defaultAutoOffsetRestConfig);
streamsConfigDefaults.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, KsqlConstants
.defaultCommitIntervalMsConfig);
streamsConfigDefaults.put(
KsqlConstants.java
@@ -34,7 +34,6 @@ private KsqlConstants() {
public static final short legacyDefaultSinkReplicaCount = 1;
public static final long defaultSinkWindowChangeLogAdditionalRetention = 1000000;

public static final String defaultAutoOffsetRestConfig = "latest";
public static final long defaultCommitIntervalMsConfig = 2000;
public static final long defaultCacheMaxBytesBufferingConfig = 10000000;
public static final int defaultNumberOfStreamsThreads = 4;
KsqlConfigTest.java
@@ -64,6 +64,13 @@ public void shouldSetLogAndContinueExceptionHandlerByDefault() {
assertThat(result, equalTo(LogMetricAndContinueExceptionHandler.class));
}

@Test
public void shouldNotSetAutoOffsetResetByDefault() {
final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());
final Object result = ksqlConfig.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
assertThat(result, is(nullValue()));
}

@Test
public void shouldSetLogAndContinueExceptionHandlerWhenFailOnDeserializationErrorFalse() {
final KsqlConfig ksqlConfig = new KsqlConfig(Collections.singletonMap(KsqlConfig.FAIL_ON_DESERIALIZATION_ERROR_CONFIG, false));
DataSourceNode.java
@@ -34,12 +34,6 @@
import io.confluent.ksql.structured.SchemaKSourceFactory;
import io.confluent.ksql.structured.SchemaKStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.Topology.AutoOffsetReset;

@Immutable
public class DataSourceNode extends PlanNode {
@@ -140,7 +134,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
builder,
dataSource,
contextStacker.push(SOURCE_OP_NAME),
getAutoOffsetReset(builder.getKsqlConfig().getKsqlStreamConfigProps()),
keyField,
alias
);
@@ -152,27 +145,8 @@ SchemaKStream<?> create(
KsqlQueryBuilder builder,
DataSource<?> dataSource,
QueryContext.Stacker contextStacker,
Optional<AutoOffsetReset> offsetReset,
KeyField keyField,
SourceName alias
);
}

private static Optional<Topology.AutoOffsetReset> getAutoOffsetReset(
final Map<String, Object> props) {
final Object offestReset = props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
if (offestReset == null) {
return Optional.empty();
}

try {
return Optional.of(AutoOffsetReset.valueOf(offestReset.toString().toUpperCase()));
} catch (final Exception e) {
throw new ConfigException(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
offestReset,
"Unknown value"
);
}
}
}
SchemaKSourceFactory.java
@@ -35,8 +35,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.WindowInfo;
import java.util.Optional;
import org.apache.kafka.streams.Topology.AutoOffsetReset;

/**
* Factory class used to create stream and table sources
@@ -50,7 +48,6 @@ public static SchemaKStream<?> buildSource(
final KsqlQueryBuilder builder,
final DataSource<?> dataSource,
final QueryContext.Stacker contextStacker,
final Optional<AutoOffsetReset> offsetReset,
final KeyField keyField,
final SourceName alias
) {
@@ -62,14 +59,12 @@ public static SchemaKStream<?> buildSource(
builder,
dataSource,
contextStacker,
offsetReset,
keyField,
alias
) : buildStream(
builder,
dataSource,
contextStacker,
offsetReset,
keyField,
alias
);
@@ -80,14 +75,12 @@ public static SchemaKStream<?> buildSource(
builder,
dataSource,
contextStacker,
offsetReset,
keyField,
alias
) : buildTable(
builder,
dataSource,
contextStacker,
offsetReset,
keyField,
alias
);
@@ -101,7 +94,6 @@ private static SchemaKStream<?> buildWindowedStream(
final KsqlQueryBuilder builder,
final DataSource<?> dataSource,
final Stacker contextStacker,
final Optional<AutoOffsetReset> offsetReset,
final KeyField keyField,
final SourceName alias
) {
@@ -115,7 +107,6 @@ private static SchemaKStream<?> buildWindowedStream(
buildFormats(dataSource),
windowInfo,
dataSource.getTimestampColumn(),
offsetReset,
alias
);

@@ -132,7 +123,6 @@ private static SchemaKStream<?> buildStream(
final KsqlQueryBuilder builder,
final DataSource<?> dataSource,
final Stacker contextStacker,
final Optional<AutoOffsetReset> offsetReset,
final KeyField keyField,
final SourceName alias
) {
@@ -146,7 +136,6 @@ private static SchemaKStream<?> buildStream(
dataSource.getKafkaTopicName(),
buildFormats(dataSource),
dataSource.getTimestampColumn(),
offsetReset,
alias
);

@@ -163,7 +152,6 @@ private static SchemaKTable<?> buildWindowedTable(
final KsqlQueryBuilder builder,
final DataSource<?> dataSource,
final Stacker contextStacker,
final Optional<AutoOffsetReset> offsetReset,
final KeyField keyField,
final SourceName alias
) {
@@ -177,7 +165,6 @@ private static SchemaKTable<?> buildWindowedTable(
buildFormats(dataSource),
windowInfo,
dataSource.getTimestampColumn(),
offsetReset,
alias
);

@@ -194,7 +181,6 @@ private static SchemaKTable<?> buildTable(
final KsqlQueryBuilder builder,
final DataSource<?> dataSource,
final Stacker contextStacker,
final Optional<AutoOffsetReset> offsetReset,
final KeyField keyField,
final SourceName alias
) {
@@ -208,7 +194,6 @@ private static SchemaKTable<?> buildTable(
dataSource.getKafkaTopicName(),
buildFormats(dataSource),
dataSource.getTimestampColumn(),
offsetReset,
alias
);

DataSourceNodeTest.java
@@ -34,7 +34,6 @@
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.streams.KSPlanBuilder;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.function.FunctionRegistry;
@@ -99,7 +98,6 @@ public class DataSourceNodeTest {
= KeyField.of(ColumnRef.withoutSource(ColumnName.of("field1")));
private static final TimestampColumn TIMESTAMP_COLUMN =
new TimestampColumn(TIMESTAMP_FIELD, Optional.empty());
private static final Optional<AutoOffsetReset> OFFSET_RESET = Optional.of(AutoOffsetReset.LATEST);

private final KsqlStream<String> SOME_SOURCE = new KsqlStream<>(
"sqlExpression",
@@ -166,7 +164,7 @@ public void before() {
when(rowSerde.deserializer()).thenReturn(mock(Deserializer.class));

when(dataSource.getDataSourceType()).thenReturn(DataSourceType.KTABLE);
when(schemaKStreamFactory.create(any(), any(), any(), any(), any(), any()))
when(schemaKStreamFactory.create(any(), any(), any(), any(), any()))
.thenAnswer(inv -> inv.<DataSource<?>>getArgument(1)
.getDataSourceType() == DataSourceType.KSTREAM
? stream : table
@@ -271,7 +269,7 @@ public void shouldBuildSourceStreamWithCorrectTimestampIndex() {
node.buildStream(ksqlStreamBuilder);

// Then:
verify(schemaKStreamFactory).create(any(), any(), any(), any(), any(), any());
verify(schemaKStreamFactory).create(any(), any(), any(), any(), any());
}

// should this even be possible? if you are using a timestamp extractor then shouldn't the name
@@ -285,7 +283,7 @@ public void shouldBuildSourceStreamWithCorrectTimestampIndexForQualifiedFieldNam
node.buildStream(ksqlStreamBuilder);

// Then:
verify(schemaKStreamFactory).create(any(), any(), any(), any(), any(), any());
verify(schemaKStreamFactory).create(any(), any(), any(), any(), any());
}

@Test
@@ -303,7 +301,6 @@ public void shouldBuildSourceStreamWithCorrectParams() {
same(ksqlStreamBuilder),
same(dataSource),
stackerCaptor.capture(),
eq(OFFSET_RESET),
same(node.getKeyField()),
eq(SourceName.of("name"))
);
@@ -326,7 +323,6 @@ public void shouldBuildSourceStreamWithCorrectParamsWhenBuildingTable() {
same(ksqlStreamBuilder),
same(dataSource),
stackerCaptor.capture(),
eq(OFFSET_RESET),
same(node.getKeyField()),
eq(SourceName.of("name"))
);
SchemaKSourceFactoryTest.java
@@ -49,7 +49,6 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -58,8 +57,6 @@

@RunWith(MockitoJUnitRunner.class)
public class SchemaKSourceFactoryTest {
private static final Optional<AutoOffsetReset> OFFSET_RESET = Optional.of(
AutoOffsetReset.EARLIEST);
private static final KeyField KEY_FIELD = KeyField.none();
private static final SourceName ALIAS = SourceName.of("bob");
private static final LogicalSchema SCHEMA = LogicalSchema.builder()
@@ -125,7 +122,6 @@ public void shouldBuildWindowedStream() {
builder,
dataSource,
contextStacker,
OFFSET_RESET,
KEY_FIELD,
ALIAS
);
@@ -150,7 +146,6 @@ public void shouldBuildNonWindowedStream() {
builder,
dataSource,
contextStacker,
OFFSET_RESET,
KEY_FIELD,
ALIAS
);
@@ -175,7 +170,6 @@ public void shouldBuildWindowedTable() {
builder,
dataSource,
contextStacker,
OFFSET_RESET,
KEY_FIELD,
ALIAS
);
@@ -200,7 +194,6 @@ public void shouldBuildNonWindowedTable() {
builder,
dataSource,
contextStacker,
OFFSET_RESET,
KEY_FIELD,
ALIAS
);
AbstractStreamSource.java
@@ -24,15 +24,13 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.Topology.AutoOffsetReset;

@Immutable
public abstract class AbstractStreamSource<K> implements ExecutionStep<K> {
private final ExecutionStepPropertiesV1 properties;
private final String topicName;
private final Formats formats;
private final Optional<TimestampColumn> timestampColumn;
private final Optional<AutoOffsetReset> offsetReset;
private final LogicalSchema sourceSchema;
private final SourceName alias;

@@ -48,14 +46,12 @@ public AbstractStreamSource(
String topicName,
Formats formats,
Optional<TimestampColumn> timestampColumn,
Optional<AutoOffsetReset> offsetReset,
LogicalSchema sourceSchema,
SourceName alias) {
this.properties = Objects.requireNonNull(properties, "properties");
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.formats = Objects.requireNonNull(formats, "formats");
this.timestampColumn = Objects.requireNonNull(timestampColumn, "timestampColumn");
this.offsetReset = Objects.requireNonNull(offsetReset, "offsetReset");
this.sourceSchema = Objects.requireNonNull(sourceSchema, "sourceSchema");
this.alias = Objects.requireNonNull(alias, "alias");
}
@@ -74,10 +70,6 @@ public LogicalSchema getSourceSchema() {
return sourceSchema;
}

public Optional<AutoOffsetReset> getOffsetReset() {
return offsetReset;
}

public Optional<TimestampColumn> getTimestampColumn() {
return timestampColumn;
}
@@ -107,7 +99,6 @@ public boolean equals(Object o) {
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(formats, that.formats)
&& Objects.equals(timestampColumn, that.timestampColumn)
&& Objects.equals(offsetReset, that.offsetReset)
&& Objects.equals(sourceSchema, that.sourceSchema);
}

@@ -118,7 +109,6 @@ public int hashCode() {
topicName,
formats,
timestampColumn,
offsetReset,
sourceSchema
);
}
StreamSource.java
@@ -21,7 +21,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.Topology.AutoOffsetReset;

@Immutable
public final class StreamSource extends AbstractStreamSource<KStreamHolder<Struct>> {
@@ -30,15 +29,13 @@ public StreamSource(
@JsonProperty(value = "topicName", required = true) String topicName,
@JsonProperty(value = "formats", required = true) Formats formats,
@JsonProperty("timestampColumn") Optional<TimestampColumn> timestampColumn,
@JsonProperty("offsetReset") Optional<AutoOffsetReset> offsetReset,
@JsonProperty(value = "sourceSchema", required = true) LogicalSchema sourceSchema,
@JsonProperty(value = "alias", required = true) SourceName alias) {
super(
properties,
topicName,
formats,
timestampColumn,
offsetReset,
sourceSchema,
alias
);
(Diff truncated: 8 of the 17 changed files are not shown above.)