Skip to content

Commit

Permalink
SPI for loading ABC templates
Browse files Browse the repository at this point in the history
Signed-off-by: mgodwan <[email protected]>
  • Loading branch information
mgodwan committed Jul 5, 2024
1 parent 501a702 commit 21f882a
Show file tree
Hide file tree
Showing 14 changed files with 468 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
package org.opensearch.cluster.service;

import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.service.applicationtemplates.SystemTemplatesPlugin;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.threadpool.ThreadPool;

import java.util.List;

/**
* Main Cluster Manager Node Service
*
Expand All @@ -30,8 +33,9 @@ public ClusterManagerService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
List<SystemTemplatesPlugin> systemTemplatesPlugins
) {
super(settings, clusterSettings, threadPool, clusterManagerMetrics);
super(settings, clusterSettings, threadPool, clusterManagerMetrics, systemTemplatesPlugins);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.service.applicationtemplates.SystemTemplatesPlugin;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
Expand All @@ -58,6 +59,7 @@
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -94,19 +96,20 @@ public class ClusterService extends AbstractLifecycleComponent {
private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), null);
}

public ClusterService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
List<SystemTemplatesPlugin> systemTemplatesPlugins
) {
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics),
new ClusterManagerService(settings, clusterSettings, threadPool, clusterManagerMetrics, systemTemplatesPlugins),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.applicationtemplates.SystemTemplatesPlugin;
import org.opensearch.cluster.service.applicationtemplates.SystemTemplatesService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.DeprecatedApi;
Expand Down Expand Up @@ -140,16 +142,18 @@ public class MasterService extends AbstractLifecycleComponent {
private final ClusterManagerThrottlingStats throttlingStats;
private final ClusterStateStats stateStats;
private final ClusterManagerMetrics clusterManagerMetrics;
private final SystemTemplatesService systemTemplatesService;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
this(settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), null);
}

public MasterService(
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
List<SystemTemplatesPlugin> systemTemplatesPlugins
) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));

Expand All @@ -169,6 +173,7 @@ public MasterService(
this.stateStats = new ClusterStateStats();
this.threadPool = threadPool;
this.clusterManagerMetrics = clusterManagerMetrics;
this.systemTemplatesService = new SystemTemplatesService(systemTemplatesPlugins, clusterSettings, settings);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -395,6 +400,7 @@ void onPublicationSuccess(ClusterChangedEvent clusterChangedEvent, TaskOutputs t

try {
taskOutputs.clusterStatePublished(clusterChangedEvent);
threadPool.executor(ThreadPool.Names.GENERIC).submit(() -> systemTemplatesService.refreshTemplates());
} catch (Exception e) {
logger.error(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service.applicationtemplates;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.opensearch.client.Client;
import org.opensearch.client.OriginSettingClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.ComponentTemplate;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

public class ClusterStateComponentTemplateLoader implements TemplateLoader {

private Client client;

private Supplier<ClusterState> clusterStateSupplier;

private static final Logger logger = LogManager.getLogger(TemplateLoader.class);

public static final String TEMPLATE_LOADER_IDENTIFIER = "system_template_loader";

public ClusterStateComponentTemplateLoader(Client client,
ThreadPool threadPool,
Supplier<ClusterState> clusterStateSupplier) {
this.client = new OriginSettingClient(client, TEMPLATE_LOADER_IDENTIFIER);
this.clusterStateSupplier = clusterStateSupplier;
}

@Override
public void loadTemplate(SystemTemplate template) throws IOException {
ComponentTemplate existingTemplate = clusterStateSupplier.get().metadata().componentTemplates().get(template.templateInfo().fullyQualifiedName());

XContentParser contentParser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS,
template.templateContent().utf8ToString());
ComponentTemplate newTemplate = ComponentTemplate.parse(contentParser);

if (existingTemplate != null && existingTemplate.version() >= newTemplate.version()) {
logger.debug("Skipping putting template {} as its existing version [{}] is >= fetched version [{}]", template.templateInfo().fullyQualifiedName(),
existingTemplate.version(),
newTemplate.version());
}

PutComponentTemplateAction.Request request = new PutComponentTemplateAction.Request(template.templateInfo().fullyQualifiedName())
.componentTemplate(newTemplate);

client.admin().indices().execute(PutComponentTemplateAction.INSTANCE, request).actionGet(TimeValue.timeValueMillis(30000));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service.applicationtemplates;

import org.opensearch.core.common.bytes.BytesReference;

/**
* Encapsulates the information and content about a system template available within a repository.
*/
public class SystemTemplate {

private final BytesReference templateContent;

private final SystemTemplateInfo templateInfo;

private final TemplateRepositoryInfo repositoryInfo;

public SystemTemplate(BytesReference templateContent, SystemTemplateInfo templateInfo, TemplateRepositoryInfo repositoryInfo) {
this.templateContent = templateContent;
this.templateInfo = templateInfo;
this.repositoryInfo = repositoryInfo;
}

public BytesReference templateContent() {
return templateContent;
}

public SystemTemplateInfo templateInfo() {
return templateInfo;
}

public TemplateRepositoryInfo repositoryInfo() {
return repositoryInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service.applicationtemplates;

/**
* Metadata information about a template available in a template repository.
*/
public class SystemTemplateInfo {

private final long version;
private final String type;
private final String name;

private static final String DELIMITER = "@";

public static final String COMPONENT_TEMPLATE_TYPE = "@abc_template";

public SystemTemplateInfo(long version, String type, String name) {
this.version = version;
this.type = type;
this.name = name;
}

public String type() {
return type;
}

public String name() {
return name;
}

public long version() {
return version;
}

public static SystemTemplateInfo fromComponentTemplate(String fullyQualifiedName) {
return new SystemTemplateInfo(Long.parseLong(fullyQualifiedName.substring(fullyQualifiedName.lastIndexOf(DELIMITER))), COMPONENT_TEMPLATE_TYPE, fullyQualifiedName.substring(0, fullyQualifiedName.lastIndexOf(DELIMITER)));
}

public static SystemTemplateInfo createComponentTemplateInfo(String name, long version) {
return new SystemTemplateInfo(version, COMPONENT_TEMPLATE_TYPE, name);
}

public final String fullyQualifiedName() {
return name + DELIMITER + version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service.applicationtemplates;

import java.io.IOException;

/**
* Plugin interface to expose the template maintaining logic.
*/
public interface SystemTemplatesPlugin {

/**
* @return repository implementation from which templates are to be fetched.
*/
TemplateRepository loadRepository() throws IOException;

/**
* @param templateInfo Metadata about the template to load
* @return Implementation of TemplateLoader which determines how to make the template available at runtime.
*/
TemplateLoader loaderFor(SystemTemplateInfo templateInfo);
}
Loading

0 comments on commit 21f882a

Please sign in to comment.