Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-3902 #1556

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3827d91
remove verify left join from KStreamRepartitionJoinTest as it is caus…
dguy Jun 24, 2016
e07388b
KAFKA-3902
phderome Jun 26, 2016
3563044
KAFKA-3902
phderome Jun 26, 2016
721e00d
KAFKA-3902
phderome Jun 26, 2016
0a93d16
Make the JoinWindow in KStreamRepartitionJoinTest much larger (1 minu…
dguy Jun 27, 2016
1919fa5
Merge branch 'kafka-3896' of https://github.com/dguy/kafka into DEROM…
phderome Jun 27, 2016
aa036cb
Merge branch 'trunk' of https://github.com/apache/kafka into DEROME-3902
phderome Jun 27, 2016
a607e24
avoid unconditional materialization as per Guozhang's explanation and…
phderome Jun 28, 2016
e6beae8
Fix test cases accordingly so only the oldValue null, null are suppre…
phderome Jun 28, 2016
a8f9ef7
clears state after a check as it used to be in the first place, thus …
phderome Jun 28, 2016
7c277cf
revert the double null filter addition in KTableFilter and add a mean…
phderome Jun 29, 2016
73479d3
revert the double null filter addition in KTableFilter and add a mean…
phderome Jun 29, 2016
b039e4e
keep Guozhang's 2nd fix
phderome Jun 29, 2016
356b0a6
suppress nulls more aggressively than originally discussed, seems mor…
phderome Jun 30, 2016
66aada0
Merge branch 'trunk' of https://github.com/apache/kafka into DEROME-3902
phderome Jun 30, 2016
b2f5c06
suppress nulls more aggressively than originally discussed, seems mor…
phderome Jun 30, 2016
0fe41c8
revert back to Guozhang's suggestions.
phderome Jun 30, 2016
5b19a71
revert back to Guozhang's suggestions.
phderome Jul 1, 2016
c1e4ddf
Merge branch 'trunk' of https://github.com/apache/kafka into DEROME-3902
phderome Jul 1, 2016
117c661
removed unnecessary/incorrect comments.
phderome Jul 1, 2016
525d1b4
simplified unit test testSkipNullOnMaterialization as per feedback.
phderome Jul 1, 2016
1dccdba
simplified unit test testSkipNullOnMaterialization as per feedback.
phderome Jul 1, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public void process(K key, Change<V> change) {
V newValue = computeValue(key, change.newValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a check on the original change value that newValue and oldValue cannot be both null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean something semantically equivalent to
if (change.oldValue == null && change.newValue == null) return;
or rather
if (sendOldValues && change.oldValue == null && change.newValue == null) return;

I'd like to understand whether you're addressing case 2 alone or case 2 and 3. Once I am clear on what you mean, I can make change and test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually never mind, I was originally thinking that after this patch we should if (change.oldValue == null && change.newValue == null) throw StreamsException, since it should not happen; but I just realized a mapValues operator can still make both values to be null and pass it to the downstream filter.

V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

if (sendOldValues && oldValue == null && newValue == null) return; // unnecessary to forward here.
if ((sendOldValues && oldValue == null && newValue == null) ||
(change.oldValue == null && change.newValue == null)) return; // unnecessary to forward here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we do not need the second condition here, since a better fix would be in the mapValues function, do not forward if both are null, instead of checking them here.

Also minor comment: use a new line for "return" for better debugging, and move the comment on top of line 80 as "if both the new and old values are null after the filtering, do not need to forward downstream anymore".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the minor comment is not addressed in your latest PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I confirm that you are absolutely right about your last mail about trying to remove sendOldValues from test is an error and I had misunderstood the implication. I also appreciate the debug-friendly syntax of return on next line, I'll adjust that. I'll also remove the change test as per your comment above that spells out you want to handle that in mapValues function. Hopefully, things will be sorted out within an hour or so.


context().forward(key, new Change<>(newValue, oldValue));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public boolean test(String key, Integer value) {
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);

// Guozhang's second fix suppresses A:null, B:null for proc2 and proc3 below
proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4");
proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null");
}
Expand Down Expand Up @@ -224,6 +225,7 @@ public boolean test(String key, Integer value) {
driver.process(topic1, "B", null);

proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
// Guozhang's second fix suppresses A:null, B:null for proc2 below
proc2.checkEmptyAndClearProcessResult();
}

Expand Down