Skip to content

Commit

Permalink
feat(static): allow windowed queries without bounds on rowtime (#3438)
Browse files Browse the repository at this point in the history
* feat(static): extend types of comparisons supported on window start

With this change it is no possible to have WHERE clause bounding WINDOWSTART using the following comparison types:
 - `>`
 - `>=`
 - `=`
 - `<`
 - `>=`

 Some of which were already supported in certain situations, but are now all full supported.
  • Loading branch information
big-andy-coates authored Oct 1, 2019
1 parent d83c787 commit 6593ee3
Show file tree
Hide file tree
Showing 5 changed files with 334 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,31 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable {
@Override
public List<WindowedRow> get(
final Struct key,
final Range<Instant> windowStart
final Range<Instant> windowStartBounds
) {
try {
final ReadOnlyWindowStore<Struct, GenericRow> store = stateStore
.store(QueryableStoreTypes.windowStore());

final Instant lower = windowStart.lowerEndpoint();
final Instant upper = windowStart.upperEndpoint();
final Instant lower = windowStartBounds.hasLowerBound()
? windowStartBounds.lowerEndpoint()
: Instant.ofEpochMilli(Long.MIN_VALUE);

final Instant upper = windowStartBounds.hasUpperBound()
? windowStartBounds.upperEndpoint()
: Instant.ofEpochMilli(Long.MAX_VALUE);

try (WindowStoreIterator<GenericRow> it = store.fetch(key, lower, upper)) {

final Builder<WindowedRow> builder = ImmutableList.builder();

while (it.hasNext()) {
final KeyValue<Long, GenericRow> next = it.next();
final Window window = Window.of(Instant.ofEpochMilli(next.key), Optional.empty());
builder.add(WindowedRow.of(stateStore.schema(), key, window, next.value));
final Instant windowStart = Instant.ofEpochMilli(next.key);
if (windowStartBounds.contains(windowStart)) {
final Window window = Window.of(windowStart, Optional.empty());
builder.add(WindowedRow.of(stateStore.schema(), key, window, next.value));
}
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -64,14 +65,14 @@ public class KsMaterializedSessionTableTest {
.build();

private static final Struct A_KEY = StructKeyUtil.asStructKey("x");
private static final GenericRow A_VALUE = new GenericRow("c0l");

private static final Instant LOWER_INSTANT = Instant.now();
private static final Instant UPPER_INSTANT = LOWER_INSTANT.plusSeconds(10);
private static final Range<Instant> WINDOW_START_BOUNDS = Range.closed(
LOWER_INSTANT,
UPPER_INSTANT
);
private static final GenericRow A_VALUE = new GenericRow("c0l");

@Rule
public final ExpectedException expectedException = ExpectedException.none();
Expand Down Expand Up @@ -199,12 +200,17 @@ public void shouldIgnoreSessionsThatStartAfterUpperBound() {
}

@Test
public void shouldReturnValueIfSessionStartsAtLowerBound() {
public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerBoundClosed() {
// Given:
final Range<Instant> startBounds = Range.closed(
LOWER_INSTANT,
UPPER_INSTANT
);

givenSingleSession(LOWER_INSTANT, LOWER_INSTANT.plusMillis(1));

// When:
final List<WindowedRow> result = table.get(A_KEY, WINDOW_START_BOUNDS);
final List<WindowedRow> result = table.get(A_KEY, startBounds);

// Then:
assertThat(result, contains(WindowedRow.of(
Expand All @@ -216,12 +222,34 @@ public void shouldReturnValueIfSessionStartsAtLowerBound() {
}

@Test
public void shouldReturnValueIfSessionStartsAtUpperBound() {
public void shouldIgnoreSessionsThatStartAtLowerBoundIfLowerBoundOpen() {
// Given:
final Range<Instant> startBounds = Range.openClosed(
LOWER_INSTANT,
UPPER_INSTANT
);

givenSingleSession(LOWER_INSTANT, LOWER_INSTANT.plusMillis(1));

// When:
final List<WindowedRow> result = table.get(A_KEY, startBounds);

// Then:
assertThat(result, is(empty()));
}

@Test
public void shouldReturnValueIfSessionStartsAtUpperBoundIfUpperBoundClosed() {
// Given:
final Range<Instant> startBounds = Range.closed(
LOWER_INSTANT,
UPPER_INSTANT
);

givenSingleSession(UPPER_INSTANT, UPPER_INSTANT.plusMillis(1));

// When:
final List<WindowedRow> result = table.get(A_KEY, WINDOW_START_BOUNDS);
final List<WindowedRow> result = table.get(A_KEY, startBounds);

// Then:
assertThat(result, contains(WindowedRow.of(
Expand All @@ -232,6 +260,23 @@ public void shouldReturnValueIfSessionStartsAtUpperBound() {
)));
}

@Test
public void shouldIgnoreSessionsThatStartAtUpperBoundIfUpperBoundOpen() {
// Given:
final Range<Instant> startBounds = Range.closedOpen(
LOWER_INSTANT,
UPPER_INSTANT
);

givenSingleSession(UPPER_INSTANT, UPPER_INSTANT.plusMillis(1));

// When:
final List<WindowedRow> result = table.get(A_KEY, startBounds);

// Then:
assertThat(result, is(empty()));
}

@Test
public void shouldReturnValueIfSessionStartsBetweenBounds() {
// Given:
Expand All @@ -252,8 +297,10 @@ public void shouldReturnValueIfSessionStartsBetweenBounds() {
@Test
public void shouldReturnMultipleSessions() {
// Given:
givenSingleSession(LOWER_INSTANT.minusMillis(1), LOWER_INSTANT.plusSeconds(1));
givenSingleSession(LOWER_INSTANT, LOWER_INSTANT);
givenSingleSession(UPPER_INSTANT, UPPER_INSTANT);
givenSingleSession(UPPER_INSTANT.plusMillis(1), UPPER_INSTANT.plusSeconds(1));

// When:
final List<WindowedRow> result = table.get(A_KEY, WINDOW_START_BOUNDS);
Expand All @@ -265,6 +312,19 @@ public void shouldReturnMultipleSessions() {
));
}

@Test
public void shouldReturnAllSessionsForRangeall() {
// Given:
givenSingleSession(Instant.now().minusSeconds(1000), Instant.now().plusSeconds(1000));
givenSingleSession(Instant.now().minusSeconds(1000), Instant.now().plusSeconds(1000));

// When:
final List<WindowedRow> result = table.get(A_KEY, Range.all());

// Then:
assertThat(result, hasSize(2));
}

private void givenSingleSession(
final Instant start,
final Instant end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ public void shouldReturnEmptyIfKeyNotPresent() {
}

@Test
public void shouldReturnValueIfKeyPresent() {
public void shouldReturnValuesForClosedBounds() {
// Given:
final Range<Instant> bounds = Range.closed(
Instant.now(),
Instant.now().plusSeconds(10)
);

final GenericRow value1 = new GenericRow("col0");
final GenericRow value2 = new GenericRow("col1");

Expand All @@ -177,19 +182,59 @@ public void shouldReturnValueIfKeyPresent() {
.thenReturn(false);

when(fetchIterator.next())
.thenReturn(new KeyValue<>(1L, value1))
.thenReturn(new KeyValue<>(2L, value2))
.thenReturn(new KeyValue<>(bounds.lowerEndpoint().toEpochMilli(), value1))
.thenReturn(new KeyValue<>(bounds.upperEndpoint().toEpochMilli(), value2))
.thenThrow(new AssertionError());

when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator);

// When:
final List<WindowedRow> result = table.get(A_KEY, WINDOW_START_BOUNDS);
final List<WindowedRow> result = table.get(A_KEY, bounds);

// Then:
assertThat(result, contains(
WindowedRow.of(SCHEMA, A_KEY, Window.of(bounds.lowerEndpoint(), Optional.empty()), value1),
WindowedRow.of(SCHEMA, A_KEY, Window.of(bounds.upperEndpoint(), Optional.empty()), value2)
));
}

@Test
public void shouldReturnValuesForOpenBounds() {
// Given:
final Range<Instant> bounds = Range.open(
Instant.now(),
Instant.now().plusSeconds(10)
);

final GenericRow value1 = new GenericRow("col0");
final GenericRow value2 = new GenericRow("col1");
final GenericRow value3 = new GenericRow("col2");

when(fetchIterator.hasNext())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);

when(fetchIterator.next())
.thenReturn(new KeyValue<>(bounds.lowerEndpoint().toEpochMilli(), value1))
.thenReturn(new KeyValue<>(bounds.lowerEndpoint().plusMillis(1).toEpochMilli(), value2))
.thenReturn(new KeyValue<>(bounds.upperEndpoint().toEpochMilli(), value3))
.thenThrow(new AssertionError());

when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator);

// When:
final List<WindowedRow> result = table.get(A_KEY, bounds);

// Then:
assertThat(result, contains(
WindowedRow.of(SCHEMA, A_KEY, Window.of(Instant.ofEpochMilli(1), Optional.empty()), value1),
WindowedRow.of(SCHEMA, A_KEY, Window.of(Instant.ofEpochMilli(2), Optional.empty()), value2)
WindowedRow.of(
SCHEMA,
A_KEY,
Window.of(bounds.lowerEndpoint().plusMillis(1), Optional.empty()),
value2
)
));
}

Expand All @@ -202,10 +247,12 @@ public void shouldMaintainResultOrder() {
.thenReturn(true)
.thenReturn(false);

final Instant start = WINDOW_START_BOUNDS.lowerEndpoint();

when(fetchIterator.next())
.thenReturn(new KeyValue<>(1L, new GenericRow("a")))
.thenReturn(new KeyValue<>(3L, new GenericRow("b")))
.thenReturn(new KeyValue<>(2L, new GenericRow("c")))
.thenReturn(new KeyValue<>(start.toEpochMilli(), new GenericRow("a")))
.thenReturn(new KeyValue<>(start.plusMillis(1).toEpochMilli(), new GenericRow("b")))
.thenReturn(new KeyValue<>(start.plusMillis(2).toEpochMilli(), new GenericRow("c")))
.thenThrow(new AssertionError());

when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator);
Expand All @@ -218,21 +265,34 @@ public void shouldMaintainResultOrder() {
WindowedRow.of(
SCHEMA,
A_KEY,
Window.of(Instant.ofEpochMilli(1), Optional.empty()),
Window.of(start, Optional.empty()),
new GenericRow("a")
),
WindowedRow.of(
SCHEMA,
A_KEY,
Window.of(Instant.ofEpochMilli(3), Optional.empty()),
Window.of(start.plusMillis(1), Optional.empty()),
new GenericRow("b")
),
WindowedRow.of(
SCHEMA,
A_KEY,
Window.of(Instant.ofEpochMilli(2), Optional.empty()),
Window.of(start.plusMillis(2), Optional.empty()),
new GenericRow("c")
)
));
}

@Test
public void shouldSupportRangeAll() {
// When:
table.get(A_KEY, Range.all());

// Then:
verify(tableStore).fetch(
A_KEY,
Instant.ofEpochMilli(Long.MIN_VALUE),
Instant.ofEpochMilli(Long.MAX_VALUE)
);
}
}
Loading

0 comments on commit 6593ee3

Please sign in to comment.