From 83d5084f620e13678e7786b5c735de32c4e9fb80 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 2 Jan 2012 22:02:19 +0200 Subject: [PATCH] Update API: Allow to update a document based on a script, closes #1583. --- .../action/TransportActionModule.java | 2 + .../action/TransportActions.java | 2 + .../InstanceShardOperationRequest.java | 108 ++++++ ...ransportInstanceSingleOperationAction.java | 306 +++++++++++++++++ .../action/update/TransportUpdateAction.java | 254 ++++++++++++++ .../action/update/UpdateRequest.java | 325 ++++++++++++++++++ .../action/update/UpdateResponse.java | 122 +++++++ .../java/org/elasticsearch/client/Client.java | 30 +- .../action/update/UpdateRequestBuilder.java | 157 +++++++++ .../elasticsearch/client/node/NodeClient.java | 18 +- .../client/support/AbstractClient.java | 11 + .../client/transport/TransportClient.java | 14 +- .../action/ClientTransportActionModule.java | 2 + .../update/ClientTransportUpdateAction.java | 44 +++ .../support/InternalTransportClient.java | 28 +- .../cluster/metadata/MappingMetaData.java | 2 +- .../common/io/stream/StreamInput.java | 64 ++++ .../common/io/stream/StreamOutput.java | 68 ++++ .../common/xcontent/XContentHelper.java | 17 +- .../common/xcontent/XContentParser.java | 2 + .../support/AbstractXContentParser.java | 9 + .../DocumentMIssingEngineException.java | 38 ++ .../DocumentSourceMissingEngineException.java | 38 ++ .../elasticsearch/index/get/GetResult.java | 7 + .../mapper/internal/SourceFieldMapper.java | 2 +- .../rest/action/RestActionModule.java | 11 +- .../rest/action/update/RestUpdateAction.java | 146 ++++++++ .../search/lookup/SourceLookup.java | 2 +- .../DiscoveryTransportClientTests.yml | 6 - .../TransportClientDocumentActionsTests.java | 9 + .../TransportClientDocumentActionsTests.yml | 12 - ...ransportClientMoreLikeThisActionTests.java | 56 --- ...TransportClientMoreLikeThisActionTests.yml | 6 - ...nsportClientSniffDocumentActionsTests.java | 9 + ...ansportClientSniffDocumentActionsTests.yml | 12 - .../AliasedIndexDocumentActionsTests.java | 8 + .../AliasedIndexDocumentActionsTests.yml | 8 - .../document/DocumentActionsTests.java | 63 +++- .../document/DocumentActionsTests.yml | 6 - .../document/LocalDocumentActionsTests.java | 8 + .../document/LocalDocumentActionsTests.yml | 8 - .../document/MoreLikeThisActionTests.yml | 6 - .../{document => get}/GetActionTests.java | 2 +- .../MoreLikeThisActionTests.java | 2 +- 44 files changed, 1906 insertions(+), 144 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java create mode 100644 src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java create mode 100644 src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java create mode 100644 src/main/java/org/elasticsearch/action/update/UpdateRequest.java create mode 100644 src/main/java/org/elasticsearch/action/update/UpdateResponse.java create mode 100644 src/main/java/org/elasticsearch/client/action/update/UpdateRequestBuilder.java create mode 100644 src/main/java/org/elasticsearch/client/transport/action/update/ClientTransportUpdateAction.java create mode 100644 src/main/java/org/elasticsearch/index/engine/DocumentMIssingEngineException.java create mode 100644 src/main/java/org/elasticsearch/index/engine/DocumentSourceMissingEngineException.java create mode 100644 src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java delete mode 100644 src/test/java/org/elasticsearch/test/integration/client/transport/DiscoveryTransportClientTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.java delete mode 100644 src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.yml delete mode 100644 src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.yml rename src/test/java/org/elasticsearch/test/integration/{document => get}/GetActionTests.java (99%) rename src/test/java/org/elasticsearch/test/integration/{document => mlt}/MoreLikeThisActionTests.java (99%) diff --git a/src/main/java/org/elasticsearch/action/TransportActionModule.java b/src/main/java/org/elasticsearch/action/TransportActionModule.java index 2b2c5119f8e8f..334f1d0911a09 100644 --- a/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -71,6 +71,7 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchScrollAction; import org.elasticsearch.action.search.type.*; +import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.common.inject.AbstractModule; /** @@ -126,6 +127,7 @@ protected void configure() { bind(TransportIndexDeleteAction.class).asEagerSingleton(); bind(TransportShardDeleteAction.class).asEagerSingleton(); bind(TransportCountAction.class).asEagerSingleton(); + bind(TransportUpdateAction.class).asEagerSingleton(); bind(TransportMultiGetAction.class).asEagerSingleton(); bind(TransportShardMultiGetAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/action/TransportActions.java b/src/main/java/org/elasticsearch/action/TransportActions.java index a0c7ab46851f1..9e6c061e518f5 100644 --- a/src/main/java/org/elasticsearch/action/TransportActions.java +++ b/src/main/java/org/elasticsearch/action/TransportActions.java @@ -28,6 +28,8 @@ public class TransportActions { public static final String INDEX = "indices/index/shard/index"; + public static final String UPDATE = "update"; + public static final String COUNT = "indices/count"; public static final String DELETE = "indices/index/shard/delete"; diff --git a/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java b/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java new file mode 100644 index 0000000000000..5e1a3941eb8d8 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java @@ -0,0 +1,108 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.single.instance; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.Actions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * + */ +public abstract class InstanceShardOperationRequest implements ActionRequest { + + public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); + + protected TimeValue timeout = DEFAULT_TIMEOUT; + + protected String index; + // -1 means its not set, allows to explicitly direct a request to a specific shard + protected int shardId = -1; + + private boolean threadedListener = false; + + protected InstanceShardOperationRequest() { + } + + public InstanceShardOperationRequest(String index) { + this.index = index; + } + + public TimeValue timeout() { + return timeout; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (index == null) { + validationException = Actions.addValidationError("index is missing", validationException); + } + return validationException; + } + + public String index() { + return index; + } + + InstanceShardOperationRequest index(String index) { + this.index = index; + return this; + } + + /** + * Should the listener be called on a separate thread if needed. + */ + @Override + public boolean listenerThreaded() { + return threadedListener; + } + + @Override + public InstanceShardOperationRequest listenerThreaded(boolean threadedListener) { + this.threadedListener = threadedListener; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + index = in.readUTF(); + shardId = in.readInt(); + timeout = TimeValue.readTimeValue(in); + // no need to pass threading over the network, they are always false when coming throw a thread pool + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(index); + out.writeInt(shardId); + timeout.writeTo(out); + } + + public void beforeLocalFork() { + } +} + diff --git a/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java new file mode 100644 index 0000000000000..406935e719c71 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -0,0 +1,306 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support.single.instance; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.support.BaseAction; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.TimeoutClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + */ +public abstract class TransportInstanceSingleOperationAction extends BaseAction { + + protected final ClusterService clusterService; + + protected final TransportService transportService; + + final String transportAction; + final String executor; + + protected TransportInstanceSingleOperationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) { + super(settings, threadPool); + this.clusterService = clusterService; + this.transportService = transportService; + + this.transportAction = transportAction(); + this.executor = executor(); + + transportService.registerHandler(transportAction, new TransportHandler()); + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + new AsyncSingleAction(request, listener).start(); + } + + protected abstract String executor(); + + protected abstract String transportAction(); + + protected abstract void shardOperation(Request request, ActionListener listener) throws ElasticSearchException; + + protected abstract Request newRequest(); + + protected abstract Response newResponse(); + + protected void checkBlock(Request request, ClusterState state) { + + } + + protected boolean retryRequired(ShardIterator shardIt, ClusterState state) { + return false; + } + + protected boolean retryOnFailure(Throwable e) { + return false; + } + + protected TransportRequestOptions transportOptions() { + return TransportRequestOptions.EMPTY; + } + + /** + * Should return an iterator with a single shard! + */ + protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException; + + class AsyncSingleAction { + + private final ActionListener listener; + + private final Request request; + + private ShardIterator shardIt; + + private DiscoveryNodes nodes; + + private final AtomicBoolean operationStarted = new AtomicBoolean(); + + private AsyncSingleAction(Request request, ActionListener listener) { + this.request = request; + this.listener = listener; + + ClusterState clusterState = clusterService.state(); + + nodes = clusterState.nodes(); + + request.index(clusterState.metaData().concreteIndex(request.index())); + + checkBlock(request, clusterState); + } + + public void start() { + start(false); + } + + public boolean start(final boolean fromClusterEvent) throws ElasticSearchException { + final ClusterState clusterState = clusterService.state(); + nodes = clusterState.nodes(); + if (!clusterState.routingTable().hasIndex(request.index())) { + retry(fromClusterEvent); + return false; + } + try { + shardIt = shards(clusterState, request); + } catch (Exception e) { + listener.onFailure(e); + return true; + } + + // no shardIt, might be in the case between index gateway recovery and shardIt initialization + if (shardIt.size() == 0) { + retry(fromClusterEvent); + return false; + } + + // this transport only make sense with an iterator that returns a single shard routing (like primary) + assert shardIt.size() == 1; + + ShardRouting shard = shardIt.nextOrNull(); + assert shard != null; + + if (!shard.active()) { + retry(fromClusterEvent); + return false; + } + + if (!operationStarted.compareAndSet(false, true)) { + return true; + } + + request.shardId = shardIt.shardId().id(); + if (shard.currentNodeId().equals(nodes.localNodeId())) { + request.beforeLocalFork(); + threadPool.executor(executor).execute(new Runnable() { + @Override + public void run() { + try { + shardOperation(request, listener); + } catch (Exception e) { + if (retryOnFailure(e)) { + retry(fromClusterEvent); + } else { + listener.onFailure(e); + } + } + } + }); + } else { + DiscoveryNode node = nodes.get(shard.currentNodeId()); + transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler() { + + @Override + public Response newInstance() { + return newResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(Response response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + // if we got disconnected from the node, or the node / shard is not in the right state (being closed) + if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || + retryOnFailure(exp)) { + operationStarted.set(false); + // we already marked it as started when we executed it (removed the listener) so pass false + // to re-add to the cluster listener + retry(false); + } else { + listener.onFailure(exp); + } + } + }); + } + return true; + } + + void retry(boolean fromClusterEvent) { + if (!fromClusterEvent) { + // make it threaded operation so we fork on the discovery listener thread + request.beforeLocalFork(); + clusterService.add(request.timeout(), new TimeoutClusterStateListener() { + @Override + public void postAdded() { + if (start(true)) { + // if we managed to start and perform the operation on the primary, we can remove this listener + clusterService.remove(this); + } + } + + @Override + public void onClose() { + clusterService.remove(this); + listener.onFailure(new NodeClosedException(nodes.localNode())); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (start(true)) { + // if we managed to start and perform the operation on the primary, we can remove this listener + clusterService.remove(this); + } + } + + @Override + public void onTimeout(TimeValue timeValue) { + // just to be on the safe side, see if we can start it now? + if (start(true)) { + clusterService.remove(this); + return; + } + clusterService.remove(this); + final UnavailableShardsException failure; + if (shardIt == null) { + failure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeValue + "], request: " + request.toString()); + } else { + failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString()); + } + listener.onFailure(failure); + } + }); + } + } + } + + class TransportHandler extends BaseTransportRequestHandler { + + @Override + public Request newInstance() { + return newRequest(); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void messageReceived(Request request, final TransportChannel channel) throws Exception { + // no need to have a threaded listener since we just send back a response + request.listenerThreaded(false); + execute(request, new ActionListener() { + @Override + public void onResponse(Response result) { + try { + channel.sendResponse(result); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send response for get", e1); + } + } + }); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java new file mode 100644 index 0000000000000..7234317afe4b9 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -0,0 +1,254 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.update; + +import com.google.common.collect.ImmutableList; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.delete.TransportDeleteAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.index.TransportIndexAction; +import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.DocumentMissingEngineException; +import org.elasticsearch.index.engine.DocumentSourceMissingEngineException; +import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; +import org.elasticsearch.index.mapper.internal.SourceFieldMapper; +import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; + +/** + */ +public class TransportUpdateAction extends TransportInstanceSingleOperationAction { + + private final IndicesService indicesService; + + private final TransportDeleteAction deleteAction; + + private final TransportIndexAction indexAction; + + private final ScriptService scriptService; + + @Inject + public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService) { + super(settings, threadPool, clusterService, transportService); + this.indicesService = indicesService; + this.indexAction = indexAction; + this.deleteAction = deleteAction; + this.scriptService = scriptService; + } + + @Override + protected String transportAction() { + return TransportActions.UPDATE; + } + + @Override + protected String executor() { + return ThreadPool.Names.INDEX; + } + + @Override + protected UpdateRequest newRequest() { + return new UpdateRequest(); + } + + @Override + protected UpdateResponse newResponse() { + return new UpdateResponse(); + } + + @Override + protected void checkBlock(UpdateRequest request, ClusterState state) { + state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); + } + + @Override + protected boolean retryOnFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof IllegalIndexShardStateException) { + return true; + } + return false; + } + + @Override + protected ShardIterator shards(ClusterState clusterState, UpdateRequest request) throws ElasticSearchException { + if (request.shardId() != -1) { + return clusterState.routingTable().index(request.index()).shard(request.shardId()).primaryShardIt(); + } + ShardIterator shardIterator = clusterService.operationRouting() + .indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing()); + ShardRouting shard; + while ((shard = shardIterator.nextOrNull()) != null) { + if (shard.primary()) { + return new PlainShardIterator(shardIterator.shardId(), ImmutableList.of(shard)); + } + } + return new PlainShardIterator(shardIterator.shardId(), ImmutableList.of()); + } + + @Override + protected void shardOperation(final UpdateRequest request, final ActionListener listener) throws ElasticSearchException { + shardOperation(request, listener, 0); + } + + protected void shardOperation(final UpdateRequest request, final ActionListener listener, final int retryCount) throws ElasticSearchException { + IndexService indexService = indicesService.indexServiceSafe(request.index()); + IndexShard indexShard = indexService.shardSafe(request.shardId()); + + GetResult getResult = indexShard.getService().get(request.type(), request.id(), + new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TimestampFieldMapper.NAME}, true); + + // no doc, what to do, what to do... + if (!getResult.exists()) { + listener.onFailure(new DocumentMissingEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); + return; + } + + if (getResult.internalSourceRef() == null) { + // no source, we can't do nothing, through a failure... + listener.onFailure(new DocumentSourceMissingEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); + return; + } + + Tuple> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef().bytes(), getResult.internalSourceRef().offset(), getResult.internalSourceRef().length(), true); + Map source = sourceAndContent.v2(); + Map ctx = new HashMap(2); + ctx.put("_source", source); + + try { + ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams); + script.setNextVar("ctx", ctx); + script.run(); + // we need to unwrap the ctx... + ctx = (Map) script.unwrap(ctx); + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("failed to execute script", e); + } + + String operation = (String) ctx.get("op"); + source = (Map) ctx.get("_source"); + + // apply script to update the source + String routing = getResult.fields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).value().toString() : null; + String parent = getResult.fields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).value().toString() : null; + // TODO ttl/timestamp + + // TODO percolate? + + // TODO: external version type, does it make sense here? does not seem like it... + + if (operation == null || "index".equals(operation)) { + IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) + .source(source, sourceAndContent.v1()) + .version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + indexRequest.operationThreaded(false); + indexAction.execute(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version()); + listener.onResponse(update); + } + + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException) { + if ((retryCount + 1) < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } + } + } + }); + } else if ("delete".equals(operation)) { + DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) + .version(getResult.version()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + deleteRequest.operationThreaded(false); + deleteAction.execute(deleteRequest, new ActionListener() { + @Override + public void onResponse(DeleteResponse response) { + UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version()); + listener.onResponse(update); + } + + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException) { + if ((retryCount + 1) < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } + } + listener.onFailure(e); + } + }); + } else if ("none".equals(operation)) { + listener.onResponse(new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version())); + } else { + logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script); + listener.onResponse(new UpdateResponse(getResult.index(), getResult.type(), getResult.id(), getResult.version())); + } + } +} diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java new file mode 100644 index 0000000000000..3e1e7a626352d --- /dev/null +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -0,0 +1,325 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.update; + +import com.google.common.collect.Maps; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.action.Actions.addValidationError; + +/** + */ +public class UpdateRequest extends InstanceShardOperationRequest { + + private String type; + private String id; + @Nullable + private String routing; + + String script; + @Nullable + String scriptLang; + @Nullable + Map scriptParams; + + int retryOnConflict = 1; + + private ReplicationType replicationType = ReplicationType.DEFAULT; + private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + + UpdateRequest() { + + } + + public UpdateRequest(String index, String type, String id) { + this.index = index; + this.type = type; + this.id = id; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (type == null) { + validationException = addValidationError("type is missing", validationException); + } + if (id == null) { + validationException = addValidationError("id is missing", validationException); + } + if (script == null) { + validationException = addValidationError("script is missing", validationException); + } + return validationException; + } + + /** + * Sets the index the document will exists on. + */ + public UpdateRequest index(String index) { + this.index = index; + return this; + } + + /** + * The type of the indexed document. + */ + public String type() { + return type; + } + + /** + * Sets the type of the indexed document. + */ + public UpdateRequest type(String type) { + this.type = type; + return this; + } + + /** + * The id of the indexed document. + */ + public String id() { + return id; + } + + /** + * Sets the id of the indexed document. + */ + public UpdateRequest id(String id) { + this.id = id; + return this; + } + + /** + * Controls the shard routing of the request. Using this value to hash the shard + * and not the id. + */ + public UpdateRequest routing(String routing) { + if (routing != null && routing.length() == 0) { + this.routing = null; + } else { + this.routing = routing; + } + return this; + } + + /** + * Sets the parent id of this document. Will simply set the routing to this value, as it is only + * used for routing with delete requests. + */ + public UpdateRequest parent(String parent) { + if (routing == null) { + routing = parent; + } + return this; + } + + /** + * Controls the shard routing of the request. Using this value to hash the shard + * and not the id. + */ + public String routing() { + return this.routing; + } + + int shardId() { + return this.shardId; + } + + /** + * The script to execute. Note, make sure not to send different script each times and instead + * use script params if possible with the same (automatically compiled) script. + */ + public UpdateRequest script(String script) { + this.script = script; + return this; + } + + /** + * The language of the script to execute. + */ + public UpdateRequest scriptLang(String scriptLang) { + this.scriptLang = scriptLang; + return this; + } + + /** + * Add a script parameter. + */ + public UpdateRequest addScriptParam(String name, Object value) { + if (scriptParams == null) { + scriptParams = Maps.newHashMap(); + } + scriptParams.put(name, value); + return this; + } + + /** + * Sets the script parameters to use with the script. + */ + public UpdateRequest scriptParams(Map scriptParams) { + if (this.scriptParams == null) { + this.scriptParams = scriptParams; + } else { + this.scriptParams.putAll(scriptParams); + } + return this; + } + + /** + * The script to execute. Note, make sure not to send different script each times and instead + * use script params if possible with the same (automatically compiled) script. + */ + public UpdateRequest script(String script, @Nullable Map scriptParams) { + this.script = script; + if (this.scriptParams != null) { + this.scriptParams.putAll(scriptParams); + } else { + this.scriptParams = scriptParams; + } + return this; + } + + /** + * The script to execute. Note, make sure not to send different script each times and instead + * use script params if possible with the same (automatically compiled) script. + * + * @param script The script to execute + * @param scriptLang The script language + * @param scriptParams The script parameters + */ + public UpdateRequest script(String script, @Nullable String scriptLang, @Nullable Map scriptParams) { + this.script = script; + this.scriptLang = scriptLang; + if (this.scriptParams != null) { + this.scriptParams.putAll(scriptParams); + } else { + this.scriptParams = scriptParams; + } + return this; + } + + /** + * Sets the number of retries of a version conflict occurs because the document was updated between + * getting it and updating it. Defaults to 1. + */ + public UpdateRequest retryOnConflict(int retryOnConflict) { + this.retryOnConflict = retryOnConflict; + return this; + } + + public int retryOnConflict() { + return this.retryOnConflict; + } + + /** + * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. + */ + public UpdateRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + /** + * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. + */ + public UpdateRequest timeout(String timeout) { + return timeout(TimeValue.parseTimeValue(timeout, null)); + } + + /** + * The replication type. + */ + public ReplicationType replicationType() { + return this.replicationType; + } + + /** + * Sets the replication type. + */ + public UpdateRequest replicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + return this; + } + + public WriteConsistencyLevel consistencyLevel() { + return this.consistencyLevel; + } + + /** + * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + */ + public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + replicationType = ReplicationType.fromId(in.readByte()); + consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); + type = in.readUTF(); + id = in.readUTF(); + if (in.readBoolean()) { + routing = in.readUTF(); + } + script = in.readUTF(); + if (in.readBoolean()) { + scriptLang = in.readUTF(); + } + scriptParams = in.readMap(); + retryOnConflict = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeByte(replicationType.id()); + out.writeByte(consistencyLevel.id()); + out.writeUTF(type); + out.writeUTF(id); + if (routing == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(routing); + } + out.writeUTF(script); + if (scriptLang == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(scriptLang); + } + out.writeMap(scriptParams); + out.writeVInt(retryOnConflict); + } +} diff --git a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java new file mode 100644 index 0000000000000..f4a41dd9d7658 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -0,0 +1,122 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.update; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + */ +public class UpdateResponse implements ActionResponse { + + private String index; + + private String id; + + private String type; + + private long version; + + public UpdateResponse() { + + } + + public UpdateResponse(String index, String type, String id, long version) { + this.index = index; + this.id = id; + this.type = type; + this.version = version; + } + + /** + * The index the document was indexed into. + */ + public String index() { + return this.index; + } + + /** + * The index the document was indexed into. + */ + public String getIndex() { + return index; + } + + /** + * The type of the document indexed. + */ + public String type() { + return this.type; + } + + /** + * The type of the document indexed. + */ + public String getType() { + return type; + } + + /** + * The id of the document indexed. + */ + public String id() { + return this.id; + } + + /** + * The id of the document indexed. + */ + public String getId() { + return id; + } + + /** + * Returns the version of the doc indexed. + */ + public long version() { + return this.version; + } + + /** + * Returns the version of the doc indexed. + */ + public long getVersion() { + return version(); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + index = in.readUTF(); + id = in.readUTF(); + type = in.readUTF(); + version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(index); + out.writeUTF(id); + out.writeUTF(type); + out.writeLong(version); + } +} diff --git a/src/main/java/org/elasticsearch/client/Client.java b/src/main/java/org/elasticsearch/client/Client.java index 18050ca8aa124..e72ef413c7258 100644 --- a/src/main/java/org/elasticsearch/client/Client.java +++ b/src/main/java/org/elasticsearch/client/Client.java @@ -41,6 +41,8 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.action.count.CountRequestBuilder; import org.elasticsearch.client.action.delete.DeleteRequestBuilder; @@ -52,6 +54,7 @@ import org.elasticsearch.client.action.percolate.PercolateRequestBuilder; import org.elasticsearch.client.action.search.SearchRequestBuilder; import org.elasticsearch.client.action.search.SearchScrollRequestBuilder; +import org.elasticsearch.client.action.update.UpdateRequestBuilder; import org.elasticsearch.common.Nullable; /** @@ -64,7 +67,6 @@ *

