Skip to content

Commit

Permalink
Add class for loading mapping templates in bulk (#1550)
Browse files Browse the repository at this point in the history
* Add class for loading mapping templates in bulk

Signed-off-by: Simeon Widdis <[email protected]>

* Remove unused imports

Signed-off-by: Simeon Widdis <[email protected]>

---------

Signed-off-by: Simeon Widdis <[email protected]>
  • Loading branch information
Swiddis authored Jun 13, 2023
1 parent 77d6e2a commit 0628a01
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import org.opensearch.observability.action.DeleteObservabilityObjectAction
import org.opensearch.observability.action.GetObservabilityObjectAction
import org.opensearch.observability.action.UpdateObservabilityObjectAction
import org.opensearch.observability.index.ObservabilityIndex
import org.opensearch.observability.index.ObservabilityMetricsIndex
import org.opensearch.observability.index.ObservabilityTracesIndex
import org.opensearch.observability.index.ObservabilityIntegrationsIndex
import org.opensearch.observability.resthandler.ObservabilityRestHandler
import org.opensearch.observability.resthandler.ObservabilityStatsRestHandler
import org.opensearch.observability.resthandler.SchedulerRestHandler
Expand Down Expand Up @@ -85,15 +84,13 @@ class ObservabilityPlugin : Plugin(), ActionPlugin, ClusterPlugin, JobSchedulerE
): Collection<Any> {
PluginSettings.addSettingsUpdateConsumer(clusterService)
ObservabilityIndex.initialize(client, clusterService)
ObservabilityMetricsIndex.initialize(client, clusterService)
ObservabilityTracesIndex.initialize(client, clusterService)
ObservabilityIntegrationsIndex.initialize(client, clusterService)
return emptyList()
}

override fun onNodeStarted() {
ObservabilityIndex.afterStart()
ObservabilityTracesIndex.afterStart()
ObservabilityMetricsIndex.afterStart()
ObservabilityIntegrationsIndex.afterStart()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ import java.util.*
/**
* Class for doing OpenSearch Metrics schema mapping & default index init operation
*/
internal object ObservabilityMetricsIndex : LifecycleListener() {
private val log by logger(ObservabilityMetricsIndex::class.java)
private const val METRICS_MAPPING_TEMPLATE_NAME = "ss4o_metric_template"
private const val METRICS_MAPPING_TEMPLATE_FILE = "metrics-mapping-template.json"
private const val METRIC_PATTERN_NAME = "ss4o_metrics-*-*"
internal object ObservabilityIntegrationsIndex : LifecycleListener() {
private val log by logger(ObservabilityIntegrationsIndex::class.java)

private lateinit var client: Client
private lateinit var clusterService: ClusterService
Expand All @@ -38,7 +35,7 @@ internal object ObservabilityMetricsIndex : LifecycleListener() {
* @param client The OpenSearch client
* @param clusterService The OpenSearch cluster service
*/
fun initialize(client: Client, clusterService: ClusterService): ObservabilityMetricsIndex {
fun initialize(client: Client, clusterService: ClusterService): ObservabilityIntegrationsIndex {
this.client = SecureIndexClient(client)
this.clusterService = clusterService
return this
Expand All @@ -48,47 +45,59 @@ internal object ObservabilityMetricsIndex : LifecycleListener() {
* once lifecycle indicate start has occurred - instantiating the mapping template
*/
override fun afterStart() {
// create default mapping
createMappingTemplate()
// create default mappings
createMappingTemplates()
}

private fun getTemplateNames(): List<String> {
// TODO classloader doesn't support directory scanning by default, need to write manual traversal later
// Hardcoding template list for now, as a hotfix you can add names from resources/templates/{name}-mapping-template.json here
return listOf("metrics", "traces")
}

private fun createMappingTemplates() {
for (name in getTemplateNames()) {
createMappingTemplate("ss4o_${name}_template", "ss4o_$name-*-*", "templates/$name-mapping-template.json")
}
}

/**
* Create the pre-defined mapping template
*/
@Suppress("TooGenericExceptionCaught", "MagicNumber")
private fun createMappingTemplate() {
log.info("$LOG_PREFIX:createMappingTemplate $METRICS_MAPPING_TEMPLATE_NAME API called")
if (!isTemplateExists(METRICS_MAPPING_TEMPLATE_NAME)) {
val classLoader = ObservabilityMetricsIndex::class.java.classLoader
val indexMappingSource = classLoader.getResource(METRICS_MAPPING_TEMPLATE_FILE)?.readText()!!
private fun createMappingTemplate(templateName: String, patternName: String, path: String) {
log.info("$LOG_PREFIX:createMappingTemplate $templateName API called")
if (!isTemplateExists(templateName)) {
val classLoader = ObservabilityIntegrationsIndex::class.java.classLoader
val indexMappingSource = classLoader.getResource(path)?.readText()!!
val settings = Settings.builder()
.put("index.number_of_shards", 3)
.put("index.auto_expand_replicas", "0-2")
.build()
val template = Template(settings, CompressedXContent(indexMappingSource), null)
val request = PutComposableIndexTemplateAction.Request(METRICS_MAPPING_TEMPLATE_NAME)
val request = PutComposableIndexTemplateAction.Request(templateName)
.indexTemplate(
ComposableIndexTemplate(
listOf(METRIC_PATTERN_NAME),
listOf(patternName),
template,
Collections.emptyList(),
1,
1,
Collections.singletonMap("description", "Observability Metrics Mapping Template") as Map<String, Any>?,
Collections.singletonMap("description", "Observability $templateName Mapping Template") as Map<String, Any>?,
ComposableIndexTemplate.DataStreamTemplate()
)
)
try {
val validationException = request.validateIndexTemplate(null)
if (validationException != null && !validationException.validationErrors().isEmpty()) {
error("$LOG_PREFIX:Index Template $METRICS_MAPPING_TEMPLATE_NAME validation errors ${validationException.message}")
error("$LOG_PREFIX:Index Template $templateName validation errors ${validationException.message}")
}
val actionFuture = client.admin().indices().execute(PutComposableIndexTemplateAction.INSTANCE, request)
val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs)
if (response.isAcknowledged) {
log.info("$LOG_PREFIX:Mapping Template $METRICS_MAPPING_TEMPLATE_NAME creation Acknowledged")
log.info("$LOG_PREFIX:Mapping Template $templateName creation Acknowledged")
} else {
error("$LOG_PREFIX:Mapping Template $METRICS_MAPPING_TEMPLATE_NAME creation not Acknowledged")
error("$LOG_PREFIX:Mapping Template $templateName creation not Acknowledged")
}
} catch (exception: ResourceAlreadyExistsException) {
log.warn("message: ${exception.message}")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,33 @@ import org.opensearch.rest.RestStatus

class AssemblyValidationIT : PluginRestTestCase() {
companion object {
private const val TRACES_MAPPING_TEMPLATE_NAME = "ss4o_trace_template"
private const val METRICS_MAPPING_TEMPLATE_NAME = "ss4o_metric_template"
private const val TRACES_MAPPING_TEMPLATE_NAME = "ss4o_traces_template"
private const val METRICS_MAPPING_TEMPLATE_NAME = "ss4o_metrics_template"
}

fun `test observability traces template and was created`() {
// verify traces mapping template was created successfully as part of the plugin initialization
Thread.sleep(1000)
fun `test observability metrics template was created`() {
// verify metrics mapping template was created successfully as part of the plugin initialization
var response = executeRequest(
RestRequest.Method.GET.name,
"/_index_template/$TRACES_MAPPING_TEMPLATE_NAME",
"",
"/_index_template/$METRICS_MAPPING_TEMPLATE_NAME",
"{}",
RestStatus.OK.status
)
Thread.sleep(1000)
Assert.assertNotNull(response.get("index_templates"))
Assert.assertNotNull(!response.get("index_templates").asJsonArray.isEmpty)

// verify metrics mapping template was created successfully as part of the plugin initialization
// verify traces mapping template was created successfully as part of the plugin initialization
/*
* TODO I'm not sure why, but when this test is moved to its own test function, it fails.
* The new function passes if run alone, but not as part as a suite.
* Exponential backoff to very long sleep intervals doesn't work either.
*/
response = executeRequest(
RestRequest.Method.GET.name,
"/_index_template/$METRICS_MAPPING_TEMPLATE_NAME",
"",
"/_index_template/$TRACES_MAPPING_TEMPLATE_NAME",
"{}",
RestStatus.OK.status
)
Thread.sleep(1000)
Assert.assertNotNull(response.get("index_templates"))
Assert.assertNotNull(!response.get("index_templates").asJsonArray.isEmpty)
}
Expand Down

0 comments on commit 0628a01

Please sign in to comment.