Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate RAG pipeline to async processing. #2345

Merged
merged 2 commits into from
Apr 23, 2024

Conversation

austintlee
Copy link
Collaborator

Description

Use the async version of the search pipeline process to avoid blocking (remote) calls.

Original bug - opensearch-project/OpenSearch#10248.

Issues Resolved

Check List

  • [x ] New functionality includes testing.
    • [x ] All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • [x ] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@austintlee
Copy link
Collaborator Author

cc: @reta @msfroh

@austintlee
Copy link
Collaborator Author

@msfroh Can you take a look at my code changes? I have a really basic question about exception handling in processResponseAsync - if I want to throw things like IllegalArgumentException/InvalidInputException, do I just throw it and let it propagate out of the method or is there some contract I need to follow in this async world?

}
final int timeout = t;
log.info("Timeout for this request: {} seconds.", timeout);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unnecessary to print this on info level. Move to debug level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug.

if (conversationId != null && !Strings.hasText(conversationId)) {
throw new IllegalArgumentException("Empty conversation_id is not allowed.");
}
// log.info("LLM question: {}, LLM model {}, conversation id: {}", llmQuestion, llmModel, conversationId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

List<Interaction> chatHistory = (conversationId == null)
? Collections.emptyList()
: memoryClient.getInteractions(conversationId, interactionSize);
log.info("Using interaction size of {}", interactionSize);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to debug level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

try {
ChatCompletionOutput output = llm
.doChatCompletion(
// log.info("system_prompt: {}", systemPrompt);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these two lines?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

} else {
final Instant memoryStart = Instant.now();
memoryClient.getInteractions(conversationId, interactionSize, ActionListener.wrap(r -> {
log.info("getInteractions complete. ({})", getDuration(memoryStart));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line or move to debug level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug.

llm.doChatCompletion(input, new ActionListener<>() {
@Override
public void onResponse(ChatCompletionOutput output) {
log.info("doChatCompletion complete. ({})", getDuration(chatStart));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this line or move to debug level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug.

.getMlModelTensors()
.get(0)
.getDataAsMap();
// log.info("dataAsMap: {}", dataAsMap.toString());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

SearchResponse response,
PipelineProcessingContext requestContext,
ActionListener<SearchResponse> responseListener
) {
log.info("Entering processResponse.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log can be removed right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug.

Copy link
Collaborator

@Zhangxunmt Zhangxunmt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of "onFailure(Exception e)" implementations in the ActionListeners do not include log.error(). Should we add more logs for errors? This was brought up in the earlier security review too.

Copy link
Collaborator

@ylwu-amzn ylwu-amzn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix

@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 01:42 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 02:32 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 02:32 — with GitHub Actions Inactive
@austintlee austintlee temporarily deployed to ml-commons-cicd-env April 23, 2024 02:32 — with GitHub Actions Inactive
@ylwu-amzn ylwu-amzn merged commit 4b26ebf into opensearch-project:main Apr 23, 2024
13 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Apr 23, 2024
* Migrate RAG pipeline to async processing.

Signed-off-by: Austin Lee <[email protected]>

* Address reviewer comments.

Signed-off-by: Austin Lee <[email protected]>

---------

Signed-off-by: Austin Lee <[email protected]>
(cherry picked from commit 4b26ebf)
opensearch-trigger-bot bot pushed a commit that referenced this pull request Apr 23, 2024
* Migrate RAG pipeline to async processing.

Signed-off-by: Austin Lee <[email protected]>

* Address reviewer comments.

Signed-off-by: Austin Lee <[email protected]>

---------

Signed-off-by: Austin Lee <[email protected]>
(cherry picked from commit 4b26ebf)
@@ -128,14 +141,15 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
}
String conversationId = params.getConversationId();

if (conversationId != null && !Strings.hasText(conversationId)) {
throw new IllegalArgumentException("Empty conversation_id is not allowed.");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you managed to test this?

You should probably invoke responseListener.onFailure(). Otherwise, the current thread may throw and the listener would sit there waiting for a response.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have a test for this, but it does not go through the REST layer. I may need an IT test.

log.error("Context " + contextField + " not found in search hit " + hits[i]);
// TODO throw a more meaningful error here?
throw new RuntimeException();
throw new RuntimeException("Context " + contextField + " not found in search hit " + hits[i]);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, you need to make sure that this exception gets propagated to the listener. (I don't remember if that's covered by ActionListener.wrap(). Maybe?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check and also test.

@msfroh
Copy link

msfroh commented Apr 23, 2024

@msfroh Can you take a look at my code changes? I have a really basic question about exception handling in processResponseAsync - if I want to throw things like IllegalArgumentException/InvalidInputException, do I just throw it and let it propagate out of the method or is there some contract I need to follow in this async world?

Shoot -- I didn't see your question before this got merged. (I spent most of yesterday traveling to a conference.)

The general contract in the async world is that every possible code path needs to notify the listener exactly once or else it will wait indefinitely.

  • If you have a response, you must give it to the listener.
  • If there's a failure, you must notify the listener (probably via onFailure).
  • If there's no response, you must notify the listener that there's no response.

@mingshl mingshl added v2.14.0 enhancement New feature or request labels Apr 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants