Skip to content

Commit

Permalink
[Connectors API] Implement update service type action (elastic#104643)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb authored and henningandersen committed Jan 25, 2024
1 parent 62fb40a commit 231c76c
Show file tree
Hide file tree
Showing 11 changed files with 494 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104643.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104643
summary: "[Connectors API] Implement update service type action"
area: Application
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"connector.update_service_type": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/connector-apis.html",
"description": "Updates the service type of the connector."
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [
"application/json"
],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_connector/{connector_id}/_service_type",
"methods": [
"PUT"
],
"parts": {
"connector_id": {
"type": "string",
"description": "The unique identifier of the connector to be updated."
}
}
}
]
},
"body": {
"description": "An object containing the connector's service type.",
"required": true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
setup:
- skip:
version: " - 8.12.99"
reason: Introduced in 8.13.0

- do:
connector.put:
connector_id: test-connector
body:
index_name: search-1-test
name: my-connector
language: pl
is_native: false
service_type: super-connector

---
"Update Connector Service Type":
- do:
connector.update_service_type:
connector_id: test-connector
body:
service_type: even-better-connector


- match: { result: updated }

- do:
connector.get:
connector_id: test-connector

- match: { service_type: even-better-connector }
- match: { status: created }

---
"Update Connector Service Type - 404 when connector doesn't exist":
- do:
catch: "missing"
connector.update_service_type:
connector_id: test-non-existent-connector
body:
service_type: even-better-connector

---
"Update Connector Service Type - 400 status code when connector_id is empty":
- do:
catch: "bad_request"
connector.update_service_type:
connector_id: ""
body:
service_type: even-better-connector

---
"Update Connector Service Type - 400 status code when payload is not string":
- do:
catch: "bad_request"
connector.update_service_type:
connector_id: test-connector
body:
service_type:
field_1: test
field_2: something
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorNativeAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorServiceTypeAction;
import org.elasticsearch.xpack.application.connector.action.TransportDeleteConnectorAction;
import org.elasticsearch.xpack.application.connector.action.TransportGetConnectorAction;
import org.elasticsearch.xpack.application.connector.action.TransportListConnectorAction;
Expand All @@ -76,6 +77,7 @@
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorNativeAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorServiceTypeAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
Expand All @@ -85,6 +87,7 @@
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorNativeAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction;
Expand Down Expand Up @@ -239,6 +242,7 @@ protected XPackLicenseState getLicenseState() {
new ActionHandler<>(UpdateConnectorNativeAction.INSTANCE, TransportUpdateConnectorNativeAction.class),
new ActionHandler<>(UpdateConnectorPipelineAction.INSTANCE, TransportUpdateConnectorPipelineAction.class),
new ActionHandler<>(UpdateConnectorSchedulingAction.INSTANCE, TransportUpdateConnectorSchedulingAction.class),
new ActionHandler<>(UpdateConnectorServiceTypeAction.INSTANCE, TransportUpdateConnectorServiceTypeAction.class),

// SyncJob API
new ActionHandler<>(GetConnectorSyncJobAction.INSTANCE, TransportGetConnectorSyncJobAction.class),
Expand Down Expand Up @@ -318,6 +322,7 @@ public List<RestHandler> getRestHandlers(
new RestUpdateConnectorNativeAction(),
new RestUpdateConnectorPipelineAction(),
new RestUpdateConnectorSchedulingAction(),
new RestUpdateConnectorServiceTypeAction(),

// SyncJob API
new RestGetConnectorSyncJobAction(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorNativeAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;

import java.time.Instant;
import java.util.Arrays;
Expand Down Expand Up @@ -555,6 +556,52 @@ public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request re
}
}

/**
* Updates the service type property of a {@link Connector} and its {@link ConnectorStatus}.
*
* @param request The request for updating the connector's service type.
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorServiceType(UpdateConnectorServiceTypeAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
getConnector(connectorId, listener.delegateFailure((l, connector) -> {

ConnectorStatus prevStatus = connector.getStatus();
ConnectorStatus newStatus = prevStatus == ConnectorStatus.CREATED
? ConnectorStatus.CREATED
: ConnectorStatus.NEEDS_CONFIGURATION;

final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(
Map.of(
Connector.SERVICE_TYPE_FIELD.getPreferredName(),
request.getServiceType(),
Connector.STATUS_FIELD.getPreferredName(),
newStatus
)
)

);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (updateListener, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
updateListener.onFailure(new ResourceNotFoundException(connectorId));
return;
}
updateListener.onResponse(updateResponse);
})
);
}));
} catch (Exception e) {
listener.onFailure(e);
}
}

private static ConnectorIndexService.ConnectorResult mapSearchResponseToConnectorList(SearchResponse response) {
final List<Connector> connectorResults = Arrays.stream(response.getHits().getHits())
.map(ConnectorIndexService::hitToConnector)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.application.connector.action;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.application.EnterpriseSearch;

import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.PUT;

@ServerlessScope(Scope.PUBLIC)
public class RestUpdateConnectorServiceTypeAction extends BaseRestHandler {

@Override
public String getName() {
return "connector_update_service_type_action";
}

@Override
public List<Route> routes() {
return List.of(new Route(PUT, "/" + EnterpriseSearch.CONNECTOR_API_ENDPOINT + "/{connector_id}/_service_type"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
UpdateConnectorServiceTypeAction.Request request = UpdateConnectorServiceTypeAction.Request.fromXContentBytes(
restRequest.param("connector_id"),
restRequest.content(),
restRequest.getXContentType()
);
return channel -> client.execute(
UpdateConnectorServiceTypeAction.INSTANCE,
request,
new RestToXContentListener<>(channel, ConnectorUpdateActionResponse::status)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.application.connector.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;

public class TransportUpdateConnectorServiceTypeAction extends HandledTransportAction<
UpdateConnectorServiceTypeAction.Request,
ConnectorUpdateActionResponse> {

protected final ConnectorIndexService connectorIndexService;

@Inject
public TransportUpdateConnectorServiceTypeAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
Client client
) {
super(
UpdateConnectorServiceTypeAction.NAME,
transportService,
actionFilters,
UpdateConnectorServiceTypeAction.Request::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.connectorIndexService = new ConnectorIndexService(client);
}

@Override
protected void doExecute(
Task task,
UpdateConnectorServiceTypeAction.Request request,
ActionListener<ConnectorUpdateActionResponse> listener
) {
connectorIndexService.updateConnectorServiceType(request, listener.map(r -> new ConnectorUpdateActionResponse(r.getResult())));
}
}
Loading

0 comments on commit 231c76c

Please sign in to comment.