Skip to content

Commit

Permalink
Add stashcontext to connector getter (#2742) (#2745)
Browse files Browse the repository at this point in the history
Signed-off-by: b4sjoo <[email protected]>
(cherry picked from commit f8d6c7b)

Co-authored-by: Sicheng Song <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and b4sjoo authored Jul 24, 2024
1 parent 9ea79d0 commit ffbc35c
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions plugin/src/main/java/org/opensearch/ml/model/MLModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1638,26 +1638,33 @@ public void getController(String modelId, ActionListener<MLController> listener)
*/
public void getConnector(String connectorId, ActionListener<Connector> listener) {
GetRequest getRequest = new GetRequest().index(CommonValue.ML_CONNECTOR_INDEX).id(connectorId);
client.get(getRequest, ActionListener.wrap(r -> {
if (r != null && r.isExists()) {
try (
XContentParser parser = MLNodeUtils
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, r.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Connector connector = Connector.createConnector(parser);
listener.onResponse(connector);
} catch (Exception e) {
log.error("Failed to parse connector:" + connectorId);
listener.onFailure(e);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Connector> wrappedListener = ActionListener.runBefore(listener, context::restore);
client.get(getRequest, ActionListener.wrap(r -> {
if (r != null && r.isExists()) {
try (
XContentParser parser = MLNodeUtils
.createXContentParserFromRegistry(NamedXContentRegistry.EMPTY, r.getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
Connector connector = Connector.createConnector(parser);
wrappedListener.onResponse(connector);
} catch (Exception e) {
log.error("Failed to parse connector:" + connectorId);
wrappedListener.onFailure(e);
}
} else {
wrappedListener
.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
}
} else {
listener.onFailure(new OpenSearchStatusException("Failed to find connector:" + connectorId, RestStatus.NOT_FOUND));
}
}, e -> {
}, e -> {
log.error("Failed to get connector", e);
wrappedListener.onFailure(new OpenSearchStatusException("Failed to get connector:" + connectorId, RestStatus.NOT_FOUND));
}));
} catch (Exception e) {
log.error("Failed to get connector", e);
listener.onFailure(new OpenSearchStatusException("Failed to get connector:" + connectorId, RestStatus.NOT_FOUND));
}));
listener.onFailure(e);
}
}

/**
Expand Down

0 comments on commit ffbc35c

Please sign in to comment.