A client can either be retrieved from a {@link org.elasticsearch.node.Node} started, or connected remotely * to one or more nodes using {@link org.elasticsearch.client.transport.TransportClient}. * - * * @see org.elasticsearch.node.Node#client() * @see org.elasticsearch.client.transport.TransportClient */ @@ -109,6 +111,32 @@ public interface Client { */ IndexRequestBuilder prepareIndex(); + /** + * Updates a document based on a script. + * + * @param request The update request + * @return The result future + */ + ActionFuture update(UpdateRequest request); + + /** + * Updates a document based on a script. + * + * @param request The update request + * @param listener A listener to be notified with a result + */ + void update(UpdateRequest request, ActionListener listener); + + /** + * Updates a document based on a script. + */ + UpdateRequestBuilder prepareUpdate(); + + /** + * Updates a document based on a script. + */ + UpdateRequestBuilder prepareUpdate(String index, String type, String id); + /** * Index a document associated with a given index and type. *

diff --git a/src/main/java/org/elasticsearch/client/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/client/action/update/UpdateRequestBuilder.java new file mode 100644 index 0000000000000..dd5c4e3e90de2 --- /dev/null +++ b/src/main/java/org/elasticsearch/client/action/update/UpdateRequestBuilder.java @@ -0,0 +1,157 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.action.update; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.action.support.BaseRequestBuilder; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Map; + +/** + */ +public class UpdateRequestBuilder extends BaseRequestBuilder { + + public UpdateRequestBuilder(Client client, String index, String type, String id) { + super(client, new UpdateRequest(index, type, id)); + } + + /** + * Sets the index the document will exists on. + */ + public UpdateRequestBuilder setIndex(String index) { + request.index(index); + return this; + } + + /** + * Sets the type of the indexed document. + */ + public UpdateRequestBuilder setType(String type) { + request.type(type); + return this; + } + + /** + * Sets the id of the indexed document. + */ + public UpdateRequestBuilder setId(String id) { + request.id(id); + return this; + } + + /** + * Controls the shard routing of the request. Using this value to hash the shard + * and not the id. + */ + public UpdateRequestBuilder setRouting(String routing) { + request.routing(routing); + return this; + } + + public UpdateRequestBuilder setParent(String parent) { + request.parent(parent); + return this; + } + + /** + * The script to execute. Note, make sure not to send different script each times and instead + * use script params if possible with the same (automatically compiled) script. + */ + public UpdateRequestBuilder setScript(String script) { + request.script(script); + return this; + } + + /** + * The language of the script to execute. + */ + public UpdateRequestBuilder setScriptLang(String scriptLang) { + request.scriptLang(scriptLang); + return this; + } + + /** + * Sets the script parameters to use with the script. + */ + public UpdateRequestBuilder setScriptParams(Map scriptParams) { + request.scriptParams(scriptParams); + return this; + } + + /** + * Add a script parameter. + */ + public UpdateRequestBuilder addScriptParam(String name, Object value) { + request.addScriptParam(name, value); + return this; + } + + /** + * Sets the number of retries of a version conflict occurs because the document was updated between + * getting it and updating it. Defaults to 1. + */ + public UpdateRequestBuilder setRetryOnConflict(int retryOnConflict) { + request.retryOnConflict(retryOnConflict); + return this; + } + + /** + * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. + */ + public UpdateRequestBuilder setTimeout(TimeValue timeout) { + request.timeout(timeout); + return this; + } + + /** + * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. + */ + public UpdateRequestBuilder setTimeout(String timeout) { + request.timeout(timeout); + return this; + } + + /** + * Sets the replication type. + */ + public UpdateRequestBuilder setReplicationType(ReplicationType replicationType) { + request.replicationType(replicationType); + return this; + } + + /** + * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + */ + public UpdateRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { + request.consistencyLevel(consistencyLevel); + return this; + } + + @Override + protected void doExecute(ActionListener listener) { + client.update(request, listener); + } +} diff --git a/src/main/java/org/elasticsearch/client/node/NodeClient.java b/src/main/java/org/elasticsearch/client/node/NodeClient.java index 74812963a95ae..8f52e14e8aba7 100644 --- a/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -43,6 +43,9 @@ import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.percolate.TransportPercolateAction; import org.elasticsearch.action.search.*; +import org.elasticsearch.action.update.TransportUpdateAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.client.support.AbstractClient; @@ -61,6 +64,8 @@ public class NodeClient extends AbstractClient implements InternalClient { private final TransportIndexAction indexAction; + private final TransportUpdateAction updateAction; + private final TransportDeleteAction deleteAction; private final TransportBulkAction bulkAction; @@ -83,13 +88,14 @@ public class NodeClient extends AbstractClient implements InternalClient { @Inject public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, - TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction, + TransportIndexAction indexAction, TransportUpdateAction updateAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction, TransportDeleteByQueryAction deleteByQueryAction, TransportGetAction getAction, TransportMultiGetAction multiGetAction, TransportCountAction countAction, TransportSearchAction searchAction, TransportSearchScrollAction searchScrollAction, TransportMoreLikeThisAction moreLikeThisAction, TransportPercolateAction percolateAction) { this.threadPool = threadPool; this.admin = admin; this.indexAction = indexAction; + this.updateAction = updateAction; this.deleteAction = deleteAction; this.bulkAction = bulkAction; this.deleteByQueryAction = deleteByQueryAction; @@ -127,6 +133,16 @@ public void index(IndexRequest request, ActionListener listener) indexAction.execute(request, listener); } + @Override + public ActionFuture update(UpdateRequest request) { + return updateAction.execute(request); + } + + @Override + public void update(UpdateRequest request, ActionListener listener) { + updateAction.execute(request, listener); + } + @Override public ActionFuture delete(DeleteRequest request) { return deleteAction.execute(request); diff --git a/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 38340d8f36d4f..4012ad515db86 100644 --- a/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -30,6 +30,7 @@ import org.elasticsearch.client.action.percolate.PercolateRequestBuilder; import org.elasticsearch.client.action.search.SearchRequestBuilder; import org.elasticsearch.client.action.search.SearchScrollRequestBuilder; +import org.elasticsearch.client.action.update.UpdateRequestBuilder; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.common.Nullable; @@ -53,6 +54,16 @@ public IndexRequestBuilder prepareIndex(String index, String type, @Nullable Str return prepareIndex().setIndex(index).setType(type).setId(id); } + @Override + public UpdateRequestBuilder prepareUpdate() { + return new UpdateRequestBuilder(this, null, null, null); + } + + @Override + public UpdateRequestBuilder prepareUpdate(String index, String type, String id) { + return new UpdateRequestBuilder(this, index, type, id); + } + @Override public DeleteRequestBuilder prepareDelete() { return new DeleteRequestBuilder(this, null); diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 890ad1f3b3859..c13d0d9ff6c01 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -43,6 +43,8 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.client.transport.action.ClientTransportActionModule; @@ -80,8 +82,6 @@ *

*

The transport client important modules used is the {@link org.elasticsearch.transport.TransportModule} which is * started in client mode (only connects, no bind). - * - * */ public class TransportClient extends AbstractClient { @@ -259,6 +259,16 @@ public void index(IndexRequest request, ActionListener listener) internalClient.index(request, listener); } + @Override + public ActionFuture update(UpdateRequest request) { + return internalClient.update(request); + } + + @Override + public void update(UpdateRequest request, ActionListener listener) { + internalClient.update(request, listener); + } + @Override public ActionFuture delete(DeleteRequest request) { return internalClient.delete(request); diff --git a/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java b/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java index a8b7fdac8ee19..1db0d2f5053bc 100644 --- a/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java +++ b/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java @@ -61,6 +61,7 @@ import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction; +import org.elasticsearch.client.transport.action.update.ClientTransportUpdateAction; import org.elasticsearch.common.inject.AbstractModule; /** @@ -80,6 +81,7 @@ protected void configure() { bind(ClientTransportSearchScrollAction.class).asEagerSingleton(); bind(ClientTransportBulkAction.class).asEagerSingleton(); bind(ClientTransportPercolateAction.class).asEagerSingleton(); + bind(ClientTransportUpdateAction.class).asEagerSingleton(); bind(ClientTransportIndicesExistsAction.class).asEagerSingleton(); bind(ClientTransportIndicesStatsAction.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/client/transport/action/update/ClientTransportUpdateAction.java b/src/main/java/org/elasticsearch/client/transport/action/update/ClientTransportUpdateAction.java new file mode 100644 index 0000000000000..deec84c62a39e --- /dev/null +++ b/src/main/java/org/elasticsearch/client/transport/action/update/ClientTransportUpdateAction.java @@ -0,0 +1,44 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport.action.update; + +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.transport.action.support.BaseClientTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; + +/** + * + */ +public class ClientTransportUpdateAction extends BaseClientTransportAction { + + @Inject + public ClientTransportUpdateAction(Settings settings, TransportService transportService) { + super(settings, transportService, UpdateResponse.class); + } + + @Override + protected String action() { + return TransportActions.UPDATE; + } +} diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java index 6f9370c4c61e4..526e1bbe3e5dc 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java @@ -42,6 +42,8 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.client.support.AbstractClient; @@ -57,6 +59,7 @@ import org.elasticsearch.client.transport.action.percolate.ClientTransportPercolateAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction; +import org.elasticsearch.client.transport.action.update.ClientTransportUpdateAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -75,6 +78,8 @@ public class InternalTransportClient extends AbstractClient implements InternalC private final ClientTransportIndexAction indexAction; + private final ClientTransportUpdateAction updateAction; + private final ClientTransportDeleteAction deleteAction; private final ClientTransportBulkAction bulkAction; @@ -98,7 +103,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC @Inject public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportClientNodesService nodesService, InternalTransportAdminClient adminClient, - ClientTransportIndexAction indexAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction, ClientTransportMultiGetAction multiGetAction, + ClientTransportIndexAction indexAction, ClientTransportUpdateAction updateAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction, ClientTransportMultiGetAction multiGetAction, ClientTransportDeleteByQueryAction deleteByQueryAction, ClientTransportCountAction countAction, ClientTransportSearchAction searchAction, ClientTransportSearchScrollAction searchScrollAction, ClientTransportMoreLikeThisAction moreLikeThisAction, ClientTransportPercolateAction percolateAction) { @@ -107,6 +112,7 @@ public InternalTransportClient(Settings settings, ThreadPool threadPool, this.adminClient = adminClient; this.indexAction = indexAction; + this.updateAction = updateAction; this.deleteAction = deleteAction; this.bulkAction = bulkAction; this.getAction = getAction; @@ -154,6 +160,26 @@ public void doWithNode(DiscoveryNode node, ActionListener listene }, listener); } + @Override + public ActionFuture update(final UpdateRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override + public ActionFuture doWithNode(DiscoveryNode node) throws ElasticSearchException { + return updateAction.execute(node, request); + } + }); + } + + @Override + public void update(final UpdateRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { + @Override + public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticSearchException { + updateAction.execute(node, request, listener); + } + }, listener); + } + @Override public ActionFuture delete(final DeleteRequest request) { return nodesService.execute(new TransportClientNodesService.NodeCallback>() { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index d0babc944b7f3..7d6e38bf782f5 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -342,7 +342,7 @@ public CompressedString source() { * Converts the serialized compressed form of the mappings into a parsed map. */ public Map sourceAsMap() throws IOException { - Map mapping = XContentHelper.convertToMap(source.compressed(), 0, source.compressed().length).v2(); + Map mapping = XContentHelper.convertToMap(source.compressed(), 0, source.compressed().length, true).v2(); if (mapping.size() == 1 && mapping.containsKey(type())) { // the type name is the root value, reduce it mapping = (Map) mapping.get(type()); diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 1aacc1c787ace..66f2a114b9d44 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -19,9 +19,12 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.Nullable; + import java.io.IOException; import java.io.InputStream; import java.io.UTFDataFormatException; +import java.util.*; /** * @@ -249,4 +252,65 @@ public final boolean readBoolean() throws IOException { // readBytes(b, off, len); // return len; // } + + public + @Nullable + Map readMap() throws IOException { + return (Map) readFieldValue(); + } + + @SuppressWarnings({"unchecked"}) + private + @Nullable + Object readFieldValue() throws IOException { + byte type = readByte(); + if (type == -1) { + return null; + } else if (type == 0) { + return readUTF(); + } else if (type == 1) { + return readInt(); + } else if (type == 2) { + return readLong(); + } else if (type == 3) { + return readFloat(); + } else if (type == 4) { + return readDouble(); + } else if (type == 5) { + return readBoolean(); + } else if (type == 6) { + int bytesSize = readVInt(); + byte[] value = new byte[bytesSize]; + readFully(value); + return value; + } else if (type == 7) { + int size = readVInt(); + List list = new ArrayList(size); + for (int i = 0; i < size; i++) { + list.add(readFieldValue()); + } + return list; + } else if (type == 8) { + int size = readVInt(); + Object[] list = new Object[size]; + for (int i = 0; i < size; i++) { + list[i] = readFieldValue(); + } + return list; + } else if (type == 9 || type == 10) { + int size = readVInt(); + Map map; + if (type == 9) { + map = new LinkedHashMap(size); + } else { + map = new HashMap(size); + } + for (int i = 0; i < size; i++) { + map.put(readUTF(), readFieldValue()); + } + return map; + } else { + throw new IOException("Can't read unknown type [" + type + "]"); + } + } } diff --git a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 31705e08478b1..704ab40f469d7 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -19,8 +19,13 @@ package org.elasticsearch.common.io.stream; +import org.elasticsearch.common.Nullable; + import java.io.IOException; import java.io.OutputStream; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; /** * @@ -209,4 +214,67 @@ public void write(int b) throws IOException { public void write(byte[] b, int off, int len) throws IOException { writeBytes(b, off, len); } + + public void writeMap(@Nullable Map map) throws IOException { + writeValue(map); + } + + private void writeValue(@Nullable Object value) throws IOException { + if (value == null) { + writeByte((byte) -1); + return; + } + Class type = value.getClass(); + if (type == String.class) { + writeByte((byte) 0); + writeUTF((String) value); + } else if (type == Integer.class) { + writeByte((byte) 1); + writeInt((Integer) value); + } else if (type == Long.class) { + writeByte((byte) 2); + writeLong((Long) value); + } else if (type == Float.class) { + writeByte((byte) 3); + writeFloat((Float) value); + } else if (type == Double.class) { + writeByte((byte) 4); + writeDouble((Double) value); + } else if (type == Boolean.class) { + writeByte((byte) 5); + writeBoolean((Boolean) value); + } else if (type == byte[].class) { + writeByte((byte) 6); + writeVInt(((byte[]) value).length); + writeBytes(((byte[]) value)); + } else if (value instanceof List) { + writeByte((byte) 7); + List list = (List) value; + writeVInt(list.size()); + for (Object o : list) { + writeValue(o); + } + } else if (value instanceof Object[]) { + writeByte((byte) 8); + Object[] list = (Object[]) value; + writeVInt(list.length); + for (Object o : list) { + writeValue(o); + } + } else if (value instanceof Map) { + if (value instanceof LinkedHashMap) { + writeByte((byte) 9); + } else { + writeByte((byte) 10); + } + Map map = (Map) value; + writeVInt(map.size()); + for (Map.Entry entry : map.entrySet()) { + writeUTF(entry.getKey()); + writeValue(entry.getValue()); + } + } else { + throw new IOException("Can't write type [" + type + "]"); + } + } } diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index d48520791a749..75b70983afd1a 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -50,17 +50,24 @@ public static XContentParser createParser(byte[] data, int offset, int length) t } } - public static Tuple> convertToMap(byte[] data, int offset, int length) throws ElasticSearchParseException { + public static Tuple> convertToMap(byte[] data, int offset, int length, boolean ordered) throws ElasticSearchParseException { try { + XContentParser parser; + XContentType contentType; if (LZF.isCompressed(data, offset, length)) { BytesStreamInput siBytes = new BytesStreamInput(data, offset, length); LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); - XContentType contentType = XContentFactory.xContentType(siLzf); + contentType = XContentFactory.xContentType(siLzf); siLzf.resetToBufferStart(); - return Tuple.create(contentType, XContentFactory.xContent(contentType).createParser(siLzf).mapAndClose()); + parser = XContentFactory.xContent(contentType).createParser(siLzf); } else { - XContentType contentType = XContentFactory.xContentType(data, offset, length); - return Tuple.create(contentType, XContentFactory.xContent(contentType).createParser(data, offset, length).mapAndClose()); + contentType = XContentFactory.xContentType(data, offset, length); + parser = XContentFactory.xContent(contentType).createParser(data, offset, length); + } + if (ordered) { + return Tuple.create(contentType, parser.mapOrderedAndClose()); + } else { + return Tuple.create(contentType, parser.mapAndClose()); } } catch (IOException e) { throw new ElasticSearchParseException("Failed to parse content to map", e); diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentParser.java b/src/main/java/org/elasticsearch/common/xcontent/XContentParser.java index 227855f4ca02a..95e326abf8907 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentParser.java +++ b/src/main/java/org/elasticsearch/common/xcontent/XContentParser.java @@ -122,6 +122,8 @@ enum NumberType { Map mapAndClose() throws IOException; + Map mapOrderedAndClose() throws IOException; + String text() throws IOException; String textOrNull() throws IOException; diff --git a/src/main/java/org/elasticsearch/common/xcontent/support/AbstractXContentParser.java b/src/main/java/org/elasticsearch/common/xcontent/support/AbstractXContentParser.java index 300fbf20f786f..080df1c060fd3 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/support/AbstractXContentParser.java +++ b/src/main/java/org/elasticsearch/common/xcontent/support/AbstractXContentParser.java @@ -124,4 +124,13 @@ public Map mapAndClose() throws IOException { close(); } } + + @Override + public Map mapOrderedAndClose() throws IOException { + try { + return mapOrdered(); + } finally { + close(); + } + } } diff --git a/src/main/java/org/elasticsearch/index/engine/DocumentMIssingEngineException.java b/src/main/java/org/elasticsearch/index/engine/DocumentMIssingEngineException.java new file mode 100644 index 0000000000000..1bd3dda7b701a --- /dev/null +++ b/src/main/java/org/elasticsearch/index/engine/DocumentMIssingEngineException.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; + +/** + * + */ +public class DocumentMissingEngineException extends EngineException { + + public DocumentMissingEngineException(ShardId shardId, String type, String id) { + super(shardId, "[" + type + "][" + id + "]: document missing"); + } + + @Override + public RestStatus status() { + return RestStatus.CONFLICT; + } +} diff --git a/src/main/java/org/elasticsearch/index/engine/DocumentSourceMissingEngineException.java b/src/main/java/org/elasticsearch/index/engine/DocumentSourceMissingEngineException.java new file mode 100644 index 0000000000000..a918632eefd71 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/engine/DocumentSourceMissingEngineException.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; + +/** + * + */ +public class DocumentSourceMissingEngineException extends EngineException { + + public DocumentSourceMissingEngineException(ShardId shardId, String type, String id) { + super(shardId, "[" + type + "][" + id + "]: document source missing"); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/src/main/java/org/elasticsearch/index/get/GetResult.java b/src/main/java/org/elasticsearch/index/get/GetResult.java index 028381d0f60ae..ef15d4089beb8 100644 --- a/src/main/java/org/elasticsearch/index/get/GetResult.java +++ b/src/main/java/org/elasticsearch/index/get/GetResult.java @@ -179,6 +179,13 @@ public BytesHolder sourceRef() { return this.source; } + /** + * Internal source representation, might be compressed.... + */ + public BytesHolder internalSourceRef() { + return source; + } + /** * Is the source empty (not available) or not. */ diff --git a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java index b5596fc6bdd1e..34ac7473b862d 100644 --- a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java @@ -230,7 +230,7 @@ protected Field parseCreateField(ParseContext context) throws IOException { if (filtered) { // we don't update the context source if we filter, we want to keep it as is... - Tuple> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength); + Tuple> mapTuple = XContentHelper.convertToMap(data, dataOffset, dataLength, true); Map filteredSource = XContentMapValues.filter(mapTuple.v2(), includes, excludes); CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); StreamOutput streamOutput; diff --git a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index b007b1738a5e9..c90fadf63cca4 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -71,6 +71,7 @@ import org.elasticsearch.rest.action.percolate.RestPercolateAction; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction; +import org.elasticsearch.rest.action.update.RestUpdateAction; import java.util.List; @@ -137,24 +138,20 @@ protected void configure() { bind(RestClearIndicesCacheAction.class).asEagerSingleton(); bind(RestIndexAction.class).asEagerSingleton(); - bind(RestGetAction.class).asEagerSingleton(); bind(RestMultiGetAction.class).asEagerSingleton(); - bind(RestDeleteAction.class).asEagerSingleton(); - bind(RestDeleteByQueryAction.class).asEagerSingleton(); - bind(RestCountAction.class).asEagerSingleton(); bind(RestBulkAction.class).asEagerSingleton(); + bind(RestUpdateAction.class).asEagerSingleton(); + bind(RestPercolateAction.class).asEagerSingleton(); bind(RestSearchAction.class).asEagerSingleton(); bind(RestSearchScrollAction.class).asEagerSingleton(); - + bind(RestValidateQueryAction.class).asEagerSingleton(); bind(RestMoreLikeThisAction.class).asEagerSingleton(); - - bind(RestPercolateAction.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java new file mode 100644 index 0000000000000..e6bef6050b161 --- /dev/null +++ b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -0,0 +1,146 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.update; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.CREATED; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + */ +public class RestUpdateAction extends BaseRestHandler { + + @Inject + public RestUpdateAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(POST, "/{index}/{type}/{id}/_update", this); + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + UpdateRequest updateRequest = new UpdateRequest(request.param("index"), request.param("type"), request.param("id")); + updateRequest.routing(request.param("routing")); + updateRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing + updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout())); + String replicationType = request.param("replication"); + if (replicationType != null) { + updateRequest.replicationType(ReplicationType.fromString(replicationType)); + } + String consistencyLevel = request.param("consistency"); + if (consistencyLevel != null) { + updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + } + // we just send a response, no need to fork + updateRequest.listenerThreaded(false); + updateRequest.script(request.param("script")); + updateRequest.scriptLang(request.param("lang")); + for (Map.Entry entry : request.params().entrySet()) { + if (entry.getKey().startsWith("sp_")) { + updateRequest.addScriptParam(entry.getKey().substring(3), entry.getValue()); + } + } + updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); + + // see if we have it in the body + if (request.hasContent()) { + XContentType xContentType = XContentFactory.xContentType(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()); + if (xContentType != null) { + try { + Map content = XContentFactory.xContent(xContentType) + .createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose(); + if (content.containsKey("script")) { + updateRequest.script(content.get("script").toString()); + } + if (content.containsKey("lang")) { + updateRequest.scriptLang(content.get("lang").toString()); + } + if (content.containsKey("params")) { + updateRequest.scriptParams((Map) content.get("params")); + } + } catch (Exception e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.warn("Failed to send response", e1); + } + return; + } + } + } + + client.update(updateRequest, new ActionListener() { + @Override + public void onResponse(UpdateResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject() + .field(Fields.OK, true) + .field(Fields._INDEX, response.index()) + .field(Fields._TYPE, response.type()) + .field(Fields._ID, response.id()) + .field(Fields._VERSION, response.version()); + builder.endObject(); + RestStatus status = OK; + if (response.version() == 1) { + status = CREATED; + } + channel.sendResponse(new XContentRestResponse(request, status, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + static final class Fields { + static final XContentBuilderString OK = new XContentBuilderString("ok"); + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + } +} diff --git a/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java b/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java index 125cc6a614927..54e4637887707 100644 --- a/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java +++ b/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java @@ -76,7 +76,7 @@ private Map loadSourceIfNeeded() { } public static Map sourceAsMap(byte[] bytes, int offset, int length) throws ElasticSearchParseException { - return XContentHelper.convertToMap(bytes, offset, length).v2(); + return XContentHelper.convertToMap(bytes, offset, length, false).v2(); } public void setNextReader(IndexReader reader) { diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/DiscoveryTransportClientTests.yml b/src/test/java/org/elasticsearch/test/integration/client/transport/DiscoveryTransportClientTests.yml deleted file mode 100644 index 641d322a2b3aa..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/DiscoveryTransportClientTests.yml +++ /dev/null @@ -1,6 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java index bed87ea860e0d..e4a4841a0064b 100644 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.network.NetworkUtils; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.test.integration.document.DocumentActionsTests; @@ -38,6 +40,7 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests { protected Client getClient1() { TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder() + .put(nodeSettings()) .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) .put("client.transport.sniff", false).build()); client.addTransportAddress(server1Address); @@ -48,9 +51,15 @@ protected Client getClient1() { protected Client getClient2() { TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder() + .put(nodeSettings()) .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) .put("client.transport.sniff", false).build()); client.addTransportAddress(server2Address); return client; } + + @Override + protected Settings nodeSettings() { + return ImmutableSettings.settingsBuilder().put("client.transport.nodes_sampler_interval", "30s").build(); + } } diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.yml b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.yml deleted file mode 100644 index f137ce572b97a..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.yml +++ /dev/null @@ -1,12 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 - -# use large interval node sampler -client: - transport: - nodes_sampler_interval: 30s - diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.java b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.java deleted file mode 100644 index a81140983abe0..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to ElasticSearch and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. ElasticSearch licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.integration.client.transport; - -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.network.NetworkUtils; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.node.internal.InternalNode; -import org.elasticsearch.test.integration.document.MoreLikeThisActionTests; -import org.elasticsearch.transport.TransportService; - -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; - -/** - * - */ -public class TransportClientMoreLikeThisActionTests extends MoreLikeThisActionTests { - - @Override - protected Client getClient1() { - TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); - TransportClient client = new TransportClient(settingsBuilder() - .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) - .put("discovery.enabled", false).build()); - client.addTransportAddress(server1Address); - return client; - } - - @Override - protected Client getClient2() { - TransportAddress server1Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); - TransportClient client = new TransportClient(settingsBuilder() - .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) - .put("discovery.enabled", false).build()); - client.addTransportAddress(server1Address); - return client; - } -} diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.yml b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.yml deleted file mode 100644 index 641d322a2b3aa..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientMoreLikeThisActionTests.yml +++ /dev/null @@ -1,6 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.java b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.java index 0afba385fdb3f..326ddf040bdc7 100644 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.network.NetworkUtils; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.test.integration.document.DocumentActionsTests; @@ -38,6 +40,7 @@ public class TransportClientSniffDocumentActionsTests extends DocumentActionsTes protected Client getClient1() { TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder() + .put(nodeSettings()) .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) .put("client.transport.sniff", true).build()); client.addTransportAddress(server1Address); @@ -48,9 +51,15 @@ protected Client getClient1() { protected Client getClient2() { TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress(); TransportClient client = new TransportClient(settingsBuilder() + .put(nodeSettings()) .put("cluster.name", "test-cluster-" + NetworkUtils.getLocalAddress().getHostName()) .put("client.transport.sniff", true).build()); client.addTransportAddress(server2Address); return client; } + + @Override + protected Settings nodeSettings() { + return ImmutableSettings.settingsBuilder().put("client.transport.nodes_sampler_interval", "30s").build(); + } } diff --git a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.yml b/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.yml deleted file mode 100644 index f137ce572b97a..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.yml +++ /dev/null @@ -1,12 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 - -# use large interval node sampler -client: - transport: - nodes_sampler_interval: 30s - diff --git a/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.java b/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.java index 1091f4bc3a586..8a01e365a1f80 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.test.integration.document; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + import static org.elasticsearch.client.Requests.createIndexRequest; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; @@ -42,4 +45,9 @@ protected void createIndex() { protected String getConcreteIndexName() { return "test1"; } + + @Override + protected Settings nodeSettings() { + return ImmutableSettings.settingsBuilder().put("action.auto_create_index", false).build(); + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.yml b/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.yml deleted file mode 100644 index 91495fb3cf631..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/document/AliasedIndexDocumentActionsTests.yml +++ /dev/null @@ -1,8 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 -action: - auto_create_index: false \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java b/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java index a81dc19f5a012..d1411bc4e1bfd 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.java @@ -34,10 +34,14 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.engine.DocumentMissingEngineException; import org.elasticsearch.test.integration.AbstractNodesTests; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -62,8 +66,8 @@ public class DocumentActionsTests extends AbstractNodesTests { @BeforeClass public void startNodes() { - startNode("server1"); - startNode("server2"); + startNode("server1", nodeSettings()); + startNode("server2", nodeSettings()); client1 = getClient1(); client2 = getClient2(); @@ -87,6 +91,10 @@ protected void createIndex() { client1.admin().indices().create(createIndexRequest("test")).actionGet(); } + protected Settings nodeSettings() { + return ImmutableSettings.Builder.EMPTY_SETTINGS; + } + protected String getConcreteIndexName() { return "test"; } @@ -261,6 +269,57 @@ public void testIndexActions() throws Exception { } } + @Test + public void testUpdate() throws Exception { + createIndex(); + ClusterHealthResponse clusterHealth = client1.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + try { + client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field++").execute().actionGet(); + assert false; + } catch (DocumentMissingEngineException e) { + // all is well + } + + client1.prepareIndex("test", "type1", "1").setSource("field", 1).execute().actionGet(); + + UpdateResponse updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field += 1").execute().actionGet(); + assertThat(updateResponse.version(), equalTo(2L)); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("2")); + } + + updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx._source.field += count").addScriptParam("count", 3).execute().actionGet(); + assertThat(updateResponse.version(), equalTo(3L)); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("5")); + } + + // check noop + updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx.op = 'none'").execute().actionGet(); + assertThat(updateResponse.version(), equalTo(3L)); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("5")); + } + + // check delete + updateResponse = client1.prepareUpdate("test", "type1", "1").setScript("ctx.op = 'delete'").execute().actionGet(); + assertThat(updateResponse.version(), equalTo(4L)); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client1.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.exists(), equalTo(false)); + } + } + @Test public void testBulk() throws Exception { createIndex(); diff --git a/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.yml b/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.yml deleted file mode 100644 index 641d322a2b3aa..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/document/DocumentActionsTests.yml +++ /dev/null @@ -1,6 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 diff --git a/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.java b/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.java index ac14ad31c7063..a16abe24e6dd3 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.java +++ b/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.java @@ -19,8 +19,16 @@ package org.elasticsearch.test.integration.document; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; + /** * */ public class LocalDocumentActionsTests extends DocumentActionsTests { + + @Override + protected Settings nodeSettings() { + return ImmutableSettings.settingsBuilder().put("node.local", true).build(); + } } diff --git a/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.yml b/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.yml deleted file mode 100644 index 8d48736336845..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/document/LocalDocumentActionsTests.yml +++ /dev/null @@ -1,8 +0,0 @@ -node: - local: true -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 diff --git a/src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.yml b/src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.yml deleted file mode 100644 index 641d322a2b3aa..0000000000000 --- a/src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.yml +++ /dev/null @@ -1,6 +0,0 @@ -cluster: - routing: - schedule: 100ms -index: - number_of_shards: 5 - number_of_replicas: 1 diff --git a/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java b/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java similarity index 99% rename from src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java rename to src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java index 874066135a229..15c3237698b37 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/GetActionTests.java +++ b/src/test/java/org/elasticsearch/test/integration/get/GetActionTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.test.integration.document; +package org.elasticsearch.test.integration.get; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; diff --git a/src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.java b/src/test/java/org/elasticsearch/test/integration/mlt/MoreLikeThisActionTests.java similarity index 99% rename from src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.java rename to src/test/java/org/elasticsearch/test/integration/mlt/MoreLikeThisActionTests.java index 64950fd5c5b17..1204c3a143987 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/MoreLikeThisActionTests.java +++ b/src/test/java/org/elasticsearch/test/integration/mlt/MoreLikeThisActionTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.test.integration.document; +package org.elasticsearch.test.integration.mlt; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;