Skip to content

Commit

Permalink
[ML] Remove unneeded command line arguments (#67344)
Browse files Browse the repository at this point in the history
Do not pass the now unneeded command line arguments to autodetect and
remove any supporting code.

Remove staggering interval - now in C++

Relates elastic/ml-cpp#1253
  • Loading branch information
edsavage committed Jan 12, 2021
1 parent 679fc77 commit b67c246
Show file tree
Hide file tree
Showing 9 changed files with 5 additions and 768 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ public static String extractJobIdFromDocumentId(String docId) {
return jobId.equals(docId) ? null : jobId;
}


/**
* Return the Job Id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ScheduledEventToRuleWriter;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter;

import java.io.BufferedWriter;
import java.io.IOException;
Expand All @@ -41,12 +34,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils.addIfNotNull;

/**
* The autodetect process builder.
*/
Expand All @@ -61,28 +51,15 @@ public class AutodetectBuilder {
/*
* Arguments used by both autodetect and normalize
*/
public static final String BUCKET_SPAN_ARG = "--bucketspan=";
public static final String DELETE_STATE_FILES_ARG = "--deleteStateFiles";
public static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
public static final String MODEL_CONFIG_ARG = "--modelconfig=";
public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState=";

private static final String CONF_EXTENSION = ".conf";
private static final String JSON_EXTENSION = ".json";
static final String JOB_ID_ARG = "--jobid=";
private static final String CONFIG_ARG = "--config=";
private static final String EVENTS_CONFIG_ARG = "--eventsconfig=";
private static final String FILTERS_CONFIG_ARG = "--filtersconfig=";
private static final String LIMIT_CONFIG_ARG = "--limitconfig=";
private static final String MODEL_PLOT_CONFIG_ARG = "--modelplotconfig=";
private static final String FIELD_CONFIG_ARG = "--fieldconfig=";
static final String LATENCY_ARG = "--latency=";
static final String MULTIVARIATE_BY_FIELDS_ARG = "--multivariateByFields";
static final String PERSIST_INTERVAL_ARG = "--persistInterval=";
static final String MAX_QUANTILE_INTERVAL_ARG = "--maxQuantileInterval=";
static final String SUMMARY_COUNT_FIELD_ARG = "--summarycountfield=";
static final String TIME_FIELD_ARG = "--timefield=";
static final String STOP_CATEGORIZATION_ON_WARN_ARG = "--stopCategorizationOnWarnStatus";

/**
* Name of the config setting containing the path to the logs directory
Expand All @@ -96,21 +73,6 @@ public class AutodetectBuilder {
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting("xpack.ml.max_anomaly_records",
DEFAULT_MAX_NUM_RECORDS, Setting.Property.NodeScope, Setting.Property.Dynamic);

private static final int SECONDS_IN_HOUR = 3600;

/**
* Roughly how often should the C++ process persist state? A staggering
* factor that varies by job is added to this.
*/
private static final long DEFAULT_BASE_PERSIST_INTERVAL = 10800; // 3 hours

/**
* Roughly how often should the C++ process output quantiles when no
* anomalies are being detected? A staggering factor that varies by job is
* added to this.
*/
static final int BASE_MAX_QUANTILE_INTERVAL = 21600; // 6 hours

/**
* Persisted quantiles are written to disk so they can be read by
* the autodetect program. All quantiles files have this extension.
Expand Down Expand Up @@ -178,19 +140,8 @@ public void build() throws IOException, InterruptedException {

buildFiltersConfig(command);
buildScheduledEventsConfig(command);

// While it may appear that the JSON formatted job config file contains data that
// is duplicated in the existing limits, modelPlot and field config files, over time
// the C++ backend will retrieve all its required configuration data from the new
// JSON config file and the old-style configuration files will be removed.
buildJobConfig(command);

// Per the comment above, these three lines will eventually be removed once migration
// to the new JSON formatted configuration file has been completed.
buildLimits(command);
buildModelPlotConfig(command);
buildFieldConfig(command);

buildQuantiles(command);

processPipes.addArgs(command);
Expand All @@ -204,43 +155,12 @@ List<String> buildAutodetectCommand() {
List<String> command = new ArrayList<>();
command.add(AUTODETECT_PATH);

command.add(JOB_ID_ARG + job.getId());

AnalysisConfig analysisConfig = job.getAnalysisConfig();
if (analysisConfig != null) {
addIfNotNull(analysisConfig.getBucketSpan(), BUCKET_SPAN_ARG, command);
addIfNotNull(analysisConfig.getLatency(), LATENCY_ARG, command);
addIfNotNull(analysisConfig.getSummaryCountFieldName(), SUMMARY_COUNT_FIELD_ARG, command);
if (Boolean.TRUE.equals(analysisConfig.getMultivariateByFields())) {
command.add(MULTIVARIATE_BY_FIELDS_ARG);
}
if (Boolean.TRUE.equals(analysisConfig.getPerPartitionCategorizationConfig().isStopOnWarn())) {
command.add(STOP_CATEGORIZATION_ON_WARN_ARG);
}
}

// Input is always length encoded
command.add(LENGTH_ENCODED_INPUT_ARG);

// Limit the number of output records
command.add(maxAnomalyRecordsArg(settings));

// always set the time field
String timeFieldArg = TIME_FIELD_ARG + getTimeFieldOrDefault(job);
command.add(timeFieldArg);

int intervalStagger = calculateStaggeringInterval(job.getId());
logger.debug("[{}] Periodic operations staggered by {} seconds", job.getId(), intervalStagger);

// Persist model state every few hours even if the job isn't closed
long persistInterval = (job.getBackgroundPersistInterval() == null) ?
(DEFAULT_BASE_PERSIST_INTERVAL + intervalStagger) :
job.getBackgroundPersistInterval().getSeconds();
command.add(PERSIST_INTERVAL_ARG + persistInterval);

int maxQuantileInterval = BASE_MAX_QUANTILE_INTERVAL + intervalStagger;
command.add(MAX_QUANTILE_INTERVAL_ARG + maxQuantileInterval);

if (ProcessBuilderUtils.modelConfigFilePresent(env)) {
String modelConfigFile = XPackPlugin.resolveConfigFile(env, ProcessBuilderUtils.ML_MODEL_CONF).toString();
command.add(MODEL_CONFIG_ARG + modelConfigFile);
Expand All @@ -253,67 +173,6 @@ static String maxAnomalyRecordsArg(Settings settings) {
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
}

private static String getTimeFieldOrDefault(Job job) {
DataDescription dataDescription = job.getDataDescription();
boolean useDefault = dataDescription == null
|| Strings.isNullOrEmpty(dataDescription.getTimeField());
return useDefault ? DataDescription.DEFAULT_TIME_FIELD : dataDescription.getTimeField();
}

/**
* This random time of up to 1 hour is added to intervals at which we
* tell the C++ process to perform periodic operations. This means that
* when there are many jobs there is a certain amount of staggering of
* their periodic operations. A given job will always be given the same
* staggering interval (for a given JVM implementation).
*
* @param jobId The ID of the job to calculate the staggering interval for
* @return The staggering interval
*/
static int calculateStaggeringInterval(String jobId) {
Random rng = new Random(jobId.hashCode());
return rng.nextInt(SECONDS_IN_HOUR);
}

private void buildLimits(List<String> command) throws IOException {
if (job.getAnalysisLimits() != null) {
Path limitConfigFile = Files.createTempFile(env.tmpFile(), "limitconfig", CONF_EXTENSION);
filesToDelete.add(limitConfigFile);
writeLimits(job.getAnalysisLimits(), limitConfigFile);
String limits = LIMIT_CONFIG_ARG + limitConfigFile.toString();
command.add(limits);
}
}

/**
* Write the Ml autodetect model options to <code>emptyConfFile</code>.
*/
private static void writeLimits(AnalysisLimits options, Path emptyConfFile) throws IOException {

try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(emptyConfFile), StandardCharsets.UTF_8)) {
new AnalysisLimitsWriter(options, osw).write();
}
}

