Skip to content

Commit

Permalink
Add allowlist setting for ingest-common processors (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#14479)

Add a new static setting that lets an operator choose specific ingest
processors to enable by name. The behavior is as follows:

- If the allowlist setting is not defined, all installed processors are
  enabled. This is the status quo.
- If the allowlist setting is defined as the empty set, then all processors
  are disabled.
- If the allowlist setting contains the names of valid processors, only those
  processors are enabled.
- If the allowlist setting contains a name of a processor that does not exist,
  then the server will fail to start with an IllegalStateException
  listing which processors were defined in the allowlist but are not
  installed.
- If the allowlist setting is changed between server restarts then any
  ingest pipeline using a now-disabled processor will fail. This is the
  same experience if a pipeline used a processor defined by a plugin but
  then that plugin were to be uninstalled across restarts.

Related to opensearch-project#14439

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored and harshavamsi committed Jul 12, 2024
1 parent 00f7928 commit a30e5da
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,20 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class IngestCommonModulePlugin extends Plugin implements ActionPlugin, IngestPlugin {

static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.common.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);

static final Setting<TimeValue> WATCHDOG_INTERVAL = Setting.timeSetting(
"ingest.grok.watchdog.interval",
TimeValue.timeValueSeconds(1),
Expand All @@ -77,7 +87,7 @@ public IngestCommonModulePlugin() {}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
final Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService));
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService));
Expand Down Expand Up @@ -110,7 +120,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
return Collections.unmodifiableMap(processors);
return filterForAllowlistSetting(parameters.env.settings(), processors);
}

@Override
Expand All @@ -133,7 +143,7 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME, PROCESSORS_ALLOWLIST_SETTING);
}

private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
Expand All @@ -147,4 +157,27 @@ private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters par
);
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Set;

public class IngestCommonModulePluginTests extends OpenSearchTestCase {

public void testAllowlist() throws IOException {
runAllowlistTest(List.of());
runAllowlistTest(List.of("date"));
runAllowlistTest(List.of("set"));
runAllowlistTest(List.of("copy", "date"));
runAllowlistTest(List.of("date", "set", "copy"));
}

private void runAllowlistTest(List<String> allowlist) throws IOException {
final Settings settings = Settings.builder()
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist)
.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
assertEquals(Set.copyOf(allowlist), plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistNotSpecified() throws IOException {
final Settings.Builder builder = Settings.builder();
builder.remove(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
final Settings settings = builder.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
final Set<String> expected = Set.of(
"append",
"urldecode",
"sort",
"fail",
"trim",
"set",
"fingerprint",
"pipeline",
"json",
"join",
"kv",
"bytes",
"date",
"drop",
"community_id",
"lowercase",
"convert",
"copy",
"gsub",
"dot_expander",
"rename",
"remove_by_pattern",
"html_strip",
"remove",
"csv",
"grok",
"date_index_name",
"foreach",
"script",
"dissect",
"uppercase",
"split"
);
assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet());
}
}

public void testAllowlistHasNonexistentProcessors() throws IOException {
final Settings settings = Settings.builder()
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve"))
.build();
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settings))
);
assertTrue(e.getMessage(), e.getMessage().contains("threeve"));
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}

0 comments on commit a30e5da

Please sign in to comment.