fix: support WindowStart() and WindowEnd() in pull queries #4435

Merged
Changes from 3 commits
@@ -18,7 +18,6 @@
import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -17,12 +17,10 @@

import com.google.common.collect.Streams;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;

import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -15,22 +15,20 @@

package io.confluent.ksql.util;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import java.util.List;
import java.util.Set;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class ReservedInternalTopicsTest {
private static final String KSQL_PROCESSING_LOG_TOPIC = "default_ksql_processing_log";

@@ -31,17 +31,15 @@
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import io.confluent.ksql.util.ReservedInternalTopics;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -22,7 +22,6 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -20,7 +20,6 @@

import com.google.common.collect.Sets;
import io.confluent.ksql.topic.TopicProperties;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -31,7 +30,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
@@ -33,6 +33,7 @@
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KafkaTopicExistsException;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -46,8 +47,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import io.confluent.ksql.util.ReservedInternalTopics;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
@@ -48,7 +48,6 @@
import io.confluent.ksql.serde.connect.ConnectSchemaTranslator;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import org.apache.avro.Schema;
import org.junit.Before;
import org.junit.Rule;
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

@@ -60,15 +61,15 @@ public boolean hasSelects() {
return !windowSelects.isEmpty();
}

public <K> KsqlTransformer<Windowed<K>, GenericRow> getTransformer() {
return new Transformer<>();
public KsqlTransformer<Windowed<Struct>, GenericRow> getTransformer() {
return new Transformer();
}

private final class Transformer<K> implements KsqlTransformer<Windowed<K>, GenericRow> {
private final class Transformer implements KsqlTransformer<Windowed<Struct>, GenericRow> {

@Override
public GenericRow transform(
final Windowed<K> readOnlyKey,
final Windowed<Struct> readOnlyKey,
final GenericRow value,
final KsqlProcessingContext ctx
) {
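
Note: the body of transform is collapsed in the hunk above. A minimal sketch of the idea it implements, assuming illustrative names and a column-index map like the windowSelects field referenced earlier (not the exact implementation): each WINDOWSTART()/WINDOWEND() select overwrites a placeholder column with the corresponding bound of the record's window.

import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

final class WindowBoundsSketch {
  // Column index -> true for a WINDOWSTART() select, false for WINDOWEND().
  private final Map<Integer, Boolean> windowSelects;

  WindowBoundsSketch(final Map<Integer, Boolean> windowSelects) {
    this.windowSelects = windowSelects;
  }

  void fillBounds(final Windowed<?> key, final List<Object> columns) {
    // Both bounds come straight off the window carried by the record's key:
    final Window window = key.window();
    windowSelects.forEach((index, isStart) ->
        columns.set(index, isStart ? window.start() : window.end()));
  }
}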
@@ -31,14 +31,6 @@
*/
public final class StructKeyUtil {

private static final Schema ROWKEY_STRUCT_SCHEMA = SchemaBuilder
.struct()
.field(SchemaUtil.ROWKEY_NAME.name(), Schema.OPTIONAL_STRING_SCHEMA)
.build();

private static final org.apache.kafka.connect.data.Field ROWKEY_FIELD =
ROWKEY_STRUCT_SCHEMA.fields().get(0);

private StructKeyUtil() {
}

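
Note: the removed hard-coded ROWKEY schema and field constants appear superseded by the keyBuilder API, which the tests later in this PR use to build keys. A minimal usage sketch (the key value is illustrative):

import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.apache.kafka.connect.data.Struct;

// Build a single-column STRING key without hard-coding the ROWKEY schema:
final Struct key = StructKeyUtil.keyBuilder(SqlTypes.STRING).build("some-key");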
@@ -24,10 +24,13 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -40,6 +43,9 @@
@RunWith(MockitoJUnitRunner.class)
public class WindowSelectMapperTest {

private static final Struct A_KEY = StructKeyUtil.keyBuilder(SqlTypes.STRING)
.build("key");

@Mock
private KsqlAggregateFunction<?, ?, ?> windowStartFunc;
@Mock
@@ -77,7 +83,7 @@ public void shouldDetectWindowEndSelects() {
@Test
public void shouldUpdateRowWithWindowBounds() {
// Given:
final KsqlTransformer<Windowed<Object>, GenericRow> mapper = new WindowSelectMapper(
final KsqlTransformer<Windowed<Struct>, GenericRow> mapper = new WindowSelectMapper(
1,
ImmutableList.of(otherFunc, windowStartFunc, windowEndFunc, windowStartFunc)
).getTransformer();
@@ -86,7 +92,7 @@ public void shouldUpdateRowWithWindowBounds() {
final GenericRow row = new GenericRow(Arrays.asList(0, 1, 2, 3, 4, 5));

// When:
final GenericRow result = mapper.transform(new Windowed<>("k", window), row, ctx);
final GenericRow result = mapper.transform(new Windowed<>(A_KEY, window), row, ctx);

// Then:
assertThat(result, is(sameInstance(row)));
@@ -96,7 +102,7 @@
@Test(expected = IndexOutOfBoundsException.class)
public void shouldThrowIfRowNotBigEnough() {
// Given:
final KsqlTransformer<Windowed<Object>, GenericRow> mapper = new WindowSelectMapper(
final KsqlTransformer<Windowed<Struct>, GenericRow> mapper = new WindowSelectMapper(
0,
ImmutableList.of(windowStartFunc)
).getTransformer();
@@ -105,6 +111,6 @@ public void shouldThrowIfRowNotBigEnough() {
final GenericRow row = new GenericRow(new ArrayList<>());

// When:
mapper.transform(new Windowed<>("k", window), row, ctx);
mapper.transform(new Windowed<>(A_KEY, window), row, ctx);
}
}
@@ -974,6 +974,28 @@
{"row":{"columns":["10", "2020-01-29 17:19:45 +0000", "2020-01-29 17:19:46 +0000", 1]}}
]}
]
},
{
"name": "windowStart and windowEnd UDAFs",
"statements": [
"CREATE STREAM ALL_TWEETS (LANG STRING) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE TWEET_LANG AS SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WSTART, TIMESTAMPTOSTRING(WINDOWEND(),'yyyy-MM-dd HH:mm:ss Z', 'UTC') AS WEND, LANG, COUNT(*) AS TWEET_COUNT FROM ALL_TWEETS WINDOW TUMBLING (SIZE 1 SECOND) GROUP BY LANG;",
"SELECT WSTART, WEND, LANG, TWEET_COUNT FROM TWEET_LANG WHERE ROWKEY='en';"
],
"inputs": [
{"topic": "test_topic", "timestamp": 1580223282123, "value": {"lang": "en"}},
{"topic": "test_topic", "timestamp": 1580223282323, "value": {"lang": "en"}},
{"topic": "test_topic", "timestamp": 1580223283123, "value": {"lang": "en"}}
],
"responses": [
{"admin": {"@type": "currentStatus"}},
{"admin": {"@type": "currentStatus"}},
{"query": [
{"header":{"schema":"`WSTART` STRING, `WEND` STRING, `LANG` STRING, `TWEET_COUNT` BIGINT"}},
{"row":{"columns":["2020-01-28 14:54:42 +0000", "2020-01-28 14:54:43 +0000", "en", 2]}},
{"row":{"columns":["2020-01-28 14:54:43 +0000", "2020-01-28 14:54:44 +0000", "en", 1]}}
]}
]
}
]
}
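
Note: as a sanity check on the expected rows above, with WINDOW TUMBLING (SIZE 1 SECOND) the first two input timestamps land in the window [1580223282000, 1580223283000), giving TWEET_COUNT 2, and the third lands in the next window; 1580223282000 ms is 2020-01-28 14:54:42 +0000. The alignment arithmetic, in plain Java for illustration:

public class WindowAlignmentCheck {
  public static void main(String[] args) {
    final long windowSizeMs = 1_000L; // WINDOW TUMBLING (SIZE 1 SECOND)
    final long[] timestamps = {1_580_223_282_123L, 1_580_223_282_323L, 1_580_223_283_123L};
    for (final long ts : timestamps) {
      final long windowStart = (ts / windowSizeMs) * windowSizeMs; // floor to window boundary
      final long windowEnd = windowStart + windowSizeMs;           // exclusive upper bound
      System.out.printf("%d -> [%d, %d)%n", ts, windowStart, windowEnd);
    }
  }
}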
@@ -18,11 +18,9 @@
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.util.ReservedInternalTopics;

import java.io.Closeable;
import java.util.Collections;
import java.util.Objects;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
@@ -25,7 +25,6 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.streams.materialization.Window;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
@@ -36,6 +35,8 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.Test;

public class TableRowsEntityFactoryTest {
@@ -87,21 +88,19 @@ public void shouldAddNonWindowedRowToValues() {
public void shouldAddWindowedRowToValues() {
// Given:
final Instant now = Instant.now();
final Window window0 = Window.of(now, now.plusMillis(2));
final Window window1 = Window.of(now, now.plusMillis(1));
final TimeWindow window0 = new TimeWindow(now.toEpochMilli(), now.plusMillis(2).toEpochMilli());
final TimeWindow window1 = new TimeWindow(now.toEpochMilli(), now.plusMillis(1).toEpochMilli());

final List<? extends TableRow> input = ImmutableList.of(
WindowedRow.of(
SIMPLE_SCHEMA,
STRING_KEY_BUILDER.build("x"),
window0,
new Windowed<>(STRING_KEY_BUILDER.build("x"), window0),
GenericRow.genericRow(true),
ROWTIME
),
WindowedRow.of(
SIMPLE_SCHEMA,
STRING_KEY_BUILDER.build("y"),
window1,
new Windowed<>(STRING_KEY_BUILDER.build("y"), window1),
GenericRow.genericRow(false),
ROWTIME
)
@@ -17,7 +17,6 @@
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.test.util.secure.ClientTrustStore;
import io.confluent.ksql.util.PageViewDataProvider;
import java.io.IOException;
import java.util.Comparator;
@@ -30,7 +29,6 @@
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -38,7 +38,6 @@
import io.confluent.ksql.rest.entity.TablesList;
import io.confluent.ksql.rest.server.context.KsqlSecurityContextBinder;
import io.confluent.ksql.rest.server.services.TestDefaultKsqlClientFactory;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
@@ -34,7 +34,6 @@
import io.confluent.ksql.execution.expression.tree.StringLiteral;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
@@ -24,7 +24,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import com.google.common.testing.NullPointerTester;
import java.util.Optional;
import org.junit.Test;

public class ValueFormatTest {
@@ -27,6 +27,7 @@
import io.confluent.ksql.execution.plan.StreamAggregate;
import io.confluent.ksql.execution.plan.StreamWindowedAggregate;
import io.confluent.ksql.execution.streams.transform.KsTransformer;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.window.WindowSelectMapper;
import io.confluent.ksql.execution.windows.HoppingWindowExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
@@ -140,6 +141,7 @@ public static KTableHolder<Windowed<Struct>> build(
);
}

@SuppressWarnings({"rawtypes", "unchecked"})
static KTableHolder<Windowed<Struct>> build(
final KGroupedStreamHolder groupedStream,
final StreamWindowedAggregate aggregate,
@@ -191,6 +193,12 @@ static KTableHolder<Windowed<Struct>> build(
() -> new KsTransformer<>(windowSelectMapper.getTransformer()),
Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.windowSelectContext(aggregate)))
);

materializationBuilder.map(
pl -> (KsqlTransformer) windowSelectMapper.getTransformer(),
aggregateSchema,
AggregateBuilderUtils.windowSelectContext(aggregate)
);
}

return KTableHolder.materialized(
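
Note: the map(...) call added above registers the window-bounds transform with the materialization builder, so pull queries that read rows back out of the state store also get WINDOWSTART()/WINDOWEND() applied at lookup time, not just on the streams path. A sketch of the read-side effect, with a hypothetical helper name:

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.kstream.Windowed;

final class MaterializationReadSketch {
  // Re-apply each registered transform to a row fetched for a pull query:
  static GenericRow applyTransforms(
      final List<KsqlTransformer<Windowed<Struct>, GenericRow>> transforms,
      final Windowed<Struct> windowedKey,
      final GenericRow storedRow,
      final KsqlProcessingContext ctx
  ) {
    GenericRow row = storedRow;
    for (final KsqlTransformer<Windowed<Struct>, GenericRow> transform : transforms) {
      row = transform.transform(windowedKey, row, ctx);
    }
    return row;
  }
}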
@@ -105,7 +105,7 @@ public MaterializedWindowedTable windowed() {
}

private Optional<GenericRow> filterAndTransform(
final Struct key,
final Object key,
final GenericRow value,
final long rowTime
) {
@@ -157,7 +157,7 @@ public List<WindowedRow> get(final Struct key, final Range<Instant> windowStart)
final Builder<WindowedRow> builder = ImmutableList.builder();

for (final WindowedRow row : result) {
filterAndTransform(key, row.value(), row.rowTime())
filterAndTransform(row.windowedKey(), row.value(), row.rowTime())
.ifPresent(v -> builder.add(row.withValue(v, schema())));
}
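
Note: widening the key parameter from Struct to Object lets one filter-and-transform path serve both lookups: the non-windowed path still passes the plain Struct key, while the windowed path now passes row.windowedKey() so window bounds are available to transforms. A small dispatch sketch, hypothetical and for illustration only:

import org.apache.kafka.streams.kstream.Windowed;

final class KeyDispatchSketch {
  // With the parameter typed as Object, one method handles both key shapes:
  static String describe(final Object key) {
    if (key instanceof Windowed) {
      final Windowed<?> windowed = (Windowed<?>) key;
      return "windowed key with bounds [" + windowed.window().start()
          + ", " + windowed.window().end() + ")";
    }
    return "plain key: " + key; // e.g. a Struct ROWKEY
  }
}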
