Skip to content

Commit

Permalink
Update by query API (#3597)
Browse files Browse the repository at this point in the history
  • Loading branch information
russcam authored Mar 15, 2019
1 parent 25226cc commit afa8a7c
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 2 deletions.
1 change: 0 additions & 1 deletion src/CodeGeneration/ApiGenerator/ApiGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class ApiGenerator
"xpack.ml.delete_forecast.json",
"xpack.ml.find_file_structure.json",
"delete_by_query_rethrottle.json",
"update_by_query_rethrottle.json",

"xpack.ml.update_filter.json",
"xpack.security.delete_privileges.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,13 @@ public partial class UpdateByQueryRequestParameters : RequestParameters<UpdateBy
///<summary>The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.</summary>
public long? Slices { get => Q<long?>("slices"); set => Q("slices", value); }
}
///<summary>Request options for UpdateByQueryRethrottle<pre>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</pre></summary>
public partial class UpdateByQueryRethrottleRequestParameters : RequestParameters<UpdateByQueryRethrottleRequestParameters>
{
public override HttpMethod DefaultHttpMethod => HttpMethod.POST;
///<summary>The throttle to set on this request in floating sub-requests per second. -1 means set no throttle.</summary>
public long? RequestsPerSecond { get => Q<long?>("requests_per_second"); set => Q("requests_per_second", value); }
}
///<summary>Request options for CcrDeleteAutoFollowPattern<pre>https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-delete-auto-follow-pattern.html</pre></summary>
public partial class DeleteAutoFollowPatternRequestParameters : RequestParameters<DeleteAutoFollowPatternRequestParameters>
{
Expand Down
10 changes: 10 additions & 0 deletions src/Elasticsearch.Net/ElasticLowLevelClient.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2930,6 +2930,16 @@ public TResponse UpdateByQuery<TResponse>(string index, string type, PostData bo
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
public Task<TResponse> UpdateByQueryAsync<TResponse>(string index, string type, PostData body, UpdateByQueryRequestParameters requestParameters = null, CancellationToken ctx = default(CancellationToken))
where TResponse : class, IElasticsearchResponse, new() => this.DoRequestAsync<TResponse>(POST, Url($"{index.NotNull("index")}/{type.NotNull("type")}/_update_by_query"), ctx, body, _params(requestParameters));
///<summary>POST on /_update_by_query/{task_id}/_rethrottle <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</para></summary>
///<param name="task_id">The task id to rethrottle</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
public TResponse UpdateByQueryRethrottle<TResponse>(string task_id, UpdateByQueryRethrottleRequestParameters requestParameters = null)
where TResponse : class, IElasticsearchResponse, new() => this.DoRequest<TResponse>(POST, Url($"_update_by_query/{task_id.NotNull("task_id")}/_rethrottle"), null, _params(requestParameters));
///<summary>POST on /_update_by_query/{task_id}/_rethrottle <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</para></summary>
///<param name="task_id">The task id to rethrottle</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
public Task<TResponse> UpdateByQueryRethrottleAsync<TResponse>(string task_id, UpdateByQueryRethrottleRequestParameters requestParameters = null, CancellationToken ctx = default(CancellationToken))
where TResponse : class, IElasticsearchResponse, new() => this.DoRequestAsync<TResponse>(POST, Url($"_update_by_query/{task_id.NotNull("task_id")}/_rethrottle"), ctx, null, _params(requestParameters));
///<summary>DELETE on /_ccr/auto_follow/{name} <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-delete-auto-follow-pattern.html</para></summary>
///<param name="name">The name of the auto follow pattern.</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
Expand Down
8 changes: 8 additions & 0 deletions src/Elasticsearch.Net/IElasticLowLevelClient.Generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2376,6 +2376,14 @@ public partial interface IElasticLowLevelClient
///<param name="body">The search definition using the Query DSL</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
Task<TResponse> UpdateByQueryAsync<TResponse>(string index, string type, PostData body, UpdateByQueryRequestParameters requestParameters = null, CancellationToken ctx = default(CancellationToken)) where TResponse : class, IElasticsearchResponse, new();
///<summary>POST on /_update_by_query/{task_id}/_rethrottle <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</para></summary>
///<param name="task_id">The task id to rethrottle</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
TResponse UpdateByQueryRethrottle<TResponse>(string task_id, UpdateByQueryRethrottleRequestParameters requestParameters = null) where TResponse : class, IElasticsearchResponse, new();
///<summary>POST on /_update_by_query/{task_id}/_rethrottle <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</para></summary>
///<param name="task_id">The task id to rethrottle</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
Task<TResponse> UpdateByQueryRethrottleAsync<TResponse>(string task_id, UpdateByQueryRethrottleRequestParameters requestParameters = null, CancellationToken ctx = default(CancellationToken)) where TResponse : class, IElasticsearchResponse, new();
///<summary>DELETE on /_ccr/auto_follow/{name} <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-delete-auto-follow-pattern.html</para></summary>
///<param name="name">The name of the auto follow pattern.</param>
///<param name="requestParameters">A func that allows you to describe the querystring parameters &amp; request specific connection settings.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public class ListTasksResponse : ResponseBase, IListTasksResponse

public class TaskExecutingNode
{
[JsonProperty("attributes")]
[JsonConverter(typeof(VerbatimDictionaryKeysJsonConverter<string, string>))]
public IReadOnlyDictionary<string, string> Attributes { get; internal set; } =
EmptyReadOnly<string, string>.Dictionary;

[JsonProperty("host")]
public string Host { get; internal set; }

Expand All @@ -33,6 +38,9 @@ public class TaskExecutingNode
[JsonProperty("name")]
public string Name { get; internal set; }

[JsonProperty("roles")]
public IEnumerable<string> Roles { get; internal set; }

[JsonProperty("tasks")]
public IReadOnlyDictionary<TaskId, TaskState> Tasks { get; internal set; } = EmptyReadOnly<TaskId, TaskState>.Dictionary;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;

namespace Nest
{
public partial interface IElasticClient
{
/// <summary>
/// Rethrottles a running update by query. Rethrottling that speeds up the query takes effect immediately
/// but rethrotting that slows down the query will take effect after completing the current batch. This prevents scroll timeouts.
/// <para> </para>
/// <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html#docs-update-by-query-rethrottle">https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html#docs-update-by-query-rethrottle</a>
/// </summary>
IListTasksResponse UpdateByQueryRethrottle(TaskId taskId, Func<UpdateByQueryRethrottleDescriptor, IUpdateByQueryRethrottleRequest> selector = null);

/// <inheritdoc cref="UpdateByQueryRethrottle(Nest.TaskId,System.Func{Nest.UpdateByQueryRethrottleDescriptor,Nest.IUpdateByQueryRethrottleRequest})" />
IListTasksResponse UpdateByQueryRethrottle(IUpdateByQueryRethrottleRequest request);

/// <inheritdoc cref="UpdateByQueryRethrottle(Nest.TaskId,System.Func{Nest.UpdateByQueryRethrottleDescriptor,Nest.IUpdateByQueryRethrottleRequest})" />
Task<IListTasksResponse> UpdateByQueryRethrottleAsync(TaskId taskId,
Func<UpdateByQueryRethrottleDescriptor, IUpdateByQueryRethrottleRequest> selector = null,
CancellationToken cancellationToken = default(CancellationToken)
);

/// <inheritdoc cref="UpdateByQueryRethrottle(Nest.TaskId,System.Func{Nest.UpdateByQueryRethrottleDescriptor,Nest.IUpdateByQueryRethrottleRequest})" />
Task<IListTasksResponse> UpdateByQueryRethrottleAsync(IUpdateByQueryRethrottleRequest request,
CancellationToken cancellationToken = default(CancellationToken)
);
}

public partial class ElasticClient
{
/// <inheritdoc />
public IListTasksResponse UpdateByQueryRethrottle(TaskId taskId, Func<UpdateByQueryRethrottleDescriptor, IUpdateByQueryRethrottleRequest> selector = null) =>
UpdateByQueryRethrottle(selector.InvokeOrDefault(new UpdateByQueryRethrottleDescriptor(taskId)));

/// <inheritdoc />
public IListTasksResponse UpdateByQueryRethrottle(IUpdateByQueryRethrottleRequest request) =>
Dispatcher.Dispatch<IUpdateByQueryRethrottleRequest, UpdateByQueryRethrottleRequestParameters, ListTasksResponse>(
request,
(p, d) => LowLevelDispatch.UpdateByQueryRethrottleDispatch<ListTasksResponse>(p)
);

/// <inheritdoc />
public Task<IListTasksResponse> UpdateByQueryRethrottleAsync(TaskId taskId, Func<UpdateByQueryRethrottleDescriptor, IUpdateByQueryRethrottleRequest> selector = null,
CancellationToken cancellationToken = default(CancellationToken)
) =>
UpdateByQueryRethrottleAsync(selector.InvokeOrDefault(new UpdateByQueryRethrottleDescriptor(taskId)), cancellationToken);

/// <inheritdoc />
public Task<IListTasksResponse> UpdateByQueryRethrottleAsync(IUpdateByQueryRethrottleRequest request,
CancellationToken cancellationToken = default(CancellationToken)
) =>
Dispatcher.DispatchAsync<IUpdateByQueryRethrottleRequest, UpdateByQueryRethrottleRequestParameters, ListTasksResponse, IListTasksResponse>(
request,
cancellationToken,
(p, d, c) => LowLevelDispatch.UpdateByQueryRethrottleDispatchAsync<ListTasksResponse>(p, c)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using Newtonsoft.Json;

namespace Nest
{
public partial interface IUpdateByQueryRethrottleRequest
{
}

public partial class UpdateByQueryRethrottleRequest: IUpdateByQueryRethrottleRequest
{
}

public partial class UpdateByQueryRethrottleDescriptor : IUpdateByQueryRethrottleRequest
{
}
}
14 changes: 14 additions & 0 deletions src/Nest/_Generated/_Descriptors.generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,20 @@ public UpdateByQueryDescriptor(Indices index) : base(r=>r.Required("index", inde
///<summary>The number of slices this task should be divided into. Defaults to 1 meaning the task isn't sliced into subtasks.</summary>
public UpdateByQueryDescriptor<T> Slices(long? slices) => Qs("slices", slices);
}
///<summary>descriptor for UpdateByQueryRethrottle <pre>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</pre></summary>
public partial class UpdateByQueryRethrottleDescriptor : RequestDescriptorBase<UpdateByQueryRethrottleDescriptor,UpdateByQueryRethrottleRequestParameters, IUpdateByQueryRethrottleRequest>, IUpdateByQueryRethrottleRequest
{
/// <summary>/_update_by_query/{task_id}/_rethrottle</summary>
///<param name="task_id"> this parameter is required</param>
public UpdateByQueryRethrottleDescriptor(TaskId task_id) : base(r=>r.Required("task_id", task_id)){}
// values part of the url path
TaskId IUpdateByQueryRethrottleRequest.TaskId => Self.RouteValues.Get<TaskId>("task_id");

// Request parameters

///<summary>The throttle to set on this request in floating sub-requests per second. -1 means set no throttle.</summary>
public UpdateByQueryRethrottleDescriptor RequestsPerSecond(long? requestsPerSecond) => Qs("requests_per_second", requestsPerSecond);
}
///<summary>descriptor for CcrDeleteAutoFollowPattern <pre>https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-delete-auto-follow-pattern.html</pre></summary>
public partial class DeleteAutoFollowPatternDescriptor : RequestDescriptorBase<DeleteAutoFollowPatternDescriptor,DeleteAutoFollowPatternRequestParameters, IDeleteAutoFollowPatternRequest>, IDeleteAutoFollowPatternRequest
{
Expand Down
22 changes: 22 additions & 0 deletions src/Nest/_Generated/_LowLevelDispatch.generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2924,6 +2924,28 @@ internal partial class LowLevelDispatch
throw InvalidDispatch("UpdateByQuery", p, new [] { POST }, "/{index}/_update_by_query", "/{index}/{type}/_update_by_query");
}

internal TResponse UpdateByQueryRethrottleDispatch<TResponse>(IRequest<UpdateByQueryRethrottleRequestParameters> p) where TResponse : class, IElasticsearchResponse, new()
{
switch(p.HttpMethod)
{
case POST:
if (AllSetNoFallback(p.RouteValues.TaskId)) return _lowLevel.UpdateByQueryRethrottle<TResponse>(p.RouteValues.TaskId,p.RequestParameters);
break;
}
throw InvalidDispatch("UpdateByQueryRethrottle", p, new [] { POST }, "/_update_by_query/{task_id}/_rethrottle");
}

internal Task<TResponse> UpdateByQueryRethrottleDispatchAsync<TResponse>(IRequest<UpdateByQueryRethrottleRequestParameters> p, CancellationToken ct) where TResponse : class, IElasticsearchResponse, new()
{
switch(p.HttpMethod)
{
case POST:
if (AllSetNoFallback(p.RouteValues.TaskId)) return _lowLevel.UpdateByQueryRethrottleAsync<TResponse>(p.RouteValues.TaskId,p.RequestParameters,ct);
break;
}
throw InvalidDispatch("UpdateByQueryRethrottle", p, new [] { POST }, "/_update_by_query/{task_id}/_rethrottle");
}

internal TResponse CcrDeleteAutoFollowPatternDispatch<TResponse>(IRequest<DeleteAutoFollowPatternRequestParameters> p) where TResponse : class, IElasticsearchResponse, new()
{
switch(p.HttpMethod)
Expand Down
19 changes: 19 additions & 0 deletions src/Nest/_Generated/_Requests.generated.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6277,6 +6277,25 @@ public UpdateByQueryRequest(Indices index, Types type) : base(r=>r.Required("ind
public long? Slices { get => Q<long?>("slices"); set => Q("slices", value); }
}
[JsonObject(MemberSerialization = MemberSerialization.OptIn)]
public partial interface IUpdateByQueryRethrottleRequest : IRequest<UpdateByQueryRethrottleRequestParameters>
{
TaskId TaskId { get; }
}
///<summary>Request parameters for UpdateByQueryRethrottle <pre>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html</pre></summary>
public partial class UpdateByQueryRethrottleRequest : PlainRequestBase<UpdateByQueryRethrottleRequestParameters>, IUpdateByQueryRethrottleRequest
{
protected IUpdateByQueryRethrottleRequest Self => this;
///<summary>/_update_by_query/{task_id}/_rethrottle</summary>
///<param name="task_id">this parameter is required</param>
public UpdateByQueryRethrottleRequest(TaskId task_id) : base(r=>r.Required("task_id", task_id)){}
// values part of the url path
TaskId IUpdateByQueryRethrottleRequest.TaskId => Self.RouteValues.Get<TaskId>("task_id");

// Request parameters
///<summary>The throttle to set on this request in floating sub-requests per second. -1 means set no throttle.</summary>
public long? RequestsPerSecond { get => Q<long?>("requests_per_second"); set => Q("requests_per_second", value); }
}
[JsonObject(MemberSerialization = MemberSerialization.OptIn)]
public partial interface IUpdateDatafeedRequest : IRequest<UpdateDatafeedRequestParameters>
{
Id DatafeedId { get; }
Expand Down
3 changes: 2 additions & 1 deletion src/Tests/Tests/CodeStandards/NamingConventions.doc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void ParityBetweenRequestsAndResponses()
typeof(IndicesShardStoresRequest),
typeof(RenderSearchTemplateRequest),
typeof(MultiSearchTemplateRequest),
typeof(CreateRequest<>)
typeof(CreateRequest<>),
typeof(UpdateByQueryRethrottleRequest) // uses ListTasksResponse
};

var types = typeof(IRequest).Assembly().GetTypes();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System;
using System.Linq;
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
using Tests.Document.Multiple.Reindex;
using Tests.Domain;
using Tests.Framework;
using Tests.Framework.Integration;
using Tests.Core.Extensions;

namespace Tests.Document.Multiple.UpdateByQueryRethrottle
{
public class UpdateByQueryRethrottleApiTests
: ApiIntegrationTestBase<ReindexCluster, IListTasksResponse, IUpdateByQueryRethrottleRequest,
UpdateByQueryRethrottleDescriptor, UpdateByQueryRethrottleRequest>
{
protected const string TaskIdKey = "taskId";

public UpdateByQueryRethrottleApiTests(ReindexCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override bool ExpectIsValid => true;
protected override object ExpectJson => null;
protected override int ExpectStatusCode => 200;

protected override Func<UpdateByQueryRethrottleDescriptor, IUpdateByQueryRethrottleRequest> Fluent => d => d
.RequestsPerSecond(-1);

protected override HttpMethod HttpMethod => HttpMethod.POST;

protected override UpdateByQueryRethrottleRequest Initializer => new UpdateByQueryRethrottleRequest(TaskId)
{
RequestsPerSecond = -1
};

protected override bool SupportsDeserialization => false;
protected TaskId TaskId => RanIntegrationSetup ? ExtendedValue<TaskId>(TaskIdKey) : "foo:1";

protected override string UrlPath => $"/_update_by_query/{TaskId.NodeId}%3A{TaskId.TaskNumber}/_rethrottle?requests_per_second=-1";

protected override UpdateByQueryRethrottleDescriptor NewDescriptor() => new UpdateByQueryRethrottleDescriptor(TaskId);

protected override void IntegrationSetup(IElasticClient client, CallUniqueValues values)
{
foreach (var callUniqueValue in values)
{
client.IndexMany(Project.Projects, callUniqueValue.Value);
client.Refresh(callUniqueValue.Value);
}
}

protected override LazyResponses ClientUsage() => Calls(
(client, f) => client.UpdateByQueryRethrottle(TaskId, f),
(client, f) => client.UpdateByQueryRethrottleAsync(TaskId, f),
(client, r) => client.UpdateByQueryRethrottle(r),
(client, r) => client.UpdateByQueryRethrottleAsync(r)
);

protected override void OnBeforeCall(IElasticClient client)
{
client.IndexMany(Project.Projects, CallIsolatedValue);
client.Refresh(CallIsolatedValue);

var updateByQuery = client.UpdateByQuery<Project>(u => u
.Index(CallIsolatedValue)
.Conflicts(Conflicts.Proceed)
.Query(q => q.MatchAll())
.Script(s => s.Source("ctx._source.numberOfCommits+=10"))
.Refresh()
.RequestsPerSecond(1)
.WaitForCompletion(false)
);

updateByQuery.ShouldBeValid();
ExtendedValue(TaskIdKey, updateByQuery.Task);
}

protected override void ExpectResponse(IListTasksResponse response)
{
response.ShouldBeValid();

response.Nodes.Should().NotBeEmpty().And.HaveCount(1);
var node = response.Nodes.First().Value;

node.Name.Should().NotBeNullOrEmpty();
node.TransportAddress.Should().NotBeNullOrEmpty();
node.Host.Should().NotBeNullOrEmpty();
node.Ip.Should().NotBeNullOrEmpty();
node.Roles.Should().NotBeEmpty();
node.Attributes.Should().NotBeEmpty();

node.Tasks.Should().NotBeEmpty().And.HaveCount(1);
node.Tasks.First().Key.Should().Be(TaskId);

var task = node.Tasks.First().Value;

task.Node.Should().NotBeNullOrEmpty().And.Be(TaskId.NodeId);
task.Id.Should().Be(TaskId.TaskNumber);
task.Type.Should().NotBeNullOrEmpty();
task.Action.Should().NotBeNullOrEmpty();

task.Status.RequestsPerSecond.Should().Be(-1);

task.StartTimeInMilliseconds.Should().BeGreaterThan(0);
task.RunningTimeInNanoSeconds.Should().BeGreaterThan(0);
task.Cancellable.Should().BeTrue();
}
}
}
Loading

0 comments on commit afa8a7c

Please sign in to comment.