Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Remove unneeded command line arguments (#67344) #67367

Merged
merged 1 commit into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ public static String extractJobIdFromDocumentId(String docId) {
return jobId.equals(docId) ? null : jobId;
}


/**
* Return the Job Id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ScheduledEventToRuleWriter;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter;

import java.io.BufferedWriter;
import java.io.IOException;
Expand All @@ -41,12 +34,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils.addIfNotNull;

/**
* The autodetect process builder.
*/
Expand All @@ -61,28 +51,15 @@ public class AutodetectBuilder {
/*
 * Arguments used by both autodetect and normalize
 */
public static final String BUCKET_SPAN_ARG = "--bucketspan=";
public static final String DELETE_STATE_FILES_ARG = "--deleteStateFiles";
public static final String LENGTH_ENCODED_INPUT_ARG = "--lengthEncodedInput";
public static final String MODEL_CONFIG_ARG = "--modelconfig=";
public static final String QUANTILES_STATE_PATH_ARG = "--quantilesState=";

// Suffixes for the temporary files created for the native process.
// CONF_EXTENSION is the suffix of the limit, model plot and field config temp files;
// JSON_EXTENSION is presumably the suffix of the JSON job config file - TODO confirm against its writer.
private static final String CONF_EXTENSION = ".conf";
private static final String JSON_EXTENSION = ".json";

// Command line argument names. Those ending in '=' are prefixes that have a value
// appended before being added to the command; the others are standalone flags.
static final String JOB_ID_ARG = "--jobid=";
private static final String CONFIG_ARG = "--config=";
private static final String EVENTS_CONFIG_ARG = "--eventsconfig=";
private static final String FILTERS_CONFIG_ARG = "--filtersconfig=";
private static final String LIMIT_CONFIG_ARG = "--limitconfig=";
private static final String MODEL_PLOT_CONFIG_ARG = "--modelplotconfig=";
private static final String FIELD_CONFIG_ARG = "--fieldconfig=";
static final String LATENCY_ARG = "--latency=";
static final String MULTIVARIATE_BY_FIELDS_ARG = "--multivariateByFields";
static final String PERSIST_INTERVAL_ARG = "--persistInterval=";
static final String MAX_QUANTILE_INTERVAL_ARG = "--maxQuantileInterval=";
static final String SUMMARY_COUNT_FIELD_ARG = "--summarycountfield=";
static final String TIME_FIELD_ARG = "--timefield=";
static final String STOP_CATEGORIZATION_ON_WARN_ARG = "--stopCategorizationOnWarnStatus";

/**
* Name of the config setting containing the path to the logs directory
Expand All @@ -96,21 +73,6 @@ public class AutodetectBuilder {
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting("xpack.ml.max_anomaly_records",
DEFAULT_MAX_NUM_RECORDS, Setting.Property.NodeScope, Setting.Property.Dynamic);

/**
 * Number of seconds in one hour. Used as the exclusive upper bound of the
 * per-job staggering interval.
 */
private static final int SECONDS_IN_HOUR = 3600;

/**
 * Roughly how often should the C++ process persist state? The value is in
 * seconds. A staggering factor that varies by job is added to this when the
 * job does not specify its own background persist interval.
 */
private static final long DEFAULT_BASE_PERSIST_INTERVAL = 10800; // 3 hours

/**
 * Roughly how often should the C++ process output quantiles when no
 * anomalies are being detected? The value is in seconds. A staggering
 * factor that varies by job is added to this.
 */
static final int BASE_MAX_QUANTILE_INTERVAL = 21600; // 6 hours

/**
* Persisted quantiles are written to disk so they can be read by
* the autodetect program. All quantiles files have this extension.
Expand Down Expand Up @@ -178,19 +140,8 @@ public void build() throws IOException, InterruptedException {

buildFiltersConfig(command);
buildScheduledEventsConfig(command);

// While it may appear that the JSON formatted job config file contains data that
// is duplicated in the existing limits, modelPlot and field config files, over time
// the C++ backend will retrieve all its required configuration data from the new
// JSON config file and the old-style configuration files will be removed.
buildJobConfig(command);

// Per the comment above, these three lines will eventually be removed once migration
// to the new JSON formatted configuration file has been completed.
buildLimits(command);
buildModelPlotConfig(command);
buildFieldConfig(command);

buildQuantiles(command);

processPipes.addArgs(command);
Expand All @@ -204,43 +155,12 @@ List<String> buildAutodetectCommand() {
List<String> command = new ArrayList<>();
command.add(AUTODETECT_PATH);

command.add(JOB_ID_ARG + job.getId());

AnalysisConfig analysisConfig = job.getAnalysisConfig();
if (analysisConfig != null) {
addIfNotNull(analysisConfig.getBucketSpan(), BUCKET_SPAN_ARG, command);
addIfNotNull(analysisConfig.getLatency(), LATENCY_ARG, command);
addIfNotNull(analysisConfig.getSummaryCountFieldName(), SUMMARY_COUNT_FIELD_ARG, command);
if (Boolean.TRUE.equals(analysisConfig.getMultivariateByFields())) {
command.add(MULTIVARIATE_BY_FIELDS_ARG);
}
if (Boolean.TRUE.equals(analysisConfig.getPerPartitionCategorizationConfig().isStopOnWarn())) {
command.add(STOP_CATEGORIZATION_ON_WARN_ARG);
}
}

// Input is always length encoded
command.add(LENGTH_ENCODED_INPUT_ARG);

// Limit the number of output records
command.add(maxAnomalyRecordsArg(settings));

// always set the time field
String timeFieldArg = TIME_FIELD_ARG + getTimeFieldOrDefault(job);
command.add(timeFieldArg);

int intervalStagger = calculateStaggeringInterval(job.getId());
logger.debug("[{}] Periodic operations staggered by {} seconds", job.getId(), intervalStagger);

// Persist model state every few hours even if the job isn't closed
long persistInterval = (job.getBackgroundPersistInterval() == null) ?
(DEFAULT_BASE_PERSIST_INTERVAL + intervalStagger) :
job.getBackgroundPersistInterval().getSeconds();
command.add(PERSIST_INTERVAL_ARG + persistInterval);

int maxQuantileInterval = BASE_MAX_QUANTILE_INTERVAL + intervalStagger;
command.add(MAX_QUANTILE_INTERVAL_ARG + maxQuantileInterval);

if (ProcessBuilderUtils.modelConfigFilePresent(env)) {
String modelConfigFile = XPackPlugin.resolveConfigFile(env, ProcessBuilderUtils.ML_MODEL_CONF).toString();
command.add(MODEL_CONFIG_ARG + modelConfigFile);
Expand All @@ -253,67 +173,6 @@ static String maxAnomalyRecordsArg(Settings settings) {
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
}

/**
 * Resolve the name of the time field for a job.
 *
 * @param job The job whose data description is consulted
 * @return The time field from the job's data description, or
 *         {@link DataDescription#DEFAULT_TIME_FIELD} when the job has no
 *         data description or its time field is null or empty
 */
private static String getTimeFieldOrDefault(Job job) {
    DataDescription description = job.getDataDescription();
    if (description != null && Strings.isNullOrEmpty(description.getTimeField()) == false) {
        return description.getTimeField();
    }
    return DataDescription.DEFAULT_TIME_FIELD;
}

/**
 * Compute the per-job offset, in seconds, that is added to the intervals
 * at which the C++ process is told to perform periodic operations. The
 * offset is less than one hour, so that when many jobs are running their
 * periodic operations are spread out rather than coinciding.
 * The random number generator is seeded from the job ID's hash code, so a
 * given job always receives the same offset (for a given JVM
 * implementation).
 *
 * @param jobId The ID of the job to calculate the staggering interval for
 * @return The staggering interval, from 0 (inclusive) to the number of
 *         seconds in an hour (exclusive)
 */
static int calculateStaggeringInterval(String jobId) {
    return new Random(jobId.hashCode()).nextInt(SECONDS_IN_HOUR);
}

/**
 * If the job has analysis limits, write them to a temporary config file,
 * register that file for later deletion and append the matching limit
 * config argument to the command. Does nothing when the job has no
 * analysis limits.
 *
 * @param command The command line being assembled; may have one argument appended
 * @throws IOException If the temporary file cannot be created or written
 */
private void buildLimits(List<String> command) throws IOException {
    if (job.getAnalysisLimits() == null) {
        return;
    }
    Path tempFile = Files.createTempFile(env.tmpFile(), "limitconfig", CONF_EXTENSION);
    // Register before writing so the file is tracked even if the write fails
    filesToDelete.add(tempFile);
    writeLimits(job.getAnalysisLimits(), tempFile);
    command.add(LIMIT_CONFIG_ARG + tempFile.toString());
}

/**
 * Serialise the analysis limits into the given file as UTF-8 text.
 *
 * @param options       The analysis limits to write
 * @param emptyConfFile The file to write to
 * @throws IOException If the file cannot be opened or written
 */
private static void writeLimits(AnalysisLimits options, Path emptyConfFile) throws IOException {
    try (OutputStreamWriter utf8Writer =
             new OutputStreamWriter(Files.newOutputStream(emptyConfFile), StandardCharsets.UTF_8)) {
        AnalysisLimitsWriter limitsWriter = new AnalysisLimitsWriter(options, utf8Writer);
        limitsWriter.write();
    }
}

/**
 * If the job has a model plot config, write it to a temporary config file,
 * register that file for later deletion and append the matching model plot
 * config argument to the command. Does nothing when the job has no model
 * plot config.
 *
 * @param command The command line being assembled; may have one argument appended
 * @throws IOException If the temporary file cannot be created or written
 */
private void buildModelPlotConfig(List<String> command) throws IOException {
    if (job.getModelPlotConfig() == null) {
        return;
    }
    Path tempFile = Files.createTempFile(env.tmpFile(), "modelplotconfig", CONF_EXTENSION);
    // Register before writing so the file is tracked even if the write fails
    filesToDelete.add(tempFile);
    writeModelPlotConfig(job.getModelPlotConfig(), tempFile);
    command.add(MODEL_PLOT_CONFIG_ARG + tempFile.toString());
}

/**
 * Serialise the model plot config into the given file as UTF-8 text.
 *
 * @param config        The model plot config to write
 * @param emptyConfFile The file to write to
 * @throws IOException If the file cannot be opened or written
 */
private static void writeModelPlotConfig(ModelPlotConfig config, Path emptyConfFile) throws IOException {
    try (OutputStreamWriter utf8Writer =
             new OutputStreamWriter(Files.newOutputStream(emptyConfFile), StandardCharsets.UTF_8)) {
        ModelPlotConfigWriter plotWriter = new ModelPlotConfigWriter(config, utf8Writer);
        plotWriter.write();
    }
}

private void buildQuantiles(List<String> command) throws IOException {
if (quantiles != null && !quantiles.getQuantileState().isEmpty()) {
logger.info("Restoring quantiles for job '" + job.getId() + "'");
Expand Down Expand Up @@ -344,22 +203,6 @@ public static Path writeNormalizerInitState(String jobId, String state, Environm
return stateFile;
}

/**
 * If the job has an analysis config, write the field configuration to a
 * temporary file as UTF-8 text, register that file for later deletion and
 * append the matching field config argument to the command. The referenced
 * filters and scheduled events held by this builder are passed to the
 * writer along with the analysis config. Does nothing when the job has no
 * analysis config.
 *
 * @param command The command line being assembled; may have one argument appended
 * @throws IOException If the temporary file cannot be created or written
 */
private void buildFieldConfig(List<String> command) throws IOException {
    if (job.getAnalysisConfig() == null) {
        return;
    }

    Path tempFile = Files.createTempFile(env.tmpFile(), "fieldconfig", CONF_EXTENSION);
    // Register before writing so the file is tracked even if the write fails
    filesToDelete.add(tempFile);

    try (OutputStreamWriter utf8Writer =
             new OutputStreamWriter(Files.newOutputStream(tempFile), StandardCharsets.UTF_8)) {
        FieldConfigWriter fieldWriter =
            new FieldConfigWriter(job.getAnalysisConfig(), referencedFilters, scheduledEvents, utf8Writer, logger);
        fieldWriter.write();
    }

    command.add(FIELD_CONFIG_ARG + tempFile.toString());
}

private void buildScheduledEventsConfig(List<String> command) throws IOException {
if (scheduledEvents.isEmpty()) {
return;
Expand Down

This file was deleted.

Loading