-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][function] Support Record<?> as Function output type #16041
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Is this change ABI compatible so that existing user created Pulsar Functions implementations can be run without recompiling when upgrading to a version that includes this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM
I left some feedback
currentRecord.getPartitionIndex().ifPresent(builder::partitionIndex); | ||
currentRecord.getRecordSequence().ifPresent(builder::recordSequence); | ||
|
||
// TODO: add message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you want to do this, please create a ticket and link it, otherwise please remove this "TODO"
TODOs are usually some kind of code smell, especially in a big open source project like Pulsar.
we could explain why the "message" is not available here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I forgot to remove that one.
I totally agree that TODOs must not hit main branch 😄
@@ -166,7 +166,7 @@ public void testSlidingCountWindowTest() throws Exception { | |||
@Test(groups = {"java_function", "function"}) | |||
public void testMergeFunctionTest() throws Exception { | |||
testMergeFunction(); | |||
} | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove space
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to align with the rest of the function on 4 spaces.
But this whole class is misaligned...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realigned completely this block but I guess the full class should be realigned.
Maybe better to do it in another PR ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this PR should focus on the feature and not on code clean up
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
Show resolved
Hide resolved
private final Integer partitionIndex; | ||
private final Long recordSequence; | ||
|
||
public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need unit test coverage for this method (ensure that every field is properly handled)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -161,4 +162,11 @@ public interface Context extends BaseContext { | |||
* @throws PulsarClientException | |||
*/ | |||
<X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException; | |||
|
|||
/** | |||
* Create a FunctionRecordBuilder. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please explain a little bit how this is supposed to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@lhotari I think it is since we only add a method to the |
* Rename TargetSinkRecord to OutputRecordSinkRecord * Add UT for FunctionCommon::getFunctionTypes * Add UT for Context::newOutputRecordBuilder * Add some javadoc * Always set message properties for OutputRecordSinkRecord
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@nlu90 Please help review this PR. |
/pulsarbot rerun-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but I do have a question, it seems this method can be a replacement of newOutputMessage
, so is it possible to deprecate newOutputMessage
? if not, could you please add some more context comparing newOutputRecordBuilder
and newOutputMessage
? thanks.
* @param <T> type of Record to build | ||
* @return a Record builder initialised with values from the Function Context | ||
*/ | ||
public static <T> FunctionRecord.FunctionRecordBuilder<T> from(Context context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you help me understand why the whole context
is passed here, instead of the currentRecord
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We set the destinationTopic
from the Context getOutputTopic
@@ -481,6 +482,11 @@ public <T> ConsumerBuilder<T> newConsumerBuilder(Schema<T> schema) throws Pulsar | |||
return this.client.newConsumer(schema); | |||
} | |||
|
|||
@Override | |||
public <X> FunctionRecord.FunctionRecordBuilder<X> newOutputRecordBuilder() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding a new API here, do you think about creating a util class with a record generation method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is better to not add a general purpose API to build "Record" instances.
A Record is like a Message and you cannot build a Message using the client API or a utility class.
The record may be tied to some internal context or pre-configured (as it does currently in this PR).
So it is better to use Context as a starting point for building a new record, the same way it happens with newOutputMessage()
That's 2 different approaches. |
Hi @cbornet , will you update relevant docs in a follow-up PR? |
import org.apache.pulsar.functions.api.Context; | ||
import org.apache.pulsar.functions.api.Record; | ||
|
||
@Builder(builderMethodName = "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to suppress generating the builder method?
Motivation
Currently, when a user wants to dynamically set the output topic, the message properties or change the output schema in a Function, the only possibility is to create a Function that returns
Void
, use the Context and manually create a message withContext::newOutputMessage
. The TypedMessageBuilderPublish Function inpulsar-functions-api-examples
shows how to do that.This way of doing is not intuitive and it would be better to return a structure like
Record
that carries this info.Modifications
This PR adds support for returning
Record
in a Function.JavaInstanceRunnable::sendOutputMessage
, we check the type of the output object and if it's of typeRecord
we create aTargetSinkRecord
instead of aSinkRecord
.TargetSinkRecord
uses the info from the output record in the various Record methods that are used by the Sink when creating the output message.getFunctionTypes
, if the output type is a Record, we get the wrapped type as type for the Sink.newOutputRecordBuilder
is added toContext
that returns aFunctionRecord
builder initialized with the info from the source record. The builder methods can then be used to override these values as needed.RecordFunction
is added in the Function examples to demonstrate the use of this featurePulsarFunctionsJavaTest
Verifying this change
This change added tests and can be verified as follows:
Run
PulsarFunctionsJavaTest:: testRecordFunctionTest
Does this pull request potentially affect one of the following parts:
yes
If
yes
was chosen, please highlight the changesDependencies (does it add or upgrade a dependency): (yes / no)
The public API: (yes / no)
Yes.
Record<>
as a possible return type for FunctionsnewOutputRecordBuilder
method toContext
The schema: (yes / no / don't know)
The default values of configurations: (yes / no)
The wire protocol: (yes / no)
The rest endpoints: (yes / no)
The admin cli options: (yes / no)
Anything that affects deployment: (yes / no / don't know)
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)