Skip to content

Commit

Permalink
MINOR: improve Session expiration notice (#6618)
Browse files Browse the repository at this point in the history
Reviewers: Matthias J. Sax <[email protected]>, A. Sophie Blee-Goldman <[email protected]>
  • Loading branch information
vvcephei authored and mjsax committed Apr 29, 2019
1 parent 607cf8f commit b8b153a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,24 @@ value, context().topic(), context().partition(), context().offset()
tupleForwarder.maybeForward(sessionKey, agg, null);
} else {
LOG.debug(
"Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
"Skipping record for expired window. " +
"key=[{}] " +
"topic=[{}] " +
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
"window=[{},{}) " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
context().topic(),
context().partition(),
context().offset(),
context().timestamp(),
mergedWindow.start(),
mergedWindow.end(),
closeTime,
observedStreamTime
);
lateRecordDropSensor.record();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,23 @@ value, context().topic(), context().partition(), context().offset()
tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
} else {
log.debug(
"Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
"Skipping record for expired window. " +
"key=[{}] " +
"topic=[{}] " +
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
"window=[{},{}) " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
context().topic(),
context().partition(),
context().offset(),
context().timestamp(),
windowStart, windowEnd,
closeTime,
observedStreamTime
);
lateRecordDropSensor.record();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ public void shouldLogAndMeterWhenSkippingLateRecord() {
greaterThan(0.0));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
assertThat(
appender.getMessages(),
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
);

assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
));

OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
Expand Down Expand Up @@ -342,13 +342,13 @@ public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));

assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
"Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"
));

OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
Expand Down

0 comments on commit b8b153a

Please sign in to comment.