Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option for _ingest.timestamp to use new ZonedDateTime (5.x backport) #24030

Merged
merged 1 commit into from
May 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,18 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories());
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, pipelineStore.isNewIngestDateFormat());
return new Parsed(pipeline, ingestDocumentList, verbose);
}

private static List<IngestDocument> parseDocs(Map<String, Object> config) {
private static List<IngestDocument> parseDocs(Map<String, Object> config, boolean newDateFormat) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Expand All @@ -183,7 +183,7 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
ConfigurationUtils.readStringProperty(null, null, dataMap, MetaData.ID.getFieldName(), "_id"),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.ROUTING.getFieldName()),
ConfigurationUtils.readOptionalStringProperty(null, null, dataMap, MetaData.PARENT.getFieldName()),
document);
document, newDateFormat);
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.fs.FsService;
import org.elasticsearch.monitor.jvm.JvmGcMonitorService;
import org.elasticsearch.monitor.jvm.JvmService;
Expand Down Expand Up @@ -404,6 +405,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchModule.INDICES_MAX_CLAUSE_COUNT_SETTING,
ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING,
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY
Node.BREAKER_TYPE_KEY,
IngestService.NEW_INGEST_DATE_FORMAT
)));
}
17 changes: 16 additions & 1 deletion core/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.elasticsearch.ingest;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.ParentFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.TypeFieldMapper;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand All @@ -52,6 +55,11 @@ public final class IngestDocument {
private final Map<String, Object> ingestMetadata;

public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source) {
this(index, type, id, routing, parent, source, false);
}

public IngestDocument(String index, String type, String id, String routing, String parent, Map<String, Object> source,
boolean newDateFormat) {
this.sourceAndMetadata = new HashMap<>();
this.sourceAndMetadata.putAll(source);
this.sourceAndMetadata.put(MetaData.INDEX.getFieldName(), index);
Expand All @@ -65,7 +73,11 @@ public IngestDocument(String index, String type, String id, String routing, Stri
}

this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, new Date());
if (newDateFormat) {
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
} else {
this.ingestMetadata.put(TIMESTAMP, new Date());
}
}

/**
Expand Down Expand Up @@ -608,6 +620,9 @@ private static Object deepCopy(Object value) {
return value;
} else if (value instanceof Date) {
return ((Date) value).clone();
} else if (value instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) value;
return ZonedDateTime.of(zonedDateTime.toLocalDate(), zonedDateTime.toLocalTime(), zonedDateTime.getZone());
} else {
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
}
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@
import java.util.List;
import java.util.Map;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.common.settings.Setting.Property;

/**
* Holder class for several ingest related services.
*/
public class IngestService {
public static final Setting<Boolean> NEW_INGEST_DATE_FORMAT =
Setting.boolSetting("ingest.new_date_format", false, Property.NodeScope, Property.Dynamic, Property.Deprecated);

private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;

public IngestService(Settings settings, ThreadPool threadPool,
public IngestService(ClusterSettings clusterSettings, Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {

Expand All @@ -56,7 +62,7 @@ public IngestService(Settings settings, ThreadPool threadPool,
}
}
}
this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories));
this.pipelineStore = new PipelineStore(clusterSettings, settings, Collections.unmodifiableMap(processorFactories));
this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E
String routing = indexRequest.routing();
String parent = indexRequest.parent();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, sourceAsMap);
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent,
sourceAsMap, store.isNewIngestDateFormat());
pipeline.execute(ingestDocument);

Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/org/elasticsearch/ingest/PipelineStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;

Expand All @@ -51,16 +52,23 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl

private final Pipeline.Factory factory = new Pipeline.Factory();
private final Map<String, Processor.Factory> processorFactories;
private volatile boolean newIngestDateFormat;

// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
volatile Map<String, Pipeline> pipelines = new HashMap<>();

public PipelineStore(Settings settings, Map<String, Processor.Factory> processorFactories) {
public PipelineStore(ClusterSettings clusterSettings, Settings settings, Map<String, Processor.Factory> processorFactories) {
super(settings);
this.processorFactories = processorFactories;
this.newIngestDateFormat = IngestService.NEW_INGEST_DATE_FORMAT.get(settings);
clusterSettings.addSettingsUpdateConsumer(IngestService.NEW_INGEST_DATE_FORMAT, this::setNewIngestDateFormat);
}

private void setNewIngestDateFormat(Boolean newIngestDateFormat) {
this.newIngestDateFormat = newIngestDateFormat;
}

@Override
Expand Down Expand Up @@ -204,6 +212,10 @@ public Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}