private void buildModelPlotConfig(List<String> command) throws IOException {
if (job.getModelPlotConfig() != null) {
Path modelPlotConfigFile = Files.createTempFile(env.tmpFile(), "modelplotconfig", CONF_EXTENSION);
filesToDelete.add(modelPlotConfigFile);
writeModelPlotConfig(job.getModelPlotConfig(), modelPlotConfigFile);
String modelPlotConfig = MODEL_PLOT_CONFIG_ARG + modelPlotConfigFile.toString();
command.add(modelPlotConfig);
}
}

private static void writeModelPlotConfig(ModelPlotConfig config, Path emptyConfFile)
throws IOException {
try (OutputStreamWriter osw = new OutputStreamWriter(
Files.newOutputStream(emptyConfFile),
StandardCharsets.UTF_8)) {
new ModelPlotConfigWriter(config, osw).write();
}
}

private void buildQuantiles(List<String> command) throws IOException {
if (quantiles != null && !quantiles.getQuantileState().isEmpty()) {
logger.info("Restoring quantiles for job '" + job.getId() + "'");
Expand Down Expand Up @@ -344,22 +203,6 @@ public static Path writeNormalizerInitState(String jobId, String state, Environm
return stateFile;
}

private void buildFieldConfig(List<String> command) throws IOException {
if (job.getAnalysisConfig() != null) {
// write to a temporary field config file
Path fieldConfigFile = Files.createTempFile(env.tmpFile(), "fieldconfig", CONF_EXTENSION);
filesToDelete.add(fieldConfigFile);
try (OutputStreamWriter osw = new OutputStreamWriter(
Files.newOutputStream(fieldConfigFile),
StandardCharsets.UTF_8)) {
new FieldConfigWriter(job.getAnalysisConfig(), referencedFilters, scheduledEvents, osw, logger).write();
}

String fieldConfig = FIELD_CONFIG_ARG + fieldConfigFile.toString();
command.add(fieldConfig);
}
}

private void buildScheduledEventsConfig(List<String> command) throws IOException {
if (scheduledEvents.isEmpty()) {
return;
Expand Down

This file was deleted.

Loading

0 comments on commit b67c246

Please sign in to comment.