Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ProgressCheck callbacks to end-to-end acknowledgements #3565

Merged
merged 9 commits into from
Nov 4, 2023

Conversation

kkondaka
Copy link
Collaborator

Description

Add ProgressCheck callbacks to end-to-end acknowledgements
Adds optional progress check callbacks the acknowledgement set create api

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [X ] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [ X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -30,6 +30,7 @@ public interface AcknowledgementSetManager {
*/
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);

AcknowledgementSet create(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval, final Consumer<Boolean> callback, final Duration timeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the progress check is optional, it might make sense to add the progress check on the AcknowledgementSet itself. This will help with clarity in the caller's code.

e.g.

acknowledgementSet = manager.create(myCallback, timeout);
acknowledgementSet.addProgressCheck(myProgressCallback, progressInterval);

We may also want an expiry callback right? So continuing to add to the create method is making the code harder to read. I'm open to other ideas as well - perhaps a builder?

manager.builder(myCallback, timeout)
  .withProgressCheck(myProgressCallback, progressInterval)
  .build()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because progress check is optional, we have two different constructors. Isn't that good alternative way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I want an expiry callback, but not progress? This becomes confusing to use. It is also already difficult to read and understand in the SQS code because you need to know the order of these callbacks and durations.

@@ -36,12 +39,23 @@ public DefaultAcknowledgementSetManager(
public DefaultAcknowledgementSetManager(final ExecutorService callbackExecutor, final Duration waitTime) {
this.acknowledgementSetMonitor = new AcknowledgementSetMonitor();
this.executor = Objects.requireNonNull(callbackExecutor);
// Single thread executor should be sufficient in most cases (esp if the underlying host has 1 or 2 cores)
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current executor is dependency injected. I think we should continue that pattern. It comes from:

@Bean(name = "acknowledgementCallbackExecutor")
ExecutorService acknowledgementCallbackExecutor(final CallbackTheadFactory callbackTheadFactory) {
return Executors.newFixedThreadPool(MAX_THREADS, callbackTheadFactory);
}

Also, can we share a single executor?

Change this:

https://github.com/opensearch-project/data-prepper/blob/068c57172c0dfb0c4e43d1d796e8a59fa11fb1df/data-prepper-core/src/main/java/org/opensearch/dataprepper/acknowledgements/AcknowledgementAppConfig.java#L27C28-L27C28

To this:

return Executors.newScheduledThreadPool(MAX_THREADS, callbackTheadFactory);

And change the interfaces to use ScheduledExecutorService throughout.


public DefaultAcknowledgementSet(final ExecutorService executor, final Consumer<Boolean> callback, final Duration expiryTime, final DefaultAcknowledgementSetMetrics metrics) {
public DefaultAcknowledgementSet(final ExecutorService executor,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my other comment below. I believe that we can use a single ScheduledExecutorService.

acknowledgementSet = acknowledgementSetManager.create(
(ratio) -> {
final int newVisibilityTimeoutSeconds = (int) (visibilityTimeout);
final ChangeMessageVisibilityRequest changeMessageVisibilityRequest = ChangeMessageVisibilityRequest.builder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to think that we should make this a configuration option. It is changing existing behavior and is now requiring access to a new API and thus requires new permissions.

Also, we should let the user configure a maximum. What if the acknowledgement set is stuck for some reason and it never completes after some number of hours?

@kkondaka kkondaka marked this pull request as ready for review November 2, 2023 06:33
Krishna Kondaka added 4 commits November 2, 2023 15:11
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Comment on lines +43 to +48
private DefaultEventHandle eventHandle1;
private DefaultEventHandle eventHandle2;
private DefaultEventHandle eventHandle3;
private DefaultEventHandle eventHandle4;
private DefaultEventHandle eventHandle5;
private DefaultEventHandle eventHandle6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could make this a List<DefaultEventHandle> just to make it more flexible for future test case additions

@JsonProperty("extend_visibility_timeout")
private Boolean extendVisibilityTimeout = DEFAULT_EXTEND_VISIBILITY_TIMEOUT;

@JsonProperty("max_visibility_timeout_extesion")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo here on extension

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few more comments to consider. Also, let's correct the property misspelling that @graytaylor0 noted.

events.add(event);
}
try {
Thread.sleep(4000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we decrease this sleep?

* @param progressCheckInterval frequency of invocation of progress check callback
* @since 2.6
*/
public void addProgressCheck(final Consumer<Double> progressCheckCallback, final Duration progressCheckInterval);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you chose to make this a Consumer<Double>. The current code isn't using this ratio presently.

A few alternatives:

  • Make this a Runnable without any input.
  • Create a new interface that can be provided so that the relevant arguments can be expanded in the future. For example, Consumer<ProgressCheck>.
public interface ProgressCheck {
  Double getRatio();
}

Then we could add other items if desired such as Duration getTimeToExpiry().

Krishna Kondaka added 2 commits November 3, 2023 19:06
Signed-off-by: Krishna Kondaka <[email protected]>
graytaylor0
graytaylor0 previously approved these changes Nov 3, 2023
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are checkstyle violations in this PR. I'm good once the build is running.

@dlvenable
Copy link
Member

[ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java:41:8: Unused import - java.util.concurrent.atomic.AtomicInteger. [UnusedImports]

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
@dlvenable dlvenable merged commit 6681e75 into opensearch-project:main Nov 4, 2023
66 checks passed
@opensearch-trigger-bot
Copy link
Contributor

The backport to 2.5 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-2.5 2.5
# Navigate to the new working tree
cd .worktrees/backport-2.5
# Create a new branch
git switch --create backport/backport-3565-to-2.5
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 6681e75d8b8cfa3985e4a11f9fa9d6238562e462
# Push it to GitHub
git push --set-upstream origin backport/backport-3565-to-2.5
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-2.5

Then, create a pull request where the base branch is 2.5 and the compare/head branch is backport/backport-3565-to-2.5.

@dlvenable
Copy link
Member

@asifsmohammed , This should not be back ported as it is a new feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants