Skip to content

Commit

Permalink
Implement delete operation (elastic#1324)
Browse files Browse the repository at this point in the history
This PR adds support for delete operations. Document ID's must be present for each document for this to work. A new bulk command factory has been added which will render a delete operation with the id and no document body in the bulk output.
  • Loading branch information
vincentarnaud90 authored Mar 13, 2020
1 parent fbc52bc commit 41cfc5e
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 95 deletions.
2 changes: 2 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public String getSerializerValueWriterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
}


public String getSerializerBytesConverterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
}
Expand Down Expand Up @@ -765,3 +766,4 @@ public String save() {

public abstract Properties asProperties();
}

Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public static void filterNonDataNodesIfNeeded(Settings settings, Log log) {
}

RestClient bootstrap = new RestClient(settings);
try {
try {
String message = "No data nodes with HTTP-enabled available";
List<NodeInfo> dataNodes = bootstrap.getHttpDataNodes();
if (dataNodes.isEmpty()) {
Expand Down Expand Up @@ -253,10 +253,18 @@ public static void validateSettings(Settings settings) {
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
}

//check the configuration is coherent in order to use the delete operation
if (ConfigurationOptions.ES_OPERATION_DELETE.equals(settings.getOperation())) {
Assert.isTrue(!settings.getInputAsJson(), "When using delete operation, providing data as JSON is not coherent because this operation does not need document as a payload. This is most likely not what the user intended. Bailing out...");
Assert.isTrue(settings.getMappingIncludes().isEmpty(), "When using delete operation, the field inclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When using delete operation, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
Assert.isTrue(settings.getMappingId() != null && !StringUtils.EMPTY.equals(settings.getMappingId()), "When using delete operation, the property " + ConfigurationOptions.ES_MAPPING_ID + " must be set and must not be empty since we need the document id in order to delete it. Bailing out...");
}

// Check to make sure user doesn't specify more than one script type
boolean hasScript = false;
String[] scripts = {settings.getUpdateScriptInline(), settings.getUpdateScriptFile(), settings.getUpdateScriptStored()};
for (String script: scripts) {
for (String script : scripts) {
boolean isSet = StringUtils.hasText(script);
Assert.isTrue((hasScript && isSet) == false, "Multiple scripts are specified. Please specify only one via [es.update.script.inline], [es.update.script.file], or [es.update.script.stored]");
hasScript = hasScript || isSet;
Expand Down Expand Up @@ -468,4 +476,4 @@ public static boolean setUserProviderIfNotSet(Settings settings, Class<? extends
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
*/
package org.elasticsearch.hadoop.serialization.bulk;

import java.util.ArrayList;
import java.util.List;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
Expand All @@ -44,6 +41,10 @@
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public abstract class AbstractBulkFactory implements BulkFactory {

private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
Expand All @@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
// used when specifying an index pattern
private IndexExtractor indexExtractor;
private FieldExtractor idExtractor,
typeExtractor,
parentExtractor,
routingExtractor,
versionExtractor,
ttlExtractor,
timestampExtractor,
paramsExtractor;
typeExtractor,
parentExtractor,
routingExtractor,
versionExtractor,
ttlExtractor,
timestampExtractor,
paramsExtractor;

private final FieldExtractor versionTypeExtractor = new FieldExtractor() {

Expand Down Expand Up @@ -139,14 +140,10 @@ void doWrite(Object value) {
}

pool.get().bytes(valueString);
}

else if (value instanceof Date) {
String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
} else if (value instanceof Date) {
String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
pool.get().bytes(valueString);
}

else if (value instanceof RawJson) {
} else if (value instanceof RawJson) {
pool.get().bytes(((RawJson) value).json());
}
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
Expand Down Expand Up @@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
ttlExtractor = jsonExtractors.ttl();
timestampExtractor = jsonExtractors.timestamp();
paramsExtractor = jsonExtractors.params();
}
else {
} else {
// init extractors (if needed)
if (settings.getMappingId() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
settings);
}
if (settings.getMappingParent() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingParentExtractorClassName(), settings);
}
// Two different properties can satisfy the routing field extraction
ChainedFieldExtractor.NoValueHandler routingResponse = ChainedFieldExtractor.NoValueHandler.SKIP;
List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
if (settings.getMappingRouting() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingRoutingExtractorClassName(), settings);
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
Expand All @@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {

if (settings.getMappingTtl() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
settings);
}
if (settings.getMappingVersion() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingVersionExtractorClassName(), settings);
}
if (settings.getMappingTimestamp() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingTimestampExtractorClassName(), settings);
}

// create adapter
IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
iformat.compile(new Resource(settings, false).toString());

if (iformat.hasPattern()) {
Expand Down Expand Up @@ -371,14 +367,15 @@ public BulkCommand createBulk() {
if (!isStatic) {
before.add(new DynamicHeaderRef());
after.add(new DynamicEndRef());
}
else {
} else {
writeObjectHeader(before);
before = compact(before);
writeObjectEnd(after);
after = compact(after);
}

if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
return new DeleteTemplatedBulk(before, after);
}
boolean isScriptUpdate = settings.hasUpdateScript();
// compress pieces
if (jsonInput) {
Expand Down Expand Up @@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
stringAccumulator.setLength(0);
}
compacted.add(object);
}
else if (object instanceof FieldExtractor) {
} else if (object instanceof FieldExtractor) {
if (stringAccumulator.length() > 0) {
compacted.add(new BytesArray(stringAccumulator.toString()));
stringAccumulator.setLength(0);
}
compacted.add(new FieldWriter((FieldExtractor) object));
}
else {
} else {
stringAccumulator.append(object.toString());
}
}
Expand All @@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
return paramsExtractor;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ else if (ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)) {
else if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation)) {
factory = new UpdateBulkFactory(settings, true, metaExtractor, version);
}
else if (ConfigurationOptions.ES_OPERATION_DELETE.equals(operation)) {
factory = new DeleteBulkFactory(settings, metaExtractor, version);
}
else {
throw new EsHadoopIllegalArgumentException("Unknown operation " + operation);
throw new EsHadoopIllegalArgumentException("Unsupported bulk operation " + operation);
}

return factory.createBulk();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.bulk;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;

import java.util.List;

public class DeleteBulkFactory extends AbstractBulkFactory {


public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
super(settings, metaExtractor, version);
}

@Override
protected String getOperation() {
return ConfigurationOptions.ES_OPERATION_DELETE;
}

@Override
protected void writeObjectEnd(List<Object> list) {
// skip adding new-line for each entity as delete doesn't need entity output
list.add(StringUtils.EMPTY);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.bulk;

import org.elasticsearch.hadoop.util.BytesRef;

import java.util.Collection;

public class DeleteTemplatedBulk extends TemplatedBulk {

DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
super(beforeObject, afterObject, null);
}

@Override
public BytesRef write(Object object) {
ref.reset();
scratchPad.reset();
Object processed = preProcess(object, scratchPad);
writeTemplate(beforeObject, processed);
writeTemplate(afterObject, processed);
return ref;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

class TemplatedBulk implements BulkCommand {

private final Collection<Object> beforeObject;
private final Collection<Object> afterObject;
protected final Collection<Object> beforeObject;
protected final Collection<Object> afterObject;

private BytesArray scratchPad = new BytesArray(1024);
private BytesRef ref = new BytesRef();
protected BytesArray scratchPad = new BytesArray(1024);
protected BytesRef ref = new BytesRef();

private final ValueWriter valueWriter;

Expand Down Expand Up @@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
ContentBuilder.generate(bos, writer).value(object).flush().close();
}

private void writeTemplate(Collection<Object> template, Object object) {
protected void writeTemplate(Collection<Object> template, Object object) {
for (Object item : template) {
if (item instanceof BytesArray) {
ref.add((BytesArray) item);
Expand All @@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
}
}
}
}
}
Loading

0 comments on commit 41cfc5e

Please sign in to comment.