-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modify EventHandle to be created for every event and support internal and external origination times #3546
Modify EventHandle to be created for every event and support internal and external origination times #3546
Conversation
0e382c6
to
310bb2e
Compare
… and external origination times Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
310bb2e
to
5e144bd
Compare
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
|
||
@Override | ||
public void release(boolean result) { | ||
System.out.println("======release called==="+result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should remove this print statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops. Yes
} | ||
|
||
public static Event fromMessage(String message) { | ||
return JacksonEvent.builder() | ||
JacksonEvent event = JacksonEvent.builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this change accomplish?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed. I was trying something. undid the change. Thanks!
/* | ||
lenient().doAnswer(a -> { | ||
return null; | ||
}).when(eventHandle).release(any(Boolean.class)); | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete commented out code.
s3SinkService.output(records); | ||
|
||
final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); | ||
//final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra comment
return this.internalOriginationTime; | ||
} | ||
|
||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the next PR let's scope down and first only implement internal end to end latency metric. For ddb use case we could potentially use this value for external (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_StreamRecord.html#DDB-Type-streams_StreamRecord-ApproximateCreationDateTime), so we may not need to make any hasty decisions on users configuring this time. @dlvenable What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand your proposal, the Source
will set this value on the Event. We don't need to let the user configure this in the pipeline. But, we could in the future. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Will only implement internal latency for now in the OpenSearchSink and S3 Sink
import java.time.Instant; | ||
import java.io.Serializable; | ||
|
||
public class DefaultEventHandle implements EventHandle, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this package protected? I'd really like to avoid this getting used as it will be hard to clean-up in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "package protected" you mean make this as
protected class DefaultEventHandle implements EventHandle, Serializable
?
* @param acknowledgementSet acknowledgementSet to be set in the event handle | ||
* @since 2.6 | ||
*/ | ||
void setAcknowledgementSet(final AcknowledgementSet acknowledgementSet); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to expose all of this? It might make sense to have a different interface for internal use. Say InternalEventHandle
. Then DefaultEventHandle
can implement that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand this suggestion. Are you suggesting we have InternalEventHandle
and EventHandle
separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we'd have two interfaces:
public interface InterfaceEventHandle {
...
// methods that data-prepper-core calls are here.
}
And we implement it as:
class DefaultEventHandle implements EventHandle, InternalEventHandle
{
...
}
In data-prepper, core we'd need a cast: InternalEventHandle internalEventHandle = (InternalEventHandle) eventHandle
.
This would be a good indication that somebody is doing something they shouldn't if we see it outside of data-prepper-core.
/* | ||
lenient().doAnswer(a -> { | ||
return null; | ||
}).when(eventHandle).release(any(Boolean.class)); | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete commented out code.
s3SinkService.output(records); | ||
|
||
for (EventHandle eventHandle : eventHandles) { | ||
System.out.println("==2====EventHandle=="+eventHandle+"==="+acknowledgementSet); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove println
usage.
* @param externalOriginationTime externalOriginationTime to be set in the event handle | ||
* @since 2.6 | ||
*/ | ||
void setExternalOriginationTime(final Instant externalOriginationTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should set the origination time on the EventMetadata. The EventHandle can then get this data from the EventMetadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Internal origination time is set in EventMetadata and EventHandle gets it from it. Do you want the external origination time also set in the EventMetadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, add this to EventMetadata
.
The EventHandle
can also hold a reference to the time in there.
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
The backport to
To backport manually, run these commands in your terminal: # Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-2.5 2.5
# Navigate to the new working tree
cd .worktrees/backport-2.5
# Create a new branch
git switch --create backport/backport-3546-to-2.5
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 7869eb7aac41965adc9a3eab4f85d4121f7b41b9
# Push it to GitHub
git push --set-upstream origin backport/backport-3546-to-2.5
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-2.5 Then, create a pull request where the |
Description
Modify EventHandle to be created for every event and support internal and external origination times.
Partially resolves #3494
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.