Skip to content

Commit

Permalink
Move downsample feature to its own xpack plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Aug 13, 2023
1 parent c9aa3b1 commit c7ac145
Show file tree
Hide file tree
Showing 31 changed files with 98 additions and 33 deletions.
21 changes: 21 additions & 0 deletions x-pack/plugin/downsample/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply plugin: 'elasticsearch.internal-es-plugin'
esplugin {
name 'x-pack-downsample'
description 'Elasticsearch Expanded Pack Plugin - Downsample'
classname 'org.elasticsearch.xpack.downsample.Downsample'
extendedPlugins = ['x-pack-aggregate-metric']
}

base {
archivesName = 'x-pack-downsample'
}

dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation project(':modules:data-streams')
testImplementation project(path: xpackModule('ilm'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
}

addQaCheckDependencies(project)
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.downsample.DownsampleAction;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;

import java.util.List;
import java.util.function.Supplier;

public class Downsample extends Plugin implements ActionPlugin {

public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final FixedExecutorBuilder downsample = new FixedExecutorBuilder(
settings,
DOWSAMPLE_TASK_THREAD_POOL_NAME,
ThreadPool.oneEighthAllocatedProcessors(EsExecutors.allocatedProcessors(settings)),
DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE,
"xpack.downsample.thread_pool",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
);
return List.of(downsample);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class)
);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new RestDownsampleAction());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
import org.elasticsearch.xpack.rollup.Rollup;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -68,7 +67,7 @@ public TransportDownsampleIndexerAction(
indexNameExpressionResolver,
DownsampleIndexerAction.Request::new,
DownsampleIndexerAction.ShardDownsampleRequest::new,
Rollup.DOWSAMPLE_TASK_THREAD_POOL_NAME
Downsample.DOWSAMPLE_TASK_THREAD_POOL_NAME
);
this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN);
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.rollup.Rollup;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -150,7 +149,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(
LocalStateCompositeXPackPlugin.class,
Rollup.class,
Downsample.class,
AggregateMetricMapperPlugin.class,
DataStreamsPlugin.class,
IndexLifecycle.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.downsample.DownsampleAction;
import org.elasticsearch.xpack.rollup.Rollup;
import org.hamcrest.Matchers;

import java.io.IOException;
Expand All @@ -67,7 +66,7 @@ public class DownsampleDataStreamTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(Rollup.class, DataStreamsPlugin.class);
return List.of(Downsample.class, DataStreamsPlugin.class);
}

public void testDataStreamDownsample() throws ExecutionException, InterruptedException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.downsample.DownsampleAction;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.rollup.Rollup;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -160,7 +159,7 @@ public String workerName() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(LocalStateCompositeXPackPlugin.class, Rollup.class, AggregateMetricMapperPlugin.class);
return List.of(LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
}

@Override
Expand Down
2 changes: 0 additions & 2 deletions x-pack/plugin/rollup/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ esplugin {
name 'x-pack-rollup'
description 'Elasticsearch Expanded Pack Plugin - Rollup'
classname 'org.elasticsearch.xpack.rollup.Rollup'
extendedPlugins = ['x-pack-aggregate-metric']
}

base {
Expand All @@ -14,7 +13,6 @@ dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation project(':modules:data-streams')
testImplementation project(path: xpackModule('ilm'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.downsample.DownsampleAction;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupCapsAction;
Expand All @@ -42,9 +40,6 @@
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.downsample.RestDownsampleAction;
import org.elasticsearch.xpack.downsample.TransportDownsampleAction;
import org.elasticsearch.xpack.downsample.TransportDownsampleIndexerAction;
import org.elasticsearch.xpack.rollup.action.TransportDeleteRollupJobAction;
import org.elasticsearch.xpack.rollup.action.TransportGetRollupCapsAction;
import org.elasticsearch.xpack.rollup.action.TransportGetRollupIndexCapsAction;
Expand Down Expand Up @@ -79,8 +74,6 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
public static final int CURRENT_ROLLUP_VERSION = ROLLUP_VERSION_V2;

public static final String TASK_THREAD_POOL_NAME = RollupField.NAME + "_indexing";
public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
public static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;

public static final String ROLLUP_TEMPLATE_VERSION_FIELD = "rollup-version";

Expand Down Expand Up @@ -109,8 +102,7 @@ public List<RestHandler> getRestHandlers(
new RestDeleteRollupJobAction(),
new RestGetRollupJobsAction(),
new RestGetRollupCapsAction(),
new RestGetRollupIndexCapsAction(),
new RestDownsampleAction() // TSDB Downsampling
new RestGetRollupIndexCapsAction()
);
}

Expand All @@ -126,9 +118,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetRollupCapsAction.INSTANCE, TransportGetRollupCapsAction.class),
new ActionHandler<>(GetRollupIndexCapsAction.INSTANCE, TransportGetRollupIndexCapsAction.class),
new ActionHandler<>(XPackUsageFeatureAction.ROLLUP, RollupUsageTransportAction.class),
new ActionHandler<>(XPackInfoFeatureAction.ROLLUP, RollupInfoTransportAction.class),
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class)
new ActionHandler<>(XPackInfoFeatureAction.ROLLUP, RollupInfoTransportAction.class)
);
}

Expand All @@ -142,17 +132,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
"xpack.rollup.task_thread_pool",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
);

final FixedExecutorBuilder downsample = new FixedExecutorBuilder(
settingsToUse,
Rollup.DOWSAMPLE_TASK_THREAD_POOL_NAME,
ThreadPool.oneEighthAllocatedProcessors(EsExecutors.allocatedProcessors(settingsToUse)),
Rollup.DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE,
"xpack.downsample.thread_pool",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
);

return List.of(rollup, downsample);
return List.of(rollup);
}

@Override
Expand Down

0 comments on commit c7ac145

Please sign in to comment.