
KAFKA-16448: Fix raw record not being cached in store #16684

Conversation

loicgreffier
Contributor

@cadonna @mjsax

After #16093 was merged, there is a scenario where processing exception handling ends with a NullPointerException:

Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.ConsumerRecord.key()" because the return value of "org.apache.kafka.streams.processor.internals.ProcessorRecordContext.rawRecord()" is null
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:212) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$6(MeteredWindowStore.java:190) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:125) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:100) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:159) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:148) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.CachingWindowStore.flushCache(CachingWindowStore.java:426) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:87) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:537) ~[kafka-streams-3.9.0-SNAPSHOT.jar:na]

This happened with the following topology:

builder
  .stream(...)
  .groupByKey()
  .windowedBy(...) // Does not really matter; the NPE is thrown with or without windowing
  .aggregate(...)
  .mapValues(value -> { throw new RuntimeException(...); })

The raw record, which was added to ProcessorRecordContext, is lost when the record is cached in the store. This PR fixes it. Looking forward to providing unit tests.
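
For reference, a minimal sketch of a topology that hits this path (the topic name, serdes, window size, and aggregation logic below are placeholders, not taken from the actual application):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class NpeReproSketch {
    public static Topology topology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                () -> 0L,
                (key, value, aggregate) -> aggregate + 1L,
                Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream()
            .mapValues(value -> {
                // The exception is thrown downstream of the cached store; when the
                // cache flushes and forwards, the forwarded record carries no raw
                // record, which triggers the NullPointerException above.
                throw new RuntimeException("processing error");
            });
        return builder.build();
    }
}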

@loicgreffier loicgreffier changed the title KAFKA-16448: Fix raw record not being cached causing null pointer exception KAFKA-16448: Fix raw record not being cached in store Jul 24, 2024
@cadonna
Member

cadonna commented Jul 25, 2024

@loicgreffier Thanks for the investigation and the PR!

It seems that the issue arises when a record is stored and then forwarded independently of the current input record. That might happen in different places. The ones that come to my mind are:

  • store caches
  • buffers (RocksDBTimeOrderedKeyValueBuffer and InMemoryTimeOrderedKeyValueChangeBuffer)

For the buffers, I see that the raw record is not serialized to the changelog topic or the store. That means that the raw record will always be null after a failover for the records in the in-memory buffer. With the RocksDB-based buffer, the records will never have a raw record attached. When records without a raw record are evicted from those buffers, they will cause the NullPointerException if an exception happens during processing after the buffer. Have you considered serializing the raw record?
Even if you serialize the raw record, to be backward compatible, Streams needs to be able to read a record context without the raw record (written by previous versions of Streams). Thus, Streams cannot always provide a raw record to the error context, and you need to account for that.

@mjsax could you double-check my understanding?
(@loicgreffier and @sebastienviale you are of course also invited to double-check)

@cadonna cadonna added the kip Requires or implements a KIP label Jul 25, 2024
@loicgreffier
Contributor Author

@cadonna

So maybe the fix brought by this PR should be:

  1. Checking in ProcessorNode#process if rawRecord != null before accessing the raw key and the raw value (see the sketch after this list). It is the safest approach for me and will avoid crashing with an NPE. The drawback is that the processing exception handler can end up with no sourceRawKey nor sourceRawValue while the values are actually available upstream.
  2. Fixing the source raw key and source raw value being null when records are forwarded from store caches and buffers. The recordContext() needs to be accessed, which currently fails some tests.
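
A minimal sketch of the guard from point 1 (the helper class and its names are hypothetical; the actual change would live inside ProcessorNode#process):

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper illustrating the null guard: only dereference the raw
// record when it is present, otherwise fall back to null.
final class RawRecordGuard {

    static byte[] sourceRawKey(final ConsumerRecord<byte[], byte[]> rawRecord) {
        return rawRecord != null ? rawRecord.key() : null;
    }

    static byte[] sourceRawValue(final ConsumerRecord<byte[], byte[]> rawRecord) {
        return rawRecord != null ? rawRecord.value() : null;
    }
}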

Does it make sense?

@cadonna
Member

cadonna commented Jul 25, 2024

@loicgreffier
Point 1 is absolutely needed.
Point 2 is the proper fix. I think you have fixed the cache part in this PR by adding the raw record to the cache entry. For the buffers, you need to add the raw record to the serialized BufferValue. However, with that you modify a serialization format, and we need to take great care to maintain backward compatibility.
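
Purely as an illustration of the compatibility concern (this is not the real BufferValue format), one option is to treat the raw record as an optional trailing field, so that entries written by older versions deserialize with a null raw record:

import java.nio.ByteBuffer;

// Illustrative only: a value with an optional trailing raw-record field.
final class ValueWithOptionalRawRecord {
    final byte[] value;
    final byte[] rawRecord; // null for entries written by older versions

    ValueWithOptionalRawRecord(final byte[] value, final byte[] rawRecord) {
        this.value = value;
        this.rawRecord = rawRecord;
    }

    static ValueWithOptionalRawRecord deserialize(final ByteBuffer buffer) {
        final byte[] value = new byte[buffer.getInt()];
        buffer.get(value);
        if (!buffer.hasRemaining()) {
            // Old format: no raw record was serialized.
            return new ValueWithOptionalRawRecord(value, null);
        }
        final byte[] rawRecord = new byte[buffer.getInt()];
        buffer.get(rawRecord);
        return new ValueWithOptionalRawRecord(value, rawRecord);
    }
}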

@loicgreffier
Contributor Author

@cadonna

PR has been updated to solve point 1.

The PR about production (#16433) also accesses the rawRecord without any verification. I can bring a fix in this PR after it has been merged.

@mjsax
Member

mjsax commented Jul 25, 2024

I am just catching up on the implementation. The caching mechanism and other similar things seem to be a general issue for passing in the original raw source data (this also applies to punctuators, for which I did see a PR).

Even if we ignore all DSL features (e.g. suppress(), emit-final aggregations, caching/forwarding coupling), I can have a custom Processor with a state store: I can put a record into the store and not forward anything. Next, when a second input record comes in, I pull out the first record and forward it, and processing downstream could lead to an error. For error handling, it seems most useful to get the first record handed into it, and the source record (which is the second record) might not be too helpful for this case?
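
Sketched out, that scenario could look roughly like this (the store name, types, and buffering logic are hypothetical):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// The first record per key is only buffered; it is forwarded when the second
// record arrives, so a downstream failure relates to the first record while the
// current source record is the second one.
public class BufferOneProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore("buffer-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        final String buffered = store.get(record.key());
        if (buffered == null) {
            // First record: only store it, forward nothing.
            store.put(record.key(), record.value());
        } else {
            // Second record: forward the *first* record; an error thrown downstream
            // now happens while the second record is the current source record.
            store.delete(record.key());
            context.forward(new Record<>(record.key(), buffered, record.timestamp()));
        }
    }
}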

Thus, taking a step back, I am wondering why we do not just pass the current key/value (or the full Record) into the handler? Of course, for doing a DLQ, which is a follow-up feature we want to build on top, having something unserialized at hand might not be ideal, but at the source-node level we should always be able to pass in the unserialized source data. -- Should we change the handler to pass in both the current input Record and the source raw key/value (making both sourceRawKey and sourceRawValue of type Optional<byte[]>)? In the end, messing with the store cache seems to be brittle and does not solve the problem for all cases? Do we really think it would be the right way forward? (Also, I think that caches are only an issue in the DSL, for which we couple cache flushing and forwarding -- for the PAPI, caching and forwarding are independent, so there would be no reason to add any raw record; it would just be a waste of memory.)

While we want to use this new handler to build a DLQ, it's not the only way it can be used, and thus we should not blindly optimize for the DLQ case but try to make it useful for other cases as much as we can, too? (And we could revisit the question of what serialized data we can pass into a DLQ handler on the DLQ KIP, and try to decouple the ProcessingExceptionHandler a little bit more from the DLQ KIP?)

IIRC, we did have some discussion about this issue on the mailing list, but considered it a DSL issue that we might want to address in a follow-up KIP. But maybe this assessment was wrong, and it would be better to address it right away (at least partially)? In the end, won't it be easier for the handler to determine what to do if we pass in the current input record of the called Processor, instead of some related (or maybe even unrelated) source record?

@cadonna
Member

cadonna commented Jul 26, 2024

In the end, won't it be easier for the handler to determine what to do if we pass in the current input record of the called Processor, instead of some related (or maybe even unrelated) source record?

Actually, we do pass the current record into the processing exception handler. The issue here is that the error handler context also contains the raw source record, which seems not to be straightforward to get.

@cadonna
Member

cadonna commented Jul 26, 2024

Even if we ignore all DSL features (e.g. suppress(), emit-final aggregations, caching/forwarding coupling), I can have a custom Processor with a state store: I can put a record into the store and not forward anything. Next, when a second input record comes in, I pull out the first record and forward it, and processing downstream could lead to an error. For error handling, it seems most useful to get the first record handed into it, and the source record (which is the second record) might not be too helpful for this case?

Given that we pass the first record (i.e. the record that throws the error) into the processing exception handler, the raw source record (i.e. the record read from the source topic partition of the current sub-topology of the task) in the error handler context would indicate for which source record the first record ran into the issue. I think this is valuable information, although not for each and every use case. However, I am wondering whether the topic, partition, and offset would not be enough to give context. In the end, that is the information needed to identify the raw source record. The good thing is that, as far as I understand, we have all of this information in the current caches and buffers.
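
For example, a handler that relies only on that identifying information could look roughly like this (a sketch assuming the KIP-1033 interface shape discussed here; it simply logs and continues):

import java.util.Map;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class LoggingProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // topic/partition/offset identify the raw source record without needing
        // the raw key/value bytes themselves.
        System.err.printf("Processing failed for source record %s-%d@%d: %s%n",
            context.topic(), context.partition(), context.offset(), exception);
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }
}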

@sebastienviale
Contributor

Do we have to consider that the first point of this comment (#16684 (comment)) is enough?

Checking in ProcessorNode#process if rawRecord != null before accessing the raw key and the raw value. It is the safest approach for me and will avoid crashing with an NPE. The drawback is that the processing exception handler can end up with no sourceRawKey nor sourceRawValue while the values are actually available upstream

@mjsax
Member

mjsax commented Jul 26, 2024

Ah sorry. Mixed up the handler and the context... My bad.

Do we have to consider that the first point of this comment (#16684 (comment)) is enough?

Checking in ProcessorNode#process if rawRecord != null before accessing the raw key and the raw value. It is the safest approach for me and will avoid crashing with an NPE. The drawback is that the processing exception handler can end up with no sourceRawKey nor sourceRawValue while the values are actually available upstream

Yes, that is what I meant here:

Thus, taking a step back, I am wondering why we do not just pass the current key/value (or the full Record) into the handler? Of course, for doing a DLQ, which is a follow-up feature we want to build on top, having something unserialized at hand might not be ideal, but at the source-node level we should always be able to pass in the unserialized source data. -- Should we change the handler to pass in both the current input Record and the source raw key/value (making both sourceRawKey and sourceRawValue of type Optional<byte[]>)? In the end, messing with the store cache seems to be brittle and does not solve the problem for all cases? Do we really think it would be the right way forward? (Also, I think that caches are only an issue in the DSL, for which we couple cache flushing and forwarding -- for the PAPI, caching and forwarding are independent, so there would be no reason to add any raw record; it would just be a waste of memory.)

I think there are too many corner cases right now to address all of them properly and quickly (and for some, we might never be able to fix them), and we might just want to go down the path of least resistance. So just doing the null check seems reasonable to me for now. But as said, I would change to the Optional type to highlight that both sourceRawKey and sourceRawValue may be null.

The drawback is that the processing exception handler can end up with no sourceRawKey nor sourceRawValue while the values are actually available upstream

Well, if we find a good way later to set both correctly, it would just be an improvement we can do anytime. The API contract just says they may or may not be there (if we use Optional), but we don't define when they will be there and when not, so IMHO we can change it without the need for a KIP or anything.

@mjsax
Member

mjsax commented Jul 26, 2024

I talked to @cadonna, and he would prefer to just omit both sourceRawKey and sourceRawValue entirely for now. We can always add them to the context object later. -- To make sure we get the KIP into 3.9 with the lowest risk, this might be the best way forward.

If we need the information for DLQ, we can add both back via KIP-1034.

@loicgreffier
Contributor Author

@mjsax Got it. Should we update KIP-1033 to remove any reference to sourceRawKey and sourceRawValue?

We'll update the existing PRs and open a new PR to remove sourceRawKey and sourceRawValue from what has been merged on trunk.

This PR could be closed afterward

@mjsax
Member

mjsax commented Jul 30, 2024

Got it. Should we update KIP-1033 to remove any reference to sourceRawKey and sourceRawValue?

Yes, we should update the KIP accordingly.

We'll update the existing PRs and open a new PR to remove sourceRawKey and sourceRawValue from what has been merged on trunk.

Seems you did this on the other PRs already, which I just merged. So the issue should be fixed and we can close this PR?

@loicgreffier
Contributor Author

loicgreffier commented Jul 30, 2024
