
KAFKA-3902 #1556

Closed · wants to merge 22 commits into trunk from phderome/DEROME-3902

Conversation

@phderome (Contributor)

The contribution is my original work and I license the work to the project under the project's open source license.

Contributors: Guozhang Wang, Phil Derome
@guozhangwang

Added checkEmpty to validate that the processor does nothing, and added an inhibit check for filter to fix the issue.
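
For context, a rough sketch of what such a checkEmpty helper could look like in KTableFilterTest (the helper name comes from this description; the body below is an assumption, relying on the processed list that org.apache.kafka.test.MockProcessorSupplier exposes):

    // Hypothetical helper: assert that the downstream mock processor
    // received nothing, i.e. the filter forwarded no records at all.
    private <K, V> void checkEmpty(MockProcessorSupplier<K, V> proc) {
        assertEquals(0, proc.processed.size());
    }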

dguy and others added 4 commits June 24, 2016 08:09
Added checkEmpty to validate processor does nothing and added an inhibit check for filter to fix the issue.
Added checkEmpty to validate processor does nothing and added an inhibit check for filter to fix the issue.
Fixed some kstream internals unit tests.
@phderome (Contributor Author)

Added modifications to integration/internals unit tests.

@phderome (Contributor Author)

Merged apache:kafka trunk into my changes in this PR so that it merges properly with trunk.

@@ -28,7 +28,7 @@
     private final Predicate<K, V> predicate;
     private final boolean filterNot;

-    private boolean sendOldValues = false;
+    private boolean sendOldValues = true;
Contributor

My concern is that the overhead of requesting the source KTable to be materialized (i.e. creating a state store, and sending the {old -> new} pair instead of the new value only) may be overwhelming compared with its potential benefit of reducing the downstream traffic.

Contributor Author

I am looking for your guidance here. On the one hand, I think your point has merit, and I have difficulty assessing the tradeoff given my limited knowledge of app usage and app internals; on the other hand, I'd be very curious to hear what comparable projects do in a similar situation (I read Samza's demonstration of materialized tables and its user-clicks example, so perhaps you have familiarity with Samza's or others' design decisions, if they are in sync with Kafka Streams; you might have worked on that project as well). Specifically, I wonder whether the rationale you presented for case 2 in the JIRA statement (sendOldValues=false) really requires sending keys with nulls when the filter evaluates to false (is that how Samza would do it too?). If case 2's rationale can be challenged, then this ticket can have a meaningful outcome; otherwise it seems that this ticket cannot lead to a behaviour change.

I'd be happy to reset this to false if you find that more advisable. But then, I don't see any hook at a higher level of abstraction to enable sending old values so that, for instance, the RegionView example from Confluent would work without a final filter to exclude nulls on the Long deserializer.

Contributor

Samza does not have a high-level DSL interface, so it does not have this issue.

This issue is related to the more general question of when we should materialize a KTable object with a state store; currently the rule is that stateless operators like filters should not necessarily materialize it. That being said, if a further downstream operator is stateful, it will cause backward propagation of setting enableSendOldValues to true up to the source KTable, as I mentioned in the email thread.
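
A simplified sketch of that backward propagation, as an illustration only (the names below are invented; the real mechanism lives in KTableImpl's internal enableSendingOldValues, which is not exposed to DSL users):

    // A stateless node such as a filter cannot compute old values itself,
    // so a request from a stateful downstream operator is propagated up
    // the chain until it reaches the source KTable, which then
    // materializes a state store and starts emitting {old -> new} pairs.
    interface TableNode {
        void enableSendingOldValues();
    }

    class FilterNode implements TableNode {
        private final TableNode parent;
        private boolean sendOldValues = false;

        FilterNode(TableNode parent) {
            this.parent = parent;
        }

        @Override
        public void enableSendingOldValues() {
            parent.enableSendingOldValues(); // propagate up toward the source
            sendOldValues = true;            // start consuming {old -> new} pairs
        }
    }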

phderome added 2 commits June 27, 2016 21:09
…ssed. We could/should add unit test cases when aggregation is used and see that nulls are suppressed when filtering.
@phderome (Contributor Author)

If you like it as is so far, we could also consider more unit testing for aggregation triggering the suppression of nulls, and possibly bringing a non-lambda version of the UserRegion example from Confluent into Apache (here), assuming it would work as expected.

…avoiding unpleasant possible side effects due to state when running multiple tests together.
@phderome (Contributor Author)

I don't understand why the last commit causes a regression. On my local host the same test, PlaintextProducerSendTest, passes fine. Also, a "git pull https://github.com/apache/kafka trunk" informs me that I am up to date.

@guozhangwang (Contributor)

Yeah, it would be a good idea to add more test cases in KTableFilterTest for this scenario.

If you think it is a transient failure, you can close / re-open this PR to trigger another Jenkins build and see if it passes.

phderome added 3 commits June 28, 2016 22:56
…ingful materialization filter test to avoid setting enableSendingOldValues, which is not available to DSL API user.
…ingful materialization filter test to avoid setting enableSendingOldValues, which is not available to DSL API user.
@phderome (Contributor Author)

I am keeping your second fix of if (change.oldValue == null && change.newValue == null), as in my view it suppresses unnecessary nulls when there is no materialization happening. I kept comments saying "Guozhang's second fix" specifically to help your review, but I will remove those comments once you have seen them. I also added an aggregation materialization unit case.

@phderome (Contributor Author)

New unit test as of this morning.

Thanks for the transient-failure tip.

phil

@@ -77,7 +77,9 @@ public void process(K key, Change<V> change) {
     V newValue = computeValue(key, change.newValue);
     V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

-    if (sendOldValues && oldValue == null && newValue == null) return; // unnecessary to forward here.
+    if ((sendOldValues && oldValue == null && newValue == null) ||
+        (change.oldValue == null && change.newValue == null)) return; // unnecessary to forward here.
Contributor

Actually, I think we do not need the second condition here, since a better fix would be in the mapValues function: do not forward if both are null, instead of checking them here.

Also, a minor comment: use a new line for the return for better debugging, and move the comment on top of line 80 as "if both the new and old values are null after the filtering, do not need to forward downstream anymore".
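
A minimal sketch of the suggested shape, assuming the surrounding KTableFilter.process code from the diffs above (the forward call is illustrative):

    // if both the new and old values are null after the filtering,
    // do not need to forward downstream anymore.
    if (oldValue == null && newValue == null)
        return;

    context().forward(key, new Change<>(newValue, oldValue));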

Contributor

It seems the minor comment is not addressed in your latest PR.

Contributor Author

I confirm that you are absolutely right: as your last mail pointed out, trying to remove sendOldValues from the test was an error, and I had misunderstood the implication. I also appreciate the debug-friendly syntax of putting return on the next line; I'll adjust that. I'll also remove the change test, as per your comment above spelling out that you want to handle that in the mapValues function. Hopefully things will be sorted out within an hour or so.


@phderome (Contributor Author)

There is no longer a test on change.oldValue/newValue being null, and no longer a test on sendOldValues to filter out a null (so the same logic applies to cases #2 and #3 instead of just #3); accordingly, the unit tests now filter out nulls more aggressively on tables that use filter.

@@ -77,6 +77,7 @@ public void process(K key, Change<V> change) {
     V newValue = computeValue(key, change.newValue);
     V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

+    if (oldValue == null && newValue == null) return; // unnecessary to forward here.
Contributor

I'm not sure if we can optimize the case when sendOldValues is false as well. Following your example:

KTable<String, Integer> T1 = builder.table("source-topic");
KTable<String, Integer> T2 = T1.filter((key, value) -> value > 2);
T2.to("sink-topic");

And suppose the "source-topic" is piping the messages to T1 as: {a: 3}, {b: 5}, {a: 1}...

When {a: 3} is passed from T1 to T2, the filter will pass and hence it is forwarded to downstream operators already; so when {a: 1} is later passed from T1 to T2, meaning "modifying the value with key {a} from 3 to 1", the filter will not pass anymore, and hence in this case we need to forward an {a: null} record downstream in order to indicate that the previously forwarded {a: 3} has now been deleted in T2, right? Otherwise the sink topic will have the following messages:

{a: 3}, {b: 5}, ...

whereas the right sequence should be

{a: 3}, {b: 5}, {a: null}, ...
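
A self-contained sketch of that tombstone rule in plain Java, simulating the filter outside the Streams runtime (all names here are invented for illustration):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FilterTombstoneSketch {
        public static void main(String[] args) {
            Map<String, Integer> passing = new HashMap<>(); // keys currently passing the filter
            List<String> sink = new ArrayList<>();

            String[] keys = {"a", "b", "a"};
            int[] values = {3, 5, 1};

            for (int i = 0; i < keys.length; i++) {
                if (values[i] > 2) { // the filter predicate
                    passing.put(keys[i], values[i]);
                    sink.add("{" + keys[i] + ": " + values[i] + "}");
                } else if (passing.remove(keys[i]) != null) {
                    // the key passed before but fails now: emit a tombstone
                    sink.add("{" + keys[i] + ": null}");
                }
            }
            System.out.println(sink); // prints [{a: 3}, {b: 5}, {a: null}]
        }
    }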

Contributor Author

A clear explanation, which I validated with a temporary test that proved it was indeed a wrong idea: with my earlier commit, the deletion failed to occur when the old value was not copied.

So, it's back to the original solution, with return on the 2nd line, and a unit test demonstrating the usage of sendOldValues indirectly via the materialization caused by aggregation.

@phderome (Contributor Author)

phderome commented Jul 1, 2016

@guozhangwang your suggestions have now been integrated as is, as far as I can judge. The most recent build is to bring my branch DEROME-3902 up to date with recent trunk commits.

new Predicate<String, String>() {
    @Override
    public boolean test(String key, String value) {
        return value.compareToIgnoreCase("accept") == 0;
Contributor

Could use equalsIgnoreCase directly.
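
That is, the predicate body becomes the one-liner that the updated test below adopts:

    return value.equalsIgnoreCase("accept");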

Contributor Author

Will do.

@guozhangwang (Contributor)

LGTM overall except some comments on the unit tests.

@guozhangwang (Contributor)

@phderome I may have been misleading in my previous comment regarding the mock; we can actually use org.apache.kafka.test.NoOpKeyValueMapper here to reduce duplicated code. Does that sound good to you?

@phderome (Contributor Author)

phderome commented Jul 1, 2016

@guozhangwang regarding NoOpKeyValueMapper for groupBy, I am favourable, as it leads to a simpler and more concise example. I am not sure the reduce can be simplified further. See for instance:

KTableImpl<String, String, String> table1 =
        (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
        new Predicate<String, String>() {
            @Override
            public boolean test(String key, String value) {
                return value.equalsIgnoreCase("accept");
            }
        }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
        .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result");

asfgit pushed a commit that referenced this pull request Jul 1, 2016
… if both old and new values are null

The contribution is my original work and I license the work to the project under the project's open source license.

Contributors: Guozhang Wang, Phil Derome
guozhangwang

Added checkEmpty to validate processor does nothing and added an inhibit check for filter to fix the issue.

Author: Philippe Derome <[email protected]>
Author: Phil Derome <[email protected]>
Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1556 from phderome/DEROME-3902

(cherry picked from commit 2098529)
Signed-off-by: Guozhang Wang <[email protected]>
@asfgit closed this in 2098529 on Jul 1, 2016
@guozhangwang (Contributor)

The latest patch LGTM; merged to both trunk and 0.10.0. Thanks @phderome!

@phderome (Contributor Author)

phderome commented Jul 2, 2016

Thanks @guozhangwang for all the patient help. This is my first open source commit!

@@ -77,6 +77,9 @@ public void process(K key, Change<V> change) {
     V newValue = computeValue(key, change.newValue);
     V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;

+    if (sendOldValues && oldValue == null && newValue == null)
@miguno (Contributor) · Jul 4, 2016

Just to double-check, because the JIRA ticket is a bit difficult to follow: is this single condition sufficient to cover the following discussion/flow in the JIRA ticket?

  1. If "send old value" is enabled, then there are a couple of cases we can consider:

a. If old value is <key: null> and new value is <key: not-null>, and the filter predicate return false for the new value, then in this case it is safe to optimize and not returning anything to the downstream operator, since in this case we know there is no value for the key previously anyways; otherwise we send the original pair.

b. If old value is <key: not-null> and new value is <key: null>, indicating to delete this key, and the filter predicate return false for the old value, then in this case it is safe to optimize and not returning anything to the downstream operator, since we know that the old value has already been filtered in a previous message; otherwise we send the original pair.

c. If both old and new values are not null, and:

  1. predicate return true on both, send the original pair;
  2. predicate return false on both, we can optimize and do not send anything;
  3. predicate return true on old and false on new, send the key: {old -> null};
  4. predicate return false on old and true on new, send the key: {null -> new};

Contributor

With /cc @guozhangwang

Contributor

Yes, it covers all the cases above, which can be summarized as "if both new and old values are null".
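
A hedged sketch of why the cases collapse to that single check, mirroring the null-safe filtering of computeValue from the diffs above (the forward call is illustrative, and filterNot is ignored for brevity):

    // Each side is filtered independently: a null input stays null, and a
    // non-null value that fails the predicate becomes null.
    V newValue = (change.newValue != null && predicate.test(key, change.newValue))
            ? change.newValue : null;
    V oldValue = (sendOldValues && change.oldValue != null && predicate.test(key, change.oldValue))
            ? change.oldValue : null;

    if (oldValue == null && newValue == null)
        return; // covers cases a, b, and c.2 above

    context().forward(key, new Change<>(newValue, oldValue)); // cases c.1, c.3, c.4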