public boolean isNewIngestDateFormat() {
return newIngestDateFormat;
}

/**
* @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines
* may be returned
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addListener(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
final IngestService ingestService = new IngestService(clusterService.getClusterSettings(), settings, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -45,13 +47,18 @@
public class IngestDocumentTests extends ESTestCase {

private static final Date BOGUS_TIMESTAMP = new Date(0L);
private static final ZonedDateTime BOGUS_TIMESTAMP_NEW_DATE_FORMAT = ZonedDateTime.of(2016, 10, 23, 0, 0, 0, 0, ZoneOffset.UTC);
private IngestDocument ingestDocument;
private IngestDocument ingestDocumentWithNewDateFormat;

@Before
public void setIngestDocument() {
public IngestDocument getTestIngestDocument(boolean newDateFormat) {
Map<String, Object> document = new HashMap<>();
Map<String, Object> ingestMap = new HashMap<>();
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
if (newDateFormat) {
ingestMap.put("timestamp", BOGUS_TIMESTAMP_NEW_DATE_FORMAT);
} else {
ingestMap.put("timestamp", BOGUS_TIMESTAMP);
}
document.put("_ingest", ingestMap);
document.put("foo", "bar");
document.put("int", 123);
Expand All @@ -72,7 +79,18 @@ public void setIngestDocument() {
list.add(null);

document.put("list", list);
ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
return new IngestDocument("index", "type", "id", null, null, document, newDateFormat);
}

@Before
public void setIngestDocuments() {
ingestDocument = getTestIngestDocument(false);
ingestDocumentWithNewDateFormat = getTestIngestDocument(true);
}

public void testDefaultConstructorUsesDateClass() {
IngestDocument ingestDocument = new IngestDocument("foo", "bar", "baz", "fuzz", "buzz", Collections.emptyMap());
assertThat(ingestDocument.getFieldValue("_ingest.timestamp", Object.class).getClass(), equalTo(Date.class));
}

public void testSimpleGetFieldValue() {
Expand All @@ -88,6 +106,13 @@ public void testSimpleGetFieldValue() {
assertThat(ingestDocument.getFieldValue("_source._ingest.timestamp", Date.class), equalTo(BOGUS_TIMESTAMP));
}

public void testNewDateFormat() {
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_ingest.timestamp", ZonedDateTime.class),
both(notNullValue()).and(not(equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT))));
assertThat(ingestDocumentWithNewDateFormat.getFieldValue("_source._ingest.timestamp", ZonedDateTime.class),
equalTo(BOGUS_TIMESTAMP_NEW_DATE_FORMAT));
}

public void testGetSourceObject() {
try {
ingestDocument.getFieldValue("_source", Object.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Map;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -39,7 +40,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet

public void testIngestPlugin() {
ThreadPool tp = Mockito.mock(ThreadPool.class);
IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
IngestService ingestService = new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Settings.EMPTY, tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN));
Map<String, Processor.Factory> factories = ingestService.getPipelineStore().getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
Expand All @@ -48,7 +50,8 @@ public void testIngestPlugin() {
public void testIngestPluginDuplicate() {
ThreadPool tp = Mockito.mock(ThreadPool.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
new IngestService(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Settings.EMPTY, tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -49,6 +50,7 @@

public class PipelineStoreTests extends ESTestCase {

private ClusterSettings clusterSettings;
private PipelineStore store;

@Before
Expand Down Expand Up @@ -93,7 +95,8 @@ public String getTag() {
}
};
});
store = new PipelineStore(Settings.EMPTY, processorFactories);
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
store = new PipelineStore(clusterSettings, Settings.EMPTY, processorFactories);
}

public void testUpdatePipelines() {
Expand Down Expand Up @@ -369,4 +372,11 @@ public void testValidateNoIngestInfo() throws Exception {
store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
}

public void testUpdateIngestNewDateFormatSetting() throws Exception {
assertFalse(store.isNewIngestDateFormat());
clusterSettings.applySettings(Settings.builder().put(IngestService.NEW_INGEST_DATE_FORMAT.getKey(), true).build());
assertTrue(store.isNewIngestDateFormat());
assertWarnings("[ingest.new_date_format] setting was deprecated in Elasticsearch and will be " +
"removed in a future release! See the breaking changes documentation for the next major version.");
}
}
Loading