Skip to content

Commit

Permalink
Add logstash system index APIs (#53350)
Browse files Browse the repository at this point in the history
We want Logstash indices to be system indices, but the logstash
service will still need to be able to manage its indices. This PR
adds special system index APIs to the logstash plugin so that
logstash can manage its pipelines without direct access to the
underlying indices.

* Add logstash module with dedicated logstash APIs
* merge with x-pack plugin
* add system index access allowance
* Break out serialization tests into distinct classes
* Log failures for partial multiget failure
* Move LogstashSystemIndexIT to javaRestTest task

Co-authored-by: William Brafford <[email protected]>
  • Loading branch information
jaymode and williamrandolph authored Sep 14, 2020
1 parent a19503e commit fb7471b
Show file tree
Hide file tree
Showing 28 changed files with 1,485 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,44 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.logstash.action.DeletePipelineAction;
import org.elasticsearch.xpack.logstash.action.GetPipelineAction;
import org.elasticsearch.xpack.logstash.action.PutPipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportDeletePipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportGetPipelineAction;
import org.elasticsearch.xpack.logstash.action.TransportPutPipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestDeletePipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestGetPipelineAction;
import org.elasticsearch.xpack.logstash.rest.RestPutPipelineAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
* This class supplies the logstash featureset and templates
*/
public class Logstash extends Plugin implements SystemIndexPlugin {

private static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
public static final String LOGSTASH_CONCRETE_INDEX_NAME = ".logstash";
private static final String LOGSTASH_TEMPLATE_FILE_NAME = "logstash-management";
private static final String LOGSTASH_INDEX_TEMPLATE_NAME = ".logstash-management";
private static final String OLD_LOGSTASH_INDEX_NAME = "logstash-index-template";
Expand All @@ -40,12 +56,28 @@ public Logstash() {}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
return List.of(
new ActionHandler<>(XPackUsageFeatureAction.LOGSTASH, LogstashUsageTransportAction.class),
new ActionHandler<>(XPackInfoFeatureAction.LOGSTASH, LogstashInfoTransportAction.class)
new ActionHandler<>(XPackInfoFeatureAction.LOGSTASH, LogstashInfoTransportAction.class),
new ActionHandler<>(PutPipelineAction.INSTANCE, TransportPutPipelineAction.class),
new ActionHandler<>(GetPipelineAction.INSTANCE, TransportGetPipelineAction.class),
new ActionHandler<>(DeletePipelineAction.INSTANCE, TransportDeletePipelineAction.class)
);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestPutPipelineAction(), new RestGetPipelineAction(), new RestDeletePipelineAction());
}

public UnaryOperator<Map<String, IndexTemplateMetadata>> getIndexTemplateMetadataUpgrader() {
return templates -> {
templates.keySet().removeIf(OLD_LOGSTASH_INDEX_NAME::equals);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;

import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class Pipeline {

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Pipeline, String> PARSER = new ConstructingObjectParser<>(
"pipeline",
true,
(objects, id) -> {
Iterator<Object> iterator = Arrays.asList(objects).iterator();
return new Pipeline(
id,
(Instant) iterator.next(),
(Map<String, Object>) iterator.next(),
(String) iterator.next(),
(String) iterator.next(),
(Map<String, Object>) iterator.next()
);
}
);

public static final ParseField LAST_MODIFIED = new ParseField("last_modified");
public static final ParseField PIPELINE_METADATA = new ParseField("pipeline_metadata");
public static final ParseField USERNAME = new ParseField("username");
public static final ParseField PIPELINE = new ParseField("pipeline");
public static final ParseField PIPELINE_SETTINGS = new ParseField("pipeline_settings");

static {
PARSER.declareField(constructorArg(), (parser, s) -> {
final String instantISOString = parser.text();
return Instant.parse(instantISOString);
}, LAST_MODIFIED, ValueType.STRING);
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_METADATA);
PARSER.declareString(constructorArg(), USERNAME);
PARSER.declareString(constructorArg(), PIPELINE);
PARSER.declareObject(constructorArg(), (parser, s) -> parser.map(), PIPELINE_SETTINGS);
}

private final String id;
private final Instant lastModified;
private final Map<String, Object> pipelineMetadata;
private final String username;
private final String pipeline;
private final Map<String, Object> pipelineSettings;

public Pipeline(
String id,
Instant lastModified,
Map<String, Object> pipelineMetadata,
String username,
String pipeline,
Map<String, Object> pipelineSettings
) {
this.id = id;
this.lastModified = lastModified;
this.pipelineMetadata = pipelineMetadata;
this.username = username;
this.pipeline = pipeline;
this.pipelineSettings = pipelineSettings;
}

public String getId() {
return id;
}

public Instant getLastModified() {
return lastModified;
}

public Map<String, Object> getPipelineMetadata() {
return pipelineMetadata;
}

public String getUsername() {
return username;
}

public String getPipeline() {
return pipeline;
}

public Map<String, Object> getPipelineSettings() {
return pipelineSettings;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash.action;

import org.elasticsearch.action.ActionType;

public class DeletePipelineAction extends ActionType<DeletePipelineResponse> {

public static final String NAME = "cluster:admin/logstash/pipeline/delete";
public static final DeletePipelineAction INSTANCE = new DeletePipelineAction();

private DeletePipelineAction() {
super(NAME, DeletePipelineResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

public class DeletePipelineRequest extends ActionRequest {

private final String id;

public DeletePipelineRequest(String id) {
this.id = Objects.requireNonNull(id);
}

public DeletePipelineRequest(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
}

public String id() {
return id;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeletePipelineRequest that = (DeletePipelineRequest) o;
return Objects.equals(id, that.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash.action;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

public class DeletePipelineResponse extends ActionResponse {

private final boolean deleted;

public DeletePipelineResponse(boolean deleted) {
this.deleted = deleted;
}

public DeletePipelineResponse(StreamInput in) throws IOException {
super(in);
this.deleted = in.readBoolean();
}

public boolean isDeleted() {
return deleted;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(deleted);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeletePipelineResponse that = (DeletePipelineResponse) o;
return deleted == that.deleted;
}

@Override
public int hashCode() {
return Objects.hash(deleted);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash.action;

import org.elasticsearch.action.ActionType;

public class GetPipelineAction extends ActionType<GetPipelineResponse> {

public static final String NAME = "cluster:admin/logstash/pipeline/get";
public static final GetPipelineAction INSTANCE = new GetPipelineAction();

private GetPipelineAction() {
super(NAME, GetPipelineResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.logstash.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

public class GetPipelineRequest extends ActionRequest {

private final List<String> ids;

public GetPipelineRequest(List<String> ids) {
this.ids = Objects.requireNonNull(ids);
}

public GetPipelineRequest(StreamInput in) throws IOException {
super(in);
ids = in.readStringList();
}

public List<String> ids() {
return ids;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(ids);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetPipelineRequest that = (GetPipelineRequest) o;
return Objects.equals(ids, that.ids);
}

@Override
public int hashCode() {
return Objects.hash(ids);
}
}
Loading

0 comments on commit fb7471b

Please sign in to comment